Skip to main content

qubit_batch/execute/
batch_execution_state.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::{
11    Mutex,
12    MutexGuard,
13};
14use std::time::Duration;
15
16use qubit_atomic::AtomicCount;
17use qubit_progress::model::ProgressCounters;
18
19use crate::{
20    BatchOutcome,
21    BatchOutcomeBuilder,
22    BatchTaskError,
23    BatchTaskFailure,
24};
25
26/// Shared state collected while a batch executor is running.
27pub struct BatchExecutionState<E> {
28    /// Declared task count.
29    task_count: usize,
30    /// Number of tasks observed from the source.
31    observed_count: AtomicCount,
32    /// Number of tasks currently running.
33    active_count: AtomicCount,
34    /// Number of tasks that reached a terminal outcome.
35    completed_count: AtomicCount,
36    /// Number of tasks that completed successfully.
37    succeeded_count: AtomicCount,
38    /// Number of tasks that returned their own errors.
39    failed_count: AtomicCount,
40    /// Number of tasks that panicked.
41    panicked_count: AtomicCount,
42    /// Detailed failures collected during execution.
43    failures: Mutex<Vec<BatchTaskFailure<E>>>,
44}
45
46impl<E> BatchExecutionState<E> {
47    /// Creates empty execution state for a declared task count.
48    ///
49    /// # Parameters
50    ///
51    /// * `task_count` - Declared number of tasks in the batch.
52    ///
53    /// # Returns
54    ///
55    /// Empty execution state.
56    #[inline]
57    pub const fn new(task_count: usize) -> Self {
58        Self {
59            task_count,
60            observed_count: AtomicCount::zero(),
61            active_count: AtomicCount::zero(),
62            completed_count: AtomicCount::zero(),
63            succeeded_count: AtomicCount::zero(),
64            failed_count: AtomicCount::zero(),
65            panicked_count: AtomicCount::zero(),
66            failures: Mutex::new(Vec::new()),
67        }
68    }
69
70    /// Records one observed task.
71    ///
72    /// # Returns
73    ///
74    /// The observed task count after this task was recorded.
75    #[inline]
76    pub fn record_task_observed(&self) -> usize {
77        self.observed_count.inc()
78    }
79
80    /// Records that one task has started.
81    #[inline]
82    pub fn record_task_started(&self) {
83        self.active_count.inc();
84    }
85
86    /// Records one successful task completion.
87    ///
88    /// # Panics
89    ///
90    /// Panics if no active task was recorded for this completion.
91    #[inline]
92    pub fn record_task_succeeded(&self) {
93        self.active_count.dec();
94        self.completed_count.inc();
95        self.succeeded_count.inc();
96    }
97
98    /// Records one task error.
99    ///
100    /// # Parameters
101    ///
102    /// * `index` - Zero-based task index.
103    /// * `error` - Task error returned by the task.
104    ///
105    /// # Panics
106    ///
107    /// Panics if no active task was recorded for this completion.
108    #[inline]
109    pub fn record_task_failed(&self, index: usize, error: E) {
110        self.active_count.dec();
111        self.completed_count.inc();
112        self.failed_count.inc();
113        Self::lock_failures(&self.failures)
114            .push(BatchTaskFailure::new(index, BatchTaskError::Failed(error)));
115    }
116
117    /// Records one task panic.
118    ///
119    /// # Parameters
120    ///
121    /// * `index` - Zero-based task index.
122    /// * `error` - Captured task panic.
123    ///
124    /// # Panics
125    ///
126    /// Panics if no active task was recorded for this completion.
127    #[inline]
128    pub fn record_task_panicked(&self, index: usize, error: BatchTaskError<E>) {
129        self.active_count.dec();
130        self.completed_count.inc();
131        self.panicked_count.inc();
132        Self::lock_failures(&self.failures).push(BatchTaskFailure::new(index, error));
133    }
134
135    /// Returns generic progress counters for this execution state.
136    ///
137    /// # Returns
138    ///
139    /// Counters suitable for progress reporting.
140    #[inline]
141    pub fn progress_counters(&self) -> ProgressCounters {
142        ProgressCounters::new(Some(self.task_count))
143            .with_active_count(self.active_count.get())
144            .with_completed_count(self.completed_count.get())
145            .with_succeeded_count(self.succeeded_count.get())
146            .with_failed_count(
147                self.failed_count
148                    .get()
149                    .saturating_add(self.panicked_count.get()),
150            )
151    }
152
153    /// Consumes this state and builds a batch outcome.
154    ///
155    /// # Parameters
156    ///
157    /// * `elapsed` - Monotonic elapsed duration.
158    ///
159    /// # Returns
160    ///
161    /// The final or partial outcome represented by this state.
162    #[inline]
163    pub fn into_outcome(self, elapsed: Duration) -> BatchOutcome<E> {
164        let failures = self
165            .failures
166            .into_inner()
167            .unwrap_or_else(std::sync::PoisonError::into_inner);
168        BatchOutcomeBuilder::builder(self.task_count)
169            .completed_count(self.completed_count.get())
170            .succeeded_count(self.succeeded_count.get())
171            .failed_count(self.failed_count.get())
172            .panicked_count(self.panicked_count.get())
173            .elapsed(elapsed)
174            .failures(failures)
175            .build()
176            .expect("batch execution state should collect consistent counters")
177    }
178
179    /// Acquires the failure list lock while tolerating poisoned locks.
180    ///
181    /// # Parameters
182    ///
183    /// * `failures` - Failure list mutex to lock.
184    ///
185    /// # Returns
186    ///
187    /// A guard for the failure list.
188    fn lock_failures(
189        failures: &Mutex<Vec<BatchTaskFailure<E>>>,
190    ) -> MutexGuard<'_, Vec<BatchTaskFailure<E>>> {
191        failures
192            .lock()
193            .unwrap_or_else(std::sync::PoisonError::into_inner)
194    }
195}