qubit_batch/execute/impls/sequential_batch_executor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 panic::{
12 AssertUnwindSafe,
13 catch_unwind,
14 },
15 sync::Arc,
16 time::Duration,
17};
18
19use qubit_function::Runnable;
20use qubit_progress::{
21 Progress,
22 reporter::ProgressReporter,
23};
24
25use crate::{
26 BatchExecutionError,
27 BatchOutcome,
28 execute::{
29 BatchExecutionState,
30 BatchExecutor,
31 panic_payload_to_error,
32 },
33};
34
35use super::SequentialBatchExecutorBuilder;
36
37/// Executes a whole batch sequentially on the caller thread.
38///
39/// Progress updates are emitted only between tasks. A long-running single task
40/// therefore does not produce intermediate sequential progress callbacks.
41///
42/// ```rust
43/// use qubit_batch::{
44/// BatchExecutor,
45/// SequentialBatchExecutor,
46/// };
47///
48/// let outcome = SequentialBatchExecutor::new()
49/// .for_each(["a", "b", "c"], |item| {
50/// assert!(!item.is_empty());
51/// Ok::<(), &'static str>(())
52/// })
53/// .expect("array length should be exact");
54///
55/// assert!(outcome.is_success());
56/// ```
57#[derive(Clone)]
58pub struct SequentialBatchExecutor {
59 /// Interval between progress callbacks while the batch is running.
60 pub(crate) report_interval: Duration,
61 /// Reporter receiving batch lifecycle callbacks.
62 pub(crate) reporter: Arc<dyn ProgressReporter>,
63}
64
65impl SequentialBatchExecutor {
66 /// Default interval between progress callbacks.
67 pub const DEFAULT_REPORT_INTERVAL: Duration = Duration::from_secs(5);
68
69 /// Creates a sequential batch executor with default configuration.
70 ///
71 /// # Returns
72 ///
73 /// A sequential batch executor using no-op progress reporting.
74 #[inline]
75 pub fn new() -> Self {
76 Self::default()
77 }
78
79 /// Creates a builder for configuring a sequential batch executor.
80 ///
81 /// # Returns
82 ///
83 /// A builder initialized with default settings.
84 #[inline]
85 pub fn builder() -> SequentialBatchExecutorBuilder {
86 SequentialBatchExecutorBuilder::default()
87 }
88
89 /// Returns the configured progress-report interval.
90 ///
91 /// # Returns
92 ///
93 /// The minimum time between due-based running progress callbacks.
94 #[inline]
95 pub const fn report_interval(&self) -> Duration {
96 self.report_interval
97 }
98
99 /// Returns the progress reporter used by this executor.
100 ///
101 /// # Returns
102 ///
103 /// A shared reference to the configured progress reporter.
104 #[inline]
105 pub fn reporter(&self) -> &Arc<dyn ProgressReporter> {
106 &self.reporter
107 }
108}
109
110impl Default for SequentialBatchExecutor {
111 /// Creates a sequential batch executor with default configuration.
112 ///
113 /// # Returns
114 ///
115 /// A sequential batch executor using no-op progress reporting.
116 #[inline]
117 fn default() -> Self {
118 Self::builder().build()
119 }
120}
121
122impl BatchExecutor for SequentialBatchExecutor {
123 /// Executes the batch sequentially on the caller thread.
124 ///
125 /// # Parameters
126 ///
127 /// * `tasks` - Task source for the batch.
128 /// * `count` - Declared task count expected from `tasks`.
129 ///
130 /// # Returns
131 ///
132 /// A structured batch result when the declared task count matches, or a
133 /// batch-count mismatch error with the attached partial result.
134 ///
135 /// # Errors
136 ///
137 /// Returns [`BatchExecutionError`] when `tasks` yields fewer or more tasks
138 /// than `count`.
139 ///
140 /// # Panics
141 ///
142 /// Panics from tasks are captured in the result. Panics from the configured
143 /// progress reporter are propagated to the caller.
144 fn execute_with_count<T, E, I>(
145 &self,
146 tasks: I,
147 count: usize,
148 ) -> Result<BatchOutcome<E>, BatchExecutionError<E>>
149 where
150 I: IntoIterator<Item = T>,
151 T: Runnable<E> + Send,
152 E: Send,
153 {
154 let state = BatchExecutionState::new(count);
155 let mut progress = Progress::new(self.reporter.as_ref(), self.report_interval);
156 progress.report_started(state.progress_counters());
157 let mut actual_count = 0;
158 for task in tasks {
159 actual_count = state.record_task_observed();
160 if actual_count > count {
161 let failed = progress.report_failed(state.progress_counters());
162 let outcome = state.into_outcome(failed.elapsed());
163 return Err(BatchExecutionError::CountExceeded {
164 expected: count,
165 observed_at_least: actual_count,
166 outcome,
167 });
168 }
169 // Execute the task and update the state.
170 let mut task = task;
171 state.record_task_started();
172 match catch_unwind(AssertUnwindSafe(|| task.run())) {
173 Ok(Ok(())) => state.record_task_succeeded(),
174 Ok(Err(error)) => state.record_task_failed(actual_count - 1, error),
175 Err(payload) => state.record_task_panicked(
176 actual_count - 1,
177 panic_payload_to_error(payload.as_ref()),
178 ),
179 }
180 // Update the actual task count and report progress if due.
181 let _ = progress.report_running_if_due(state.progress_counters());
182 }
183
184 if actual_count < count {
185 let failed = progress.report_failed(state.progress_counters());
186 Err(BatchExecutionError::CountShortfall {
187 expected: count,
188 actual: actual_count,
189 outcome: state.into_outcome(failed.elapsed()),
190 })
191 } else {
192 let finished = progress.report_finished(state.progress_counters());
193 Ok(state.into_outcome(finished.elapsed()))
194 }
195 }
196}