qubit_batch/execute/batch_execution_state.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::sync::{
11 Mutex,
12 MutexGuard,
13};
14use std::time::Duration;
15
16use qubit_atomic::AtomicCount;
17use qubit_progress::model::ProgressCounters;
18
19use crate::{
20 BatchOutcome,
21 BatchOutcomeBuilder,
22 BatchTaskError,
23 BatchTaskFailure,
24};
25
26/// Shared state collected while a batch executor is running.
27pub struct BatchExecutionState<E> {
28 /// Declared task count.
29 task_count: usize,
30 /// Number of tasks observed from the source.
31 observed_count: AtomicCount,
32 /// Number of tasks currently running.
33 active_count: AtomicCount,
34 /// Number of tasks that reached a terminal outcome.
35 completed_count: AtomicCount,
36 /// Number of tasks that completed successfully.
37 succeeded_count: AtomicCount,
38 /// Number of tasks that returned their own errors.
39 failed_count: AtomicCount,
40 /// Number of tasks that panicked.
41 panicked_count: AtomicCount,
42 /// Detailed failures collected during execution.
43 failures: Mutex<Vec<BatchTaskFailure<E>>>,
44}
45
46impl<E> BatchExecutionState<E> {
47 /// Creates empty execution state for a declared task count.
48 ///
49 /// # Parameters
50 ///
51 /// * `task_count` - Declared number of tasks in the batch.
52 ///
53 /// # Returns
54 ///
55 /// Empty execution state.
56 #[inline]
57 pub const fn new(task_count: usize) -> Self {
58 Self {
59 task_count,
60 observed_count: AtomicCount::zero(),
61 active_count: AtomicCount::zero(),
62 completed_count: AtomicCount::zero(),
63 succeeded_count: AtomicCount::zero(),
64 failed_count: AtomicCount::zero(),
65 panicked_count: AtomicCount::zero(),
66 failures: Mutex::new(Vec::new()),
67 }
68 }
69
70 /// Records one observed task.
71 ///
72 /// # Returns
73 ///
74 /// The observed task count after this task was recorded.
75 #[inline]
76 pub fn record_task_observed(&self) -> usize {
77 self.observed_count.inc()
78 }
79
80 /// Records that one task has started.
81 #[inline]
82 pub fn record_task_started(&self) {
83 self.active_count.inc();
84 }
85
86 /// Records one successful task completion.
87 ///
88 /// # Panics
89 ///
90 /// Panics if no active task was recorded for this completion.
91 #[inline]
92 pub fn record_task_succeeded(&self) {
93 self.active_count.dec();
94 self.completed_count.inc();
95 self.succeeded_count.inc();
96 }
97
98 /// Records one task error.
99 ///
100 /// # Parameters
101 ///
102 /// * `index` - Zero-based task index.
103 /// * `error` - Task error returned by the task.
104 ///
105 /// # Panics
106 ///
107 /// Panics if no active task was recorded for this completion.
108 #[inline]
109 pub fn record_task_failed(&self, index: usize, error: E) {
110 self.active_count.dec();
111 self.completed_count.inc();
112 self.failed_count.inc();
113 Self::lock_failures(&self.failures)
114 .push(BatchTaskFailure::new(index, BatchTaskError::Failed(error)));
115 }
116
117 /// Records one task panic.
118 ///
119 /// # Parameters
120 ///
121 /// * `index` - Zero-based task index.
122 /// * `error` - Captured task panic.
123 ///
124 /// # Panics
125 ///
126 /// Panics if no active task was recorded for this completion.
127 #[inline]
128 pub fn record_task_panicked(&self, index: usize, error: BatchTaskError<E>) {
129 self.active_count.dec();
130 self.completed_count.inc();
131 self.panicked_count.inc();
132 Self::lock_failures(&self.failures).push(BatchTaskFailure::new(index, error));
133 }
134
135 /// Returns generic progress counters for this execution state.
136 ///
137 /// # Returns
138 ///
139 /// Counters suitable for progress reporting.
140 #[inline]
141 pub fn progress_counters(&self) -> ProgressCounters {
142 ProgressCounters::new(Some(self.task_count))
143 .with_active_count(self.active_count.get())
144 .with_completed_count(self.completed_count.get())
145 .with_succeeded_count(self.succeeded_count.get())
146 .with_failed_count(
147 self.failed_count
148 .get()
149 .saturating_add(self.panicked_count.get()),
150 )
151 }
152
153 /// Consumes this state and builds a batch outcome.
154 ///
155 /// # Parameters
156 ///
157 /// * `elapsed` - Monotonic elapsed duration.
158 ///
159 /// # Returns
160 ///
161 /// The final or partial outcome represented by this state.
162 #[inline]
163 pub fn into_outcome(self, elapsed: Duration) -> BatchOutcome<E> {
164 let failures = self
165 .failures
166 .into_inner()
167 .unwrap_or_else(std::sync::PoisonError::into_inner);
168 BatchOutcomeBuilder::builder(self.task_count)
169 .completed_count(self.completed_count.get())
170 .succeeded_count(self.succeeded_count.get())
171 .failed_count(self.failed_count.get())
172 .panicked_count(self.panicked_count.get())
173 .elapsed(elapsed)
174 .failures(failures)
175 .build()
176 .expect("batch execution state should collect consistent counters")
177 }
178
179 /// Acquires the failure list lock while tolerating poisoned locks.
180 ///
181 /// # Parameters
182 ///
183 /// * `failures` - Failure list mutex to lock.
184 ///
185 /// # Returns
186 ///
187 /// A guard for the failure list.
188 fn lock_failures(
189 failures: &Mutex<Vec<BatchTaskFailure<E>>>,
190 ) -> MutexGuard<'_, Vec<BatchTaskFailure<E>>> {
191 failures
192 .lock()
193 .unwrap_or_else(std::sync::PoisonError::into_inner)
194 }
195}