Skip to main content

qubit_batch/execute/impls/
sequential_batch_executor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    panic::{
12        AssertUnwindSafe,
13        catch_unwind,
14    },
15    sync::Arc,
16    time::Duration,
17};
18
19use qubit_function::Runnable;
20use qubit_progress::{
21    Progress,
22    reporter::ProgressReporter,
23};
24
25use crate::{
26    BatchExecutionError,
27    BatchOutcome,
28    execute::{
29        BatchExecutionState,
30        BatchExecutor,
31        panic_payload_to_error,
32    },
33};
34
35use super::SequentialBatchExecutorBuilder;
36
37/// Executes a whole batch sequentially on the caller thread.
38///
39/// Progress updates are emitted only between tasks. A long-running single task
40/// therefore does not produce intermediate sequential progress callbacks.
41///
42/// ```rust
43/// use qubit_batch::{
44///     BatchExecutor,
45///     SequentialBatchExecutor,
46/// };
47///
48/// let outcome = SequentialBatchExecutor::new()
49///     .for_each(["a", "b", "c"], |item| {
50///         assert!(!item.is_empty());
51///         Ok::<(), &'static str>(())
52///     })
53///     .expect("array length should be exact");
54///
55/// assert!(outcome.is_success());
56/// ```
57#[derive(Clone)]
58pub struct SequentialBatchExecutor {
59    /// Interval between progress callbacks while the batch is running.
60    pub(crate) report_interval: Duration,
61    /// Reporter receiving batch lifecycle callbacks.
62    pub(crate) reporter: Arc<dyn ProgressReporter>,
63}
64
65impl SequentialBatchExecutor {
66    /// Default interval between progress callbacks.
67    pub const DEFAULT_REPORT_INTERVAL: Duration = Duration::from_secs(5);
68
69    /// Creates a sequential batch executor with default configuration.
70    ///
71    /// # Returns
72    ///
73    /// A sequential batch executor using no-op progress reporting.
74    #[inline]
75    pub fn new() -> Self {
76        Self::default()
77    }
78
79    /// Creates a builder for configuring a sequential batch executor.
80    ///
81    /// # Returns
82    ///
83    /// A builder initialized with default settings.
84    #[inline]
85    pub fn builder() -> SequentialBatchExecutorBuilder {
86        SequentialBatchExecutorBuilder::default()
87    }
88
89    /// Returns the configured progress-report interval.
90    ///
91    /// # Returns
92    ///
93    /// The minimum time between due-based running progress callbacks.
94    #[inline]
95    pub const fn report_interval(&self) -> Duration {
96        self.report_interval
97    }
98
99    /// Returns the progress reporter used by this executor.
100    ///
101    /// # Returns
102    ///
103    /// A shared reference to the configured progress reporter.
104    #[inline]
105    pub fn reporter(&self) -> &Arc<dyn ProgressReporter> {
106        &self.reporter
107    }
108}
109
110impl Default for SequentialBatchExecutor {
111    /// Creates a sequential batch executor with default configuration.
112    ///
113    /// # Returns
114    ///
115    /// A sequential batch executor using no-op progress reporting.
116    #[inline]
117    fn default() -> Self {
118        Self::builder().build()
119    }
120}
121
122impl BatchExecutor for SequentialBatchExecutor {
123    /// Executes the batch sequentially on the caller thread.
124    ///
125    /// # Parameters
126    ///
127    /// * `tasks` - Task source for the batch.
128    /// * `count` - Declared task count expected from `tasks`.
129    ///
130    /// # Returns
131    ///
132    /// A structured batch result when the declared task count matches, or a
133    /// batch-count mismatch error with the attached partial result.
134    ///
135    /// # Errors
136    ///
137    /// Returns [`BatchExecutionError`] when `tasks` yields fewer or more tasks
138    /// than `count`.
139    ///
140    /// # Panics
141    ///
142    /// Panics from tasks are captured in the result. Panics from the configured
143    /// progress reporter are propagated to the caller.
144    fn execute_with_count<T, E, I>(
145        &self,
146        tasks: I,
147        count: usize,
148    ) -> Result<BatchOutcome<E>, BatchExecutionError<E>>
149    where
150        I: IntoIterator<Item = T>,
151        T: Runnable<E> + Send,
152        E: Send,
153    {
154        let state = BatchExecutionState::new(count);
155        let mut progress = Progress::new(self.reporter.as_ref(), self.report_interval);
156        progress.report_started(state.progress_counters());
157        let mut actual_count = 0;
158        for task in tasks {
159            actual_count = state.record_task_observed();
160            if actual_count > count {
161                let failed = progress.report_failed(state.progress_counters());
162                let outcome = state.into_outcome(failed.elapsed());
163                return Err(BatchExecutionError::CountExceeded {
164                    expected: count,
165                    observed_at_least: actual_count,
166                    outcome,
167                });
168            }
169            // Execute the task and update the state.
170            let mut task = task;
171            state.record_task_started();
172            match catch_unwind(AssertUnwindSafe(|| task.run())) {
173                Ok(Ok(())) => state.record_task_succeeded(),
174                Ok(Err(error)) => state.record_task_failed(actual_count - 1, error),
175                Err(payload) => state.record_task_panicked(
176                    actual_count - 1,
177                    panic_payload_to_error(payload.as_ref()),
178                ),
179            }
180            // Update the actual task count and report progress if due.
181            let _ = progress.report_running_if_due(state.progress_counters());
182        }
183
184        if actual_count < count {
185            let failed = progress.report_failed(state.progress_counters());
186            Err(BatchExecutionError::CountShortfall {
187                expected: count,
188                actual: actual_count,
189                outcome: state.into_outcome(failed.elapsed()),
190            })
191        } else {
192            let finished = progress.report_finished(state.progress_counters());
193            Ok(state.into_outcome(finished.elapsed()))
194        }
195    }
196}