venus_core/execute/
context.rs1use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::Arc;
8
9use crate::error::Error;
10use crate::graph::CellId;
11
/// Cloneable abort flag.
///
/// The flag lives in an `Arc<AtomicBool>`, so every clone of a handle reads
/// and writes the same underlying boolean. `Default` yields a cleared flag.
#[derive(Clone, Debug, Default)]
pub struct AbortHandle {
    /// Shared flag; `true` once an abort has been requested.
    aborted: Arc<AtomicBool>,
}
40
41impl AbortHandle {
42 pub fn new() -> Self {
44 Self {
45 aborted: Arc::new(AtomicBool::new(false)),
46 }
47 }
48
49 pub fn is_aborted(&self) -> bool {
54 self.aborted.load(Ordering::Relaxed)
55 }
56
57 pub fn abort(&self) {
62 self.aborted.store(true, Ordering::Relaxed);
63 }
64
65 pub fn reset(&self) {
69 self.aborted.store(false, Ordering::Relaxed);
70 }
71}
72
73pub trait ExecutionCallback: Send + Sync {
75 fn on_cell_started(&self, cell_id: CellId, name: &str);
77
78 fn on_cell_completed(&self, cell_id: CellId, name: &str);
80
81 fn on_cell_error(&self, cell_id: CellId, name: &str, error: &Error);
83
84 fn on_level_started(&self, _level: usize, _cell_count: usize) {}
86
87 fn on_level_completed(&self, _level: usize) {}
89}
90
91pub struct CellContext {
99 cell_id: CellId,
101 name: String,
103 cleanup_handlers: Vec<Box<dyn FnOnce() + Send>>,
105 aborted: bool,
107}
108
109impl CellContext {
110 pub fn new(cell_id: CellId, name: String) -> Self {
112 Self {
113 cell_id,
114 name,
115 cleanup_handlers: Vec::new(),
116 aborted: false,
117 }
118 }
119
120 pub fn cell_id(&self) -> CellId {
122 self.cell_id
123 }
124
125 pub fn name(&self) -> &str {
127 &self.name
128 }
129
130 pub fn is_aborted(&self) -> bool {
132 self.aborted
133 }
134
135 pub fn on_cleanup(&mut self, handler: impl FnOnce() + Send + 'static) {
139 self.cleanup_handlers.push(Box::new(handler));
140 }
141
142 pub fn abort(&mut self) {
146 if !self.aborted {
147 self.aborted = true;
148 self.run_cleanup();
149 }
150 }
151
152 fn run_cleanup(&mut self) {
154 while let Some(handler) = self.cleanup_handlers.pop() {
156 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(handler));
158 if let Err(e) = result {
159 tracing::error!(
160 "Cleanup handler for cell {:?} panicked: {:?}",
161 self.cell_id,
162 e
163 );
164 }
165 }
166 }
167}
168
169impl Drop for CellContext {
170 fn drop(&mut self) {
171 self.run_cleanup();
172 }
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    /// A fresh context reports the id and name it was built with and is not aborted.
    #[test]
    fn test_cell_context_creation() {
        let ctx = CellContext::new(CellId::new(0), "test".to_string());
        assert_eq!(ctx.cell_id().as_usize(), 0);
        assert_eq!(ctx.name(), "test");
        assert!(!ctx.is_aborted());
    }

    /// Dropping a context runs its registered handler.
    #[test]
    fn test_cleanup_handlers() {
        let called = Arc::new(AtomicBool::new(false));
        let called_clone = called.clone();

        {
            let mut ctx = CellContext::new(CellId::new(0), "test".to_string());
            ctx.on_cleanup(move || {
                called_clone.store(true, Ordering::SeqCst);
            });
        }

        assert!(called.load(Ordering::SeqCst));
    }

    /// Handlers run in reverse registration order.
    #[test]
    fn test_cleanup_order() {
        let order = Arc::new(std::sync::Mutex::new(Vec::new()));
        let order1 = order.clone();
        let order2 = order.clone();

        {
            let mut ctx = CellContext::new(CellId::new(0), "test".to_string());
            ctx.on_cleanup(move || {
                order1.lock().unwrap().push(1);
            });
            ctx.on_cleanup(move || {
                order2.lock().unwrap().push(2);
            });
        }

        assert_eq!(*order.lock().unwrap(), vec![2, 1]);
    }

    /// `abort` sets the flag and runs handlers immediately.
    #[test]
    fn test_abort() {
        let called = Arc::new(AtomicBool::new(false));
        let called_clone = called.clone();

        let mut ctx = CellContext::new(CellId::new(0), "test".to_string());
        ctx.on_cleanup(move || {
            called_clone.store(true, Ordering::SeqCst);
        });

        assert!(!ctx.is_aborted());
        ctx.abort();
        assert!(ctx.is_aborted());
        assert!(called.load(Ordering::SeqCst));
    }

    /// Repeated aborts followed by drop must not run a handler more than once.
    #[test]
    fn test_abort_then_drop_runs_cleanup_once() {
        let runs = Arc::new(AtomicUsize::new(0));
        let runs_clone = runs.clone();

        {
            let mut ctx = CellContext::new(CellId::new(0), "test".to_string());
            ctx.on_cleanup(move || {
                runs_clone.fetch_add(1, Ordering::SeqCst);
            });
            ctx.abort();
            ctx.abort();
        }

        assert_eq!(runs.load(Ordering::SeqCst), 1);
    }

    /// A panicking handler must not prevent earlier-registered handlers from running.
    #[test]
    fn test_cleanup_continues_after_panicking_handler() {
        let called = Arc::new(AtomicBool::new(false));
        let called_clone = called.clone();

        {
            let mut ctx = CellContext::new(CellId::new(0), "test".to_string());
            ctx.on_cleanup(move || {
                called_clone.store(true, Ordering::SeqCst);
            });
            // Registered last, so it runs first and panics before the handler above.
            ctx.on_cleanup(|| {
                panic!("cleanup handler failure");
            });
        }

        assert!(called.load(Ordering::SeqCst));
    }

    /// A new handle starts with the flag cleared.
    #[test]
    fn test_abort_handle_creation() {
        let handle = AbortHandle::new();
        assert!(!handle.is_aborted());
    }

    /// `abort` sets the flag.
    #[test]
    fn test_abort_handle_abort() {
        let handle = AbortHandle::new();
        assert!(!handle.is_aborted());

        handle.abort();
        assert!(handle.is_aborted());
    }

    /// Clones observe an abort requested through any other clone.
    #[test]
    fn test_abort_handle_clone_shares_state() {
        let handle = AbortHandle::new();
        let clone = handle.clone();

        assert!(!handle.is_aborted());
        assert!(!clone.is_aborted());

        clone.abort();

        assert!(handle.is_aborted());
        assert!(clone.is_aborted());
    }

    /// `reset` clears a previously set flag.
    #[test]
    fn test_abort_handle_reset() {
        let handle = AbortHandle::new();
        handle.abort();
        assert!(handle.is_aborted());

        handle.reset();
        assert!(!handle.is_aborted());
    }

    /// `Default` produces a handle with the flag cleared.
    #[test]
    fn test_abort_handle_default() {
        let handle = AbortHandle::default();
        assert!(!handle.is_aborted());
    }
}