qubit-batch 0.4.5

One-shot batch execution and processing with sequential and scoped parallel utilities
Documentation
/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
//! Tests for [`SequentialBatchExecutor`](qubit_batch::SequentialBatchExecutor).

use std::{
    panic::{
        AssertUnwindSafe,
        catch_unwind,
    },
    sync::Arc,
    time::Duration,
};

use qubit_atomic::ArcAtomicCount;
use qubit_batch::{
    BatchExecutionError,
    BatchExecutor,
    SequentialBatchExecutor,
};
use qubit_function::Runnable;

use crate::support::{
    PanickingProgressReporter,
    ProgressEvent,
    ProgressPanicPhase,
    RecordingProgressReporter,
    TestTask,
    panic_payload_message,
};

#[test]
fn test_sequential_batch_executor_executes_successfully() {
    let executor = SequentialBatchExecutor::new();
    let counter = ArcAtomicCount::zero();
    let tasks = vec![
        TestTask::count_success(counter.clone()),
        TestTask::count_success(counter.clone()),
        TestTask::count_success(counter.clone()),
    ];

    let result = executor
        .execute(tasks, 3)
        .expect("sequential batch should succeed");

    assert_eq!(counter.get(), 3);
    assert_eq!(result.completed_count(), 3);
    assert_eq!(result.succeeded_count(), 3);
    assert_eq!(result.failure_count(), 0);
}

#[test]
fn test_sequential_batch_executor_accepts_non_debug_errors() {
    let executor = SequentialBatchExecutor::new();
    let result = executor.execute([NonDebugTask], 1);

    match result {
        Ok(outcome) => assert!(outcome.is_success()),
        Err(_) => panic!("non-debug task error type should be accepted"),
    }
}

#[test]
fn test_sequential_batch_executor_accessors_and_value_reporter() {
    let executor = SequentialBatchExecutor::new()
        .with_reporter(RecordingProgressReporter::new())
        .with_report_interval(Duration::from_millis(25));

    assert_eq!(executor.report_interval(), Duration::from_millis(25));
    assert!(Arc::strong_count(executor.reporter()) >= 1);
}

#[test]
fn test_sequential_batch_executor_collects_failures_and_panics() {
    let executor = SequentialBatchExecutor::new();
    let tasks = vec![
        TestTask::succeed(),
        TestTask::fail("failed"),
        TestTask::panic("panic in sequential batch"),
    ];

    let result = executor
        .execute(tasks, 3)
        .expect("task failures should stay in the batch result");

    assert_eq!(result.completed_count(), 3);
    assert_eq!(result.succeeded_count(), 1);
    assert_eq!(result.failed_count(), 1);
    assert_eq!(result.panicked_count(), 1);
    assert_eq!(result.failures().len(), 2);
    assert_eq!(result.failures()[0].index(), 1);
    assert_eq!(result.failures()[1].index(), 2);
    assert_eq!(
        result.failures()[1].error().panic_message(),
        Some("panic in sequential batch")
    );
}

#[test]
fn test_sequential_batch_executor_records_non_string_panic_without_message() {
    let executor = SequentialBatchExecutor::new();
    let tasks = vec![TestTask::panic_usize(7)];

    let result = executor
        .execute(tasks, 1)
        .expect("task panic should stay in the batch result");

    assert_eq!(result.completed_count(), 1);
    assert_eq!(result.panicked_count(), 1);
    assert_eq!(result.failures()[0].error().panic_message(), None);
}

#[test]
fn test_sequential_batch_executor_reports_count_shortfall() {
    let executor = SequentialBatchExecutor::new();
    let tasks = vec![TestTask::succeed(), TestTask::succeed()];

    let error = executor
        .execute(tasks, 3)
        .expect_err("shortfall should be reported");

    match error {
        BatchExecutionError::CountShortfall {
            expected,
            actual,
            outcome,
        } => {
            assert_eq!(expected, 3);
            assert_eq!(actual, 2);
            assert_eq!(outcome.completed_count(), 2);
        }
        other => panic!("unexpected error: {other:?}"),
    }
}

#[test]
fn test_sequential_batch_executor_reports_count_exceeded() {
    let executor = SequentialBatchExecutor::new();
    let tasks = vec![TestTask::succeed(), TestTask::succeed()];

    let error = executor
        .execute(tasks, 1)
        .expect_err("overflow should be reported");

    match error {
        BatchExecutionError::CountExceeded {
            expected,
            observed_at_least,
            outcome,
        } => {
            assert_eq!(expected, 1);
            assert_eq!(observed_at_least, 2);
            assert_eq!(outcome.completed_count(), 1);
        }
        other => panic!("unexpected error: {other:?}"),
    }
}

#[test]
fn test_sequential_batch_executor_reports_progress() {
    let reporter = Arc::new(RecordingProgressReporter::new());
    let executor = SequentialBatchExecutor::new()
        .with_reporter_arc(reporter.clone())
        .with_report_interval(Duration::from_millis(10));
    let tasks = vec![
        TestTask::sleep_success(Duration::from_millis(20)),
        TestTask::sleep_success(Duration::from_millis(20)),
        TestTask::sleep_success(Duration::from_millis(20)),
    ];

    let result = executor
        .execute(tasks, 3)
        .expect("sequential batch should succeed");
    let events = reporter.events();

    assert_eq!(result.completed_count(), 3);
    assert!(matches!(
        events.first(),
        Some(ProgressEvent::Start { total_count: 3 })
    ));
    assert!(events.iter().any(|event| matches!(
        event,
        ProgressEvent::Process {
            total_count: 3,
            active_count: 0,
            completed_count,
            ..
        } if *completed_count >= 1
    )));
    assert!(matches!(
        events.last(),
        Some(ProgressEvent::Finish { total_count: 3, .. })
    ));
}

#[test]
fn test_sequential_batch_executor_reports_progress_with_zero_interval() {
    let reporter = Arc::new(RecordingProgressReporter::new());
    let executor = SequentialBatchExecutor::new()
        .with_reporter_arc(reporter.clone())
        .with_report_interval(Duration::ZERO);
    let tasks = vec![TestTask::succeed(), TestTask::succeed()];

    let result = executor
        .execute(tasks, 2)
        .expect("sequential batch should succeed");
    let events = reporter.events();

    assert_eq!(result.completed_count(), 2);
    assert!(events.iter().any(|event| matches!(
        event,
        ProgressEvent::Process {
            total_count: 2,
            active_count: 0,
            completed_count,
            ..
        } if *completed_count >= 1
    )));
}

#[test]
fn test_sequential_batch_executor_propagates_progress_reporter_start_panic() {
    const PANIC_MESSAGE: &str = "progress reporter start panic";
    let executor = SequentialBatchExecutor::new().with_reporter(PanickingProgressReporter::new(
        ProgressPanicPhase::Start,
        PANIC_MESSAGE,
    ));
    let tasks = vec![TestTask::succeed()];

    let payload = catch_unwind(AssertUnwindSafe(|| executor.execute(tasks, 1)))
        .expect_err("progress reporter start panic should be propagated");

    assert_eq!(panic_payload_message(payload.as_ref()), Some(PANIC_MESSAGE));
}

#[test]
fn test_sequential_batch_executor_propagates_progress_reporter_process_panic() {
    const PANIC_MESSAGE: &str = "progress reporter process panic";
    let executor = SequentialBatchExecutor::new()
        .with_reporter(PanickingProgressReporter::new(
            ProgressPanicPhase::Process,
            PANIC_MESSAGE,
        ))
        .with_report_interval(Duration::from_nanos(1));
    let tasks = vec![TestTask::sleep_success(Duration::from_millis(1))];

    let payload = catch_unwind(AssertUnwindSafe(|| executor.execute(tasks, 1)))
        .expect_err("progress reporter process panic should be propagated");

    assert_eq!(panic_payload_message(payload.as_ref()), Some(PANIC_MESSAGE));
}

#[test]
fn test_sequential_batch_executor_propagates_progress_reporter_finish_panic() {
    const PANIC_MESSAGE: &str = "progress reporter finish panic";
    let executor = SequentialBatchExecutor::new().with_reporter(PanickingProgressReporter::new(
        ProgressPanicPhase::Finish,
        PANIC_MESSAGE,
    ));
    let tasks = vec![TestTask::succeed()];

    let payload = catch_unwind(AssertUnwindSafe(|| executor.execute(tasks, 1)))
        .expect_err("progress reporter finish panic should be propagated");

    assert_eq!(panic_payload_message(payload.as_ref()), Some(PANIC_MESSAGE));
}

/// Error type that intentionally does not implement [`std::fmt::Debug`].
struct NonDebugError;

/// Runnable task used to prove executor APIs do not require debug errors.
struct NonDebugTask;

impl Runnable<NonDebugError> for NonDebugTask {
    /// Runs successfully without constructing the non-debug error.
    ///
    /// # Returns
    ///
    /// Always returns `Ok(())`.
    fn run(&mut self) -> Result<(), NonDebugError> {
        Ok(())
    }
}