venus_core/execute/
context.rs

//! Execution context and callbacks for Venus cells.
//!
//! Provides resource management, progress reporting, and cooperative cancellation
//! during cell execution.
5
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::Arc;
8
9use crate::error::Error;
10use crate::graph::CellId;
11
/// Handle for cooperative cancellation of cell execution.
///
/// `AbortHandle` wraps an atomic flag behind an [`Arc`], so it can be cloned
/// and shared across threads. Every clone refers to the same flag: an abort
/// requested through any clone is observed by all of them.
///
/// # Examples
///
/// ```
/// use venus_core::execute::AbortHandle;
///
/// let handle = AbortHandle::new();
/// let handle_clone = handle.clone();
///
/// // Check abort status
/// assert!(!handle.is_aborted());
///
/// // Trigger abort from any clone
/// handle_clone.abort();
///
/// // All clones see the abort
/// assert!(handle.is_aborted());
/// ```
#[derive(Clone, Debug, Default)]
pub struct AbortHandle {
    /// Abort flag shared by every clone of this handle.
    aborted: Arc<AtomicBool>,
}

impl AbortHandle {
    /// Create a new abort handle whose flag is cleared.
    ///
    /// Equivalent to [`AbortHandle::default`].
    #[must_use]
    pub fn new() -> Self {
        // The derived `Default` yields a fresh `Arc<AtomicBool>` holding `false`.
        Self::default()
    }

    /// Check if abort has been requested.
    ///
    /// Cells should call this periodically during long-running operations
    /// and exit early if it returns `true`.
    #[must_use]
    pub fn is_aborted(&self) -> bool {
        // `Relaxed` keeps the flag itself atomic but establishes no ordering
        // for other memory; callers needing that must synchronize separately.
        self.aborted.load(Ordering::Relaxed)
    }

    /// Request abort of execution.
    ///
    /// This is a cooperative mechanism: nothing is interrupted by this call.
    /// Cells must check [`is_aborted`](Self::is_aborted) and honor the
    /// request by returning early.
    pub fn abort(&self) {
        self.aborted.store(true, Ordering::Relaxed);
    }

    /// Reset the abort flag.
    ///
    /// Intended to be called before starting a new execution to clear any
    /// previous abort. Because the flag is shared, the reset is visible
    /// through every clone of this handle.
    pub fn reset(&self) {
        self.aborted.store(false, Ordering::Relaxed);
    }
}
72
73/// Callback trait for execution progress reporting.
74pub trait ExecutionCallback: Send + Sync {
75    /// Called when a cell starts executing.
76    fn on_cell_started(&self, cell_id: CellId, name: &str);
77
78    /// Called when a cell completes successfully.
79    fn on_cell_completed(&self, cell_id: CellId, name: &str);
80
81    /// Called when a cell execution fails.
82    fn on_cell_error(&self, cell_id: CellId, name: &str, error: &Error);
83
84    /// Called when a parallel level starts.
85    fn on_level_started(&self, _level: usize, _cell_count: usize) {}
86
87    /// Called when a parallel level completes.
88    fn on_level_completed(&self, _level: usize) {}
89}
90
// Note: `LoggingCallback` was removed as unused dead code.
// Users can implement the `ExecutionCallback` trait directly for custom logging.
93
94/// Execution context for a running cell.
95///
96/// Provides resource management and cleanup hooks for cells that
97/// need to manage background tasks or external resources.
98pub struct CellContext {
99    /// Cell identifier
100    cell_id: CellId,
101    /// Cell name for logging
102    name: String,
103    /// Registered cleanup handlers
104    cleanup_handlers: Vec<Box<dyn FnOnce() + Send>>,
105    /// Whether the cell has been aborted
106    aborted: bool,
107}
108
109impl CellContext {
110    /// Create a new cell context.
111    pub fn new(cell_id: CellId, name: String) -> Self {
112        Self {
113            cell_id,
114            name,
115            cleanup_handlers: Vec::new(),
116            aborted: false,
117        }
118    }
119
120    /// Get the cell ID.
121    pub fn cell_id(&self) -> CellId {
122        self.cell_id
123    }
124
125    /// Get the cell name.
126    pub fn name(&self) -> &str {
127        &self.name
128    }
129
130    /// Check if execution has been aborted.
131    pub fn is_aborted(&self) -> bool {
132        self.aborted
133    }
134
135    /// Register a cleanup handler to be called when the cell is unloaded.
136    ///
137    /// Cleanup handlers are called in reverse order of registration.
138    pub fn on_cleanup(&mut self, handler: impl FnOnce() + Send + 'static) {
139        self.cleanup_handlers.push(Box::new(handler));
140    }
141
142    /// Abort cell execution.
143    ///
144    /// This sets the aborted flag and runs all cleanup handlers.
145    pub fn abort(&mut self) {
146        if !self.aborted {
147            self.aborted = true;
148            self.run_cleanup();
149        }
150    }
151
152    /// Run all cleanup handlers.
153    fn run_cleanup(&mut self) {
154        // Run handlers in reverse order
155        while let Some(handler) = self.cleanup_handlers.pop() {
156            // Catch panics to ensure all handlers run
157            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(handler));
158            if let Err(e) = result {
159                tracing::error!(
160                    "Cleanup handler for cell {:?} panicked: {:?}",
161                    self.cell_id,
162                    e
163                );
164            }
165        }
166    }
167}
168
169impl Drop for CellContext {
170    fn drop(&mut self) {
171        self.run_cleanup();
172    }
173}
174
// Note: the `SharedContext` types were removed as unused dead code.
// `ParallelExecutor` uses `LinearExecutor` internally with a `Mutex`, which is sufficient for current needs.
177
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Build the context used by every `CellContext` test: id 0, name "test".
    fn fresh_context() -> CellContext {
        CellContext::new(CellId::new(0), "test".to_string())
    }

    /// A new context reports the id and name it was built with and is not aborted.
    #[test]
    fn test_cell_context_creation() {
        let ctx = fresh_context();

        assert_eq!(ctx.cell_id().as_usize(), 0);
        assert_eq!(ctx.name(), "test");
        assert!(!ctx.is_aborted());
    }

    /// Dropping the context invokes a registered handler.
    #[test]
    fn test_cleanup_handlers() {
        let called = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&called);

        let mut ctx = fresh_context();
        ctx.on_cleanup(move || {
            flag.store(true, Ordering::SeqCst);
        });
        drop(ctx);

        // Handler should be called when context is dropped
        assert!(called.load(Ordering::SeqCst));
    }

    /// Handlers run last-registered-first.
    #[test]
    fn test_cleanup_order() {
        let order = Arc::new(std::sync::Mutex::new(Vec::new()));

        let mut ctx = fresh_context();
        for tag in 1..=2 {
            let sink = Arc::clone(&order);
            ctx.on_cleanup(move || {
                sink.lock().unwrap().push(tag);
            });
        }
        drop(ctx);

        // Handlers should be called in reverse order (LIFO)
        assert_eq!(*order.lock().unwrap(), vec![2, 1]);
    }

    /// `abort` flips the flag and runs handlers immediately.
    #[test]
    fn test_abort() {
        let called = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&called);

        let mut ctx = fresh_context();
        ctx.on_cleanup(move || {
            flag.store(true, Ordering::SeqCst);
        });
        assert!(!ctx.is_aborted());

        ctx.abort();

        assert!(ctx.is_aborted());
        assert!(called.load(Ordering::SeqCst));
    }

    /// A freshly created handle is not aborted.
    #[test]
    fn test_abort_handle_creation() {
        assert!(!AbortHandle::new().is_aborted());
    }

    /// `abort` sets the flag on the handle it is called on.
    #[test]
    fn test_abort_handle_abort() {
        let handle = AbortHandle::new();
        assert!(!handle.is_aborted());

        handle.abort();

        assert!(handle.is_aborted());
    }

    /// An abort through one clone is visible through the other.
    #[test]
    fn test_abort_handle_clone_shares_state() {
        let handle = AbortHandle::new();
        let clone = handle.clone();
        assert!(!handle.is_aborted());
        assert!(!clone.is_aborted());

        clone.abort();

        assert!(handle.is_aborted());
        assert!(clone.is_aborted());
    }

    /// `reset` clears a previously requested abort.
    #[test]
    fn test_abort_handle_reset() {
        let handle = AbortHandle::new();
        handle.abort();
        assert!(handle.is_aborted());

        handle.reset();

        assert!(!handle.is_aborted());
    }

    /// The `Default` handle starts out not aborted.
    #[test]
    fn test_abort_handle_default() {
        let handle = AbortHandle::default();
        assert!(!handle.is_aborted());
    }
}