crush_core/cancel.rs
1//! Cancellation support for compression operations
2//!
3//! This module provides thread-safe cancellation mechanisms for gracefully
4//! stopping compression/decompression operations in response to user signals
5//! (e.g., Ctrl+C) or programmatic cancellation requests.
6
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Mutex;
10
11/// Thread-safe cancellation signal for compression operations.
12///
13/// This trait provides a lock-free way to signal cancellation to worker threads
14/// and check cancellation state without blocking. Implementations must be
15/// async-signal-safe (can be called from signal handlers).
16///
17/// # Thread Safety
18///
19/// All methods are safe to call concurrently from multiple threads.
20/// Implementations must use atomic operations to ensure lock-freedom.
21///
22/// # Example
23///
24/// ```
25/// use std::sync::Arc;
26/// use crush_core::cancel::{CancellationToken, AtomicCancellationToken};
27///
28/// let token = Arc::new(AtomicCancellationToken::new());
29/// let token_worker = Arc::clone(&token);
30///
31/// // Worker thread checks cancellation
32/// std::thread::spawn(move || {
33/// while !token_worker.is_cancelled() {
34/// // Do work...
35/// }
36/// });
37///
38/// // Signal handler or main thread cancels
39/// token.cancel();
40/// ```
41pub trait CancellationToken: Send + Sync {
42 /// Check if cancellation has been requested.
43 ///
44 /// This method is lock-free, async-signal-safe, and very fast (<10ns).
45 /// It can be called from signal handlers and hot loops without concern.
46 ///
47 /// # Returns
48 ///
49 /// `true` if cancellation has been requested, `false` otherwise.
50 fn is_cancelled(&self) -> bool;
51
52 /// Request cancellation.
53 ///
54 /// This method is idempotent - calling it multiple times has the same
55 /// effect as calling it once. Safe to call from signal handlers.
56 ///
57 /// # Thread Safety
58 ///
59 /// Multiple threads can call this concurrently. The first call to set
60 /// the flag "wins", but all calls succeed.
61 fn cancel(&self);
62
63 /// Reset the cancellation state to not-cancelled.
64 ///
65 /// This allows reusing the same token for multiple sequential operations.
66 /// NOT async-signal-safe - do not call from signal handlers.
67 fn reset(&self);
68}
69
70/// Standard implementation of `CancellationToken` using `AtomicBool`.
71///
72/// This is the recommended implementation for most use cases.
73pub struct AtomicCancellationToken {
74 cancelled: AtomicBool,
75}
76
77impl AtomicCancellationToken {
78 /// Create a new cancellation token in the not-cancelled state.
79 #[must_use]
80 pub fn new() -> Self {
81 Self {
82 cancelled: AtomicBool::new(false),
83 }
84 }
85}
86
87impl Default for AtomicCancellationToken {
88 fn default() -> Self {
89 Self::new()
90 }
91}
92
93impl CancellationToken for AtomicCancellationToken {
94 fn is_cancelled(&self) -> bool {
95 self.cancelled.load(Ordering::SeqCst)
96 }
97
98 fn cancel(&self) {
99 self.cancelled.store(true, Ordering::SeqCst);
100 }
101
102 fn reset(&self) {
103 self.cancelled.store(false, Ordering::SeqCst);
104 }
105}
106
107/// Tracks resources created during compression/decompression for guaranteed cleanup.
108///
109/// Uses RAII pattern - resources are automatically cleaned up when dropped unless
110/// marked as complete. This ensures incomplete output files are deleted if an
111/// operation is cancelled or fails.
112///
113/// # Thread Safety
114///
115/// All methods use interior mutability and are safe to call from multiple threads.
116///
117/// # Example
118///
119/// ```no_run
120/// use crush_core::cancel::ResourceTracker;
121/// use std::path::PathBuf;
122///
123/// let tracker = ResourceTracker::new();
124///
125/// // Register the output file to be cleaned up if operation fails
126/// tracker.register_output(PathBuf::from("output.crush"));
127///
128/// // Register temporary files that should always be deleted
129/// tracker.register_temp_file(PathBuf::from("temp.dat"));
130///
131/// // ... do compression work ...
132///
133/// // If successful, mark complete to keep the output file
134/// tracker.mark_complete();
135///
136/// // Drop will clean up temp files but keep output (marked complete)
137/// ```
138///
139/// # Cleanup Behavior
140///
141/// - **Temp files**: Always deleted on drop
142/// - **Output file**: Deleted on drop UNLESS `mark_complete()` was called
143/// - **On panic**: Drop runs, ensuring cleanup even during unwinding
144pub struct ResourceTracker {
145 output_path: Mutex<Option<PathBuf>>,
146 temp_files: Mutex<Vec<PathBuf>>,
147 is_complete: AtomicBool,
148}
149
150impl ResourceTracker {
151 /// Create a new resource tracker.
152 #[must_use]
153 pub fn new() -> Self {
154 Self {
155 output_path: Mutex::new(None),
156 temp_files: Mutex::new(Vec::new()),
157 is_complete: AtomicBool::new(false),
158 }
159 }
160
161 /// Register the output file path to be cleaned up if the operation doesn't complete.
162 ///
163 /// The output file will be deleted on drop unless `mark_complete()` is called.
164 /// Only one output file can be registered (subsequent calls replace the previous).
165 ///
166 /// # Arguments
167 ///
168 /// * `path` - Path to the output file
169 pub fn register_output(&self, path: PathBuf) {
170 if let Ok(mut output) = self.output_path.lock() {
171 *output = Some(path);
172 }
173 }
174
175 /// Register a temporary file that should always be deleted on cleanup.
176 ///
177 /// Temporary files are always deleted on drop, regardless of completion status.
178 /// Multiple temporary files can be registered.
179 ///
180 /// # Arguments
181 ///
182 /// * `path` - Path to the temporary file
183 pub fn register_temp_file(&self, path: PathBuf) {
184 if let Ok(mut temps) = self.temp_files.lock() {
185 temps.push(path);
186 }
187 }
188
189 /// Mark the operation as successfully completed, preventing output file deletion.
190 ///
191 /// Call this after the operation succeeds to keep the output file.
192 /// If not called, the output file will be deleted on drop (cleanup on failure).
193 ///
194 /// # Example
195 ///
196 /// ```no_run
197 /// # use crush_core::cancel::ResourceTracker;
198 /// # use std::path::PathBuf;
199 /// let tracker = ResourceTracker::new();
200 /// tracker.register_output(PathBuf::from("output.dat"));
201 ///
202 /// // ... do work ...
203 ///
204 /// if work_succeeded() {
205 /// tracker.mark_complete(); // Keep the output file
206 /// }
207 /// // If work failed, drop will delete the output file
208 /// # fn work_succeeded() -> bool { true }
209 /// ```
210 pub fn mark_complete(&self) {
211 self.is_complete.store(true, Ordering::SeqCst);
212 }
213
214 /// Clean up all tracked resources.
215 ///
216 /// # Errors
217 ///
218 /// Returns an error if file deletion fails (permissions, file in use, etc.).
219 /// Returns the first error encountered if multiple deletions fail.
220 pub fn cleanup_all(&self) -> std::io::Result<()> {
221 // Always delete temp files
222 if let Ok(temps) = self.temp_files.lock() {
223 for temp in temps.iter() {
224 if temp.exists() {
225 std::fs::remove_file(temp)?;
226 }
227 }
228 }
229
230 // Delete output file only if operation not complete
231 if !self.is_complete.load(Ordering::SeqCst) {
232 if let Ok(output) = self.output_path.lock() {
233 if let Some(path) = output.as_ref() {
234 if path.exists() {
235 std::fs::remove_file(path)?;
236 }
237 }
238 }
239 }
240
241 Ok(())
242 }
243}
244
245impl Default for ResourceTracker {
246 fn default() -> Self {
247 Self::new()
248 }
249}
250
251impl Drop for ResourceTracker {
252 fn drop(&mut self) {
253 if !self.is_complete.load(Ordering::SeqCst) {
254 // Operation did not complete - clean up everything
255 // Ignore errors in Drop - best effort cleanup
256 let _ = self.cleanup_all();
257 }
258 }
259}
260
261#[cfg(test)]
262#[allow(clippy::panic_in_result_fn)]
263#[allow(clippy::unwrap_used)]
264mod tests {
265 use super::*;
266
267 #[test]
268 fn new_token_not_cancelled() {
269 let token = AtomicCancellationToken::new();
270 assert!(!token.is_cancelled());
271 }
272
273 #[test]
274 fn cancel_sets_flag() {
275 let token = AtomicCancellationToken::new();
276 token.cancel();
277 assert!(token.is_cancelled());
278 }
279
280 #[test]
281 fn cancel_is_idempotent() {
282 let token = AtomicCancellationToken::new();
283 token.cancel();
284 token.cancel();
285 token.cancel();
286 assert!(token.is_cancelled());
287 }
288
289 #[test]
290 fn reset_clears_cancellation() {
291 let token = AtomicCancellationToken::new();
292 token.cancel();
293 assert!(token.is_cancelled());
294 token.reset();
295 assert!(!token.is_cancelled());
296 }
297
298 #[test]
299 fn default_creates_uncancelled_token() {
300 let token = AtomicCancellationToken::default();
301 assert!(!token.is_cancelled());
302 }
303
304 // ResourceTracker tests
305 #[test]
306 fn new_resource_tracker_is_empty() {
307 let tracker = ResourceTracker::new();
308 // Verify cleanup_all succeeds with no registered resources
309 assert!(tracker.cleanup_all().is_ok());
310 }
311
312 #[test]
313 fn resource_tracker_default() {
314 let tracker = ResourceTracker::default();
315 assert!(tracker.cleanup_all().is_ok());
316 }
317
318 #[test]
319 fn register_output_and_mark_complete() -> std::io::Result<()> {
320 use std::fs;
321 use tempfile::TempDir;
322
323 let dir = TempDir::new()?;
324 let output_path = dir.path().join("output.txt");
325
326 // Create the output file
327 fs::write(&output_path, b"test data")?;
328 assert!(output_path.exists());
329
330 let tracker = ResourceTracker::new();
331 tracker.register_output(output_path.clone());
332 tracker.mark_complete();
333
334 // Clean up - file should NOT be deleted (marked complete)
335 tracker.cleanup_all()?;
336 assert!(
337 output_path.exists(),
338 "Completed output file should not be deleted"
339 );
340
341 Ok(())
342 }
343
344 #[test]
345 fn register_output_without_complete_deletes() -> std::io::Result<()> {
346 use std::fs;
347 use tempfile::TempDir;
348
349 let dir = TempDir::new()?;
350 let output_path = dir.path().join("output.txt");
351
352 // Create the output file
353 fs::write(&output_path, b"test data")?;
354 assert!(output_path.exists());
355
356 let tracker = ResourceTracker::new();
357 tracker.register_output(output_path.clone());
358
359 // Clean up without marking complete - file SHOULD be deleted
360 tracker.cleanup_all()?;
361 assert!(
362 !output_path.exists(),
363 "Incomplete output file should be deleted"
364 );
365
366 Ok(())
367 }
368
369 #[test]
370 fn register_temp_file_always_deletes() -> std::io::Result<()> {
371 use std::fs;
372 use tempfile::TempDir;
373
374 let dir = TempDir::new()?;
375 let temp_path = dir.path().join("temp.txt");
376
377 // Create the temp file
378 fs::write(&temp_path, b"temp data")?;
379 assert!(temp_path.exists());
380
381 let tracker = ResourceTracker::new();
382 tracker.register_temp_file(temp_path.clone());
383 tracker.mark_complete(); // Even if marked complete
384
385 // Clean up - temp file should ALWAYS be deleted
386 tracker.cleanup_all()?;
387 assert!(!temp_path.exists(), "Temp file should always be deleted");
388
389 Ok(())
390 }
391
392 #[test]
393 fn register_multiple_temp_files() -> std::io::Result<()> {
394 use std::fs;
395 use tempfile::TempDir;
396
397 let dir = TempDir::new()?;
398 let temp1 = dir.path().join("temp1.txt");
399 let temp2 = dir.path().join("temp2.txt");
400 let temp3 = dir.path().join("temp3.txt");
401
402 // Create temp files
403 fs::write(&temp1, b"temp1")?;
404 fs::write(&temp2, b"temp2")?;
405 fs::write(&temp3, b"temp3")?;
406
407 let tracker = ResourceTracker::new();
408 tracker.register_temp_file(temp1.clone());
409 tracker.register_temp_file(temp2.clone());
410 tracker.register_temp_file(temp3.clone());
411
412 // Clean up - all temp files should be deleted
413 tracker.cleanup_all()?;
414 assert!(!temp1.exists());
415 assert!(!temp2.exists());
416 assert!(!temp3.exists());
417
418 Ok(())
419 }
420
421 #[test]
422 fn drop_cleans_up_incomplete_output() -> std::io::Result<()> {
423 use std::fs;
424 use tempfile::TempDir;
425
426 let dir = TempDir::new()?;
427 let output_path = dir.path().join("output.txt");
428
429 // Create the output file
430 fs::write(&output_path, b"test data")?;
431 assert!(output_path.exists());
432
433 {
434 let tracker = ResourceTracker::new();
435 tracker.register_output(output_path.clone());
436 // Drop without marking complete
437 }
438
439 // File should be deleted by Drop
440 assert!(
441 !output_path.exists(),
442 "Drop should delete incomplete output"
443 );
444
445 Ok(())
446 }
447
448 #[test]
449 fn drop_keeps_completed_output() -> std::io::Result<()> {
450 use std::fs;
451 use tempfile::TempDir;
452
453 let dir = TempDir::new()?;
454 let output_path = dir.path().join("output.txt");
455
456 // Create the output file
457 fs::write(&output_path, b"test data")?;
458
459 {
460 let tracker = ResourceTracker::new();
461 tracker.register_output(output_path.clone());
462 tracker.mark_complete();
463 // Drop with completion
464 }
465
466 // File should NOT be deleted
467 assert!(output_path.exists(), "Drop should keep completed output");
468
469 Ok(())
470 }
471
472 #[test]
473 fn cleanup_nonexistent_files_succeeds() {
474 use tempfile::TempDir;
475
476 let dir = TempDir::new().unwrap();
477 let fake_path = dir.path().join("does_not_exist.txt");
478
479 let tracker = ResourceTracker::new();
480 tracker.register_output(fake_path.clone());
481 tracker.register_temp_file(fake_path);
482
483 // Cleanup should succeed even if files don't exist
484 assert!(tracker.cleanup_all().is_ok());
485 }
486
487 #[test]
488 fn replace_output_registration() -> std::io::Result<()> {
489 use std::fs;
490 use tempfile::TempDir;
491
492 let dir = TempDir::new()?;
493 let output1 = dir.path().join("output1.txt");
494 let output2 = dir.path().join("output2.txt");
495
496 fs::write(&output1, b"data1")?;
497 fs::write(&output2, b"data2")?;
498
499 let tracker = ResourceTracker::new();
500 tracker.register_output(output1.clone());
501 tracker.register_output(output2.clone()); // Replace first registration
502
503 // Only the second output should be deleted
504 tracker.cleanup_all()?;
505
506 assert!(
507 output1.exists(),
508 "First output should not be tracked after replacement"
509 );
510 assert!(!output2.exists(), "Second output should be deleted");
511
512 Ok(())
513 }
514}