qubit-batch 0.3.1

Batch task execution abstractions and sequential executor with progress reporting
Documentation
/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
//! Tests for [`SequentialBatchExecutor`](qubit_batch::SequentialBatchExecutor).

use std::{
    panic::{
        AssertUnwindSafe,
        catch_unwind,
    },
    sync::{
        Arc,
        atomic::{
            AtomicUsize,
            Ordering,
        },
    },
    time::Duration,
};

use qubit_batch::{
    BatchExecutionError,
    BatchExecutor,
    SequentialBatchExecutor,
};

use crate::support::{
    PanickingProgressReporter,
    ProgressEvent,
    ProgressPanicPhase,
    RecordingProgressReporter,
    TestCallable,
    TestTask,
    panic_payload_message,
};

#[test]
fn test_sequential_batch_executor_executes_successfully() {
    let executor = SequentialBatchExecutor::new();
    let counter = Arc::new(AtomicUsize::new(0));
    let tasks = vec![
        TestTask::count_success(Arc::clone(&counter)),
        TestTask::count_success(Arc::clone(&counter)),
        TestTask::count_success(Arc::clone(&counter)),
    ];

    let result = executor
        .execute(tasks, 3)
        .expect("sequential batch should succeed");

    assert_eq!(counter.load(Ordering::Acquire), 3);
    assert_eq!(result.completed_count(), 3);
    assert_eq!(result.succeeded_count(), 3);
    assert_eq!(result.failure_count(), 0);
}

#[test]
fn test_sequential_batch_executor_accessors_and_value_reporter() {
    let executor = SequentialBatchExecutor::new()
        .with_reporter(RecordingProgressReporter::new())
        .with_report_interval(Duration::from_millis(25));

    assert_eq!(executor.report_interval(), Duration::from_millis(25));
    executor.reporter().start(1);
}

#[test]
fn test_sequential_batch_executor_collects_failures_and_panics() {
    let executor = SequentialBatchExecutor::new();
    let tasks = vec![
        TestTask::succeed(),
        TestTask::fail("failed"),
        TestTask::panic("panic in sequential batch"),
    ];

    let result = executor
        .execute(tasks, 3)
        .expect("task failures should stay in the batch result");

    assert_eq!(result.completed_count(), 3);
    assert_eq!(result.succeeded_count(), 1);
    assert_eq!(result.failed_count(), 1);
    assert_eq!(result.panicked_count(), 1);
    assert_eq!(result.failures().len(), 2);
    assert_eq!(result.failures()[0].index(), 1);
    assert_eq!(result.failures()[1].index(), 2);
    assert_eq!(
        result.failures()[1].error().panic_message(),
        Some("panic in sequential batch")
    );
}

#[test]
fn test_sequential_batch_executor_calls_callables_and_collects_values() {
    let executor = SequentialBatchExecutor::new();
    let tasks = vec![
        TestCallable::returning(10),
        TestCallable::returning(20),
        TestCallable::returning(30),
    ];

    let result = executor.call(tasks, 3).expect("call batch should succeed");

    assert_eq!(result.execution_result().completed_count(), 3);
    assert_eq!(result.values(), &[Some(10), Some(20), Some(30)]);
    assert_eq!(result.into_values(), vec![Some(10), Some(20), Some(30)]);

    let tasks = vec![TestCallable::returning(40)];
    let result = executor.call(tasks, 1).expect("call batch should succeed");
    assert_eq!(result.into_execution_result().completed_count(), 1);

    let tasks = vec![TestCallable::returning(50)];
    let result = executor.call(tasks, 1).expect("call batch should succeed");
    let (execution_result, values) = result.into_parts();
    assert_eq!(execution_result.completed_count(), 1);
    assert_eq!(values, vec![Some(50)]);
}

#[test]
fn test_sequential_batch_executor_call_preserves_failure_indexes() {
    let executor = SequentialBatchExecutor::new();
    let tasks = vec![
        TestCallable::returning(10),
        TestCallable::fail("failed"),
        TestCallable::panic("panic in callable"),
        TestCallable::returning(40),
    ];

    let result = executor
        .call(tasks, 4)
        .expect("callable failures should stay in the batch result");

    assert_eq!(result.values(), &[Some(10), None, None, Some(40)]);
    assert_eq!(result.execution_result().failed_count(), 1);
    assert_eq!(result.execution_result().panicked_count(), 1);
    assert_eq!(result.execution_result().failures()[0].index(), 1);
    assert_eq!(result.execution_result().failures()[1].index(), 2);
}

#[test]
fn test_sequential_batch_executor_records_non_string_panic_without_message() {
    let executor = SequentialBatchExecutor::new();
    let tasks = vec![TestTask::panic_usize(7)];

    let result = executor
        .execute(tasks, 1)
        .expect("task panic should stay in the batch result");

    assert_eq!(result.completed_count(), 1);
    assert_eq!(result.panicked_count(), 1);
    assert_eq!(result.failures()[0].error().panic_message(), None);
}

#[test]
fn test_sequential_batch_executor_reports_count_shortfall() {
    let executor = SequentialBatchExecutor::new();
    let tasks = vec![TestTask::succeed(), TestTask::succeed()];

    let error = executor
        .execute(tasks, 3)
        .expect_err("shortfall should be reported");

    match error {
        BatchExecutionError::CountShortfall {
            expected,
            actual,
            result,
        } => {
            assert_eq!(expected, 3);
            assert_eq!(actual, 2);
            assert_eq!(result.completed_count(), 2);
        }
        other => panic!("unexpected error: {other:?}"),
    }
}

#[test]
fn test_sequential_batch_executor_reports_count_exceeded() {
    let executor = SequentialBatchExecutor::new();
    let tasks = vec![TestTask::succeed(), TestTask::succeed()];

    let error = executor
        .execute(tasks, 1)
        .expect_err("overflow should be reported");

    match error {
        BatchExecutionError::CountExceeded {
            expected,
            observed_at_least,
            result,
        } => {
            assert_eq!(expected, 1);
            assert_eq!(observed_at_least, 2);
            assert_eq!(result.completed_count(), 1);
        }
        other => panic!("unexpected error: {other:?}"),
    }
}

#[test]
fn test_sequential_batch_executor_for_each_maps_items() {
    let executor = SequentialBatchExecutor::new();

    let result = executor
        .for_each(0..4, 4, |value| {
            if value == 2 {
                Err("bad item")
            } else {
                Ok::<(), &'static str>(())
            }
        })
        .expect("for_each should keep item failures in the result");

    assert_eq!(result.completed_count(), 4);
    assert_eq!(result.succeeded_count(), 3);
    assert_eq!(result.failed_count(), 1);
    assert_eq!(result.failures()[0].index(), 2);
}

#[test]
fn test_sequential_batch_executor_reports_progress() {
    let reporter = Arc::new(RecordingProgressReporter::new());
    let executor = SequentialBatchExecutor::new()
        .with_reporter_arc(reporter.clone())
        .with_report_interval(Duration::from_millis(10));
    let tasks = vec![
        TestTask::sleep_success(Duration::from_millis(20)),
        TestTask::sleep_success(Duration::from_millis(20)),
        TestTask::sleep_success(Duration::from_millis(20)),
    ];

    let result = executor
        .execute(tasks, 3)
        .expect("sequential batch should succeed");
    let events = reporter.events();

    assert_eq!(result.completed_count(), 3);
    assert!(matches!(
        events.first(),
        Some(ProgressEvent::Start { total_count: 3 })
    ));
    assert!(events.iter().any(|event| matches!(
        event,
        ProgressEvent::Process {
            total_count: 3,
            active_count: 0,
            completed_count,
            ..
        } if *completed_count >= 1
    )));
    assert!(matches!(
        events.last(),
        Some(ProgressEvent::Finish { total_count: 3, .. })
    ));
}

#[test]
fn test_sequential_batch_executor_propagates_progress_reporter_start_panic() {
    const PANIC_MESSAGE: &str = "progress reporter start panic";
    let executor = SequentialBatchExecutor::new().with_reporter(PanickingProgressReporter::new(
        ProgressPanicPhase::Start,
        PANIC_MESSAGE,
    ));
    let tasks = vec![TestTask::succeed()];

    let payload = catch_unwind(AssertUnwindSafe(|| executor.execute(tasks, 1)))
        .expect_err("progress reporter start panic should be propagated");

    assert_eq!(panic_payload_message(payload.as_ref()), Some(PANIC_MESSAGE));
}

#[test]
fn test_sequential_batch_executor_propagates_progress_reporter_process_panic() {
    const PANIC_MESSAGE: &str = "progress reporter process panic";
    let executor = SequentialBatchExecutor::new()
        .with_reporter(PanickingProgressReporter::new(
            ProgressPanicPhase::Process,
            PANIC_MESSAGE,
        ))
        .with_report_interval(Duration::from_nanos(1));
    let tasks = vec![TestTask::sleep_success(Duration::from_millis(1))];

    let payload = catch_unwind(AssertUnwindSafe(|| executor.execute(tasks, 1)))
        .expect_err("progress reporter process panic should be propagated");

    assert_eq!(panic_payload_message(payload.as_ref()), Some(PANIC_MESSAGE));
}

#[test]
fn test_sequential_batch_executor_propagates_progress_reporter_finish_panic() {
    const PANIC_MESSAGE: &str = "progress reporter finish panic";
    let executor = SequentialBatchExecutor::new().with_reporter(PanickingProgressReporter::new(
        ProgressPanicPhase::Finish,
        PANIC_MESSAGE,
    ));
    let tasks = vec![TestTask::succeed()];

    let payload = catch_unwind(AssertUnwindSafe(|| executor.execute(tasks, 1)))
        .expect_err("progress reporter finish panic should be propagated");

    assert_eq!(panic_payload_message(payload.as_ref()), Some(PANIC_MESSAGE));
}