Skip to main content

crush_core/
cancel.rs

1//! Cancellation support for compression operations
2//!
3//! This module provides thread-safe cancellation mechanisms for gracefully
4//! stopping compression/decompression operations in response to user signals
5//! (e.g., Ctrl+C) or programmatic cancellation requests.
6
7use std::path::PathBuf;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Mutex;
10
/// A shareable stop flag for compression and decompression work.
///
/// One side (a signal handler, the main thread, ...) raises the flag with
/// [`cancel`](CancellationToken::cancel); workers poll it with
/// [`is_cancelled`](CancellationToken::is_cancelled) and wind down when it is
/// set. Checking the flag must never block, and implementations are required
/// to be async-signal-safe so the flag can be raised from a signal handler.
///
/// # Thread Safety
///
/// Every method may be invoked concurrently from any number of threads.
/// Implementations are expected to rely on atomic operations so that no
/// caller ever waits on a lock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use crush_core::cancel::{CancellationToken, AtomicCancellationToken};
///
/// let token = Arc::new(AtomicCancellationToken::new());
/// let token_worker = Arc::clone(&token);
///
/// // Worker thread checks cancellation
/// std::thread::spawn(move || {
///     while !token_worker.is_cancelled() {
///         // Do work...
///     }
/// });
///
/// // Signal handler or main thread cancels
/// token.cancel();
/// ```
pub trait CancellationToken: Send + Sync {
    /// Report whether cancellation has been requested.
    ///
    /// Implementations must keep this lock-free and async-signal-safe; it is
    /// meant to be cheap enough to poll from hot loops and signal handlers.
    ///
    /// # Returns
    ///
    /// `true` once cancellation has been requested, `false` otherwise.
    fn is_cancelled(&self) -> bool;

    /// Request cancellation.
    ///
    /// Idempotent: repeated calls leave the token in the same cancelled
    /// state as a single call. Safe to call from signal handlers.
    ///
    /// # Thread Safety
    ///
    /// Concurrent callers are allowed. Whichever call sets the flag first
    /// "wins", but every call succeeds.
    fn cancel(&self);

    /// Put the token back into the not-cancelled state.
    ///
    /// Lets one token be reused across several sequential operations.
    /// NOT async-signal-safe - do not call from signal handlers.
    fn reset(&self);
}
69
70/// Standard implementation of `CancellationToken` using `AtomicBool`.
71///
72/// This is the recommended implementation for most use cases.
73pub struct AtomicCancellationToken {
74    cancelled: AtomicBool,
75}
76
77impl AtomicCancellationToken {
78    /// Create a new cancellation token in the not-cancelled state.
79    #[must_use]
80    pub fn new() -> Self {
81        Self {
82            cancelled: AtomicBool::new(false),
83        }
84    }
85}
86
87impl Default for AtomicCancellationToken {
88    fn default() -> Self {
89        Self::new()
90    }
91}
92
93impl CancellationToken for AtomicCancellationToken {
94    fn is_cancelled(&self) -> bool {
95        self.cancelled.load(Ordering::SeqCst)
96    }
97
98    fn cancel(&self) {
99        self.cancelled.store(true, Ordering::SeqCst);
100    }
101
102    fn reset(&self) {
103        self.cancelled.store(false, Ordering::SeqCst);
104    }
105}
106
/// Tracks resources created during compression/decompression for guaranteed cleanup.
///
/// Uses RAII pattern - dropping the tracker deletes every registered temporary
/// file, and also deletes the registered output file unless the operation was
/// marked as complete. This ensures incomplete output files are deleted if an
/// operation is cancelled or fails.
///
/// # Thread Safety
///
/// All methods take `&self`: the registered paths live behind `Mutex`es and the
/// completion flag is an `AtomicBool`, so the tracker can be shared between
/// threads. A mutex poisoned by a panicking thread is still used rather than
/// skipped, so registrations and cleanup keep working after a panic.
///
/// # Example
///
/// ```no_run
/// use crush_core::cancel::ResourceTracker;
/// use std::path::PathBuf;
///
/// let tracker = ResourceTracker::new();
///
/// // Register the output file to be cleaned up if operation fails
/// tracker.register_output(PathBuf::from("output.crush"));
///
/// // Register temporary files that should always be deleted
/// tracker.register_temp_file(PathBuf::from("temp.dat"));
///
/// // ... do compression work ...
///
/// // If successful, mark complete to keep the output file
/// tracker.mark_complete();
///
/// // Drop will clean up temp files but keep output (marked complete)
/// ```
///
/// # Cleanup Behavior
///
/// - **Temp files**: Always deleted on drop
/// - **Output file**: Deleted on drop UNLESS `mark_complete()` was called
/// - **On panic**: Drop runs, ensuring cleanup even during unwinding
/// - **Missing files**: A path that no longer exists is not an error
#[derive(Debug)]
pub struct ResourceTracker {
    /// Output file to delete when the operation did not complete.
    output_path: Mutex<Option<PathBuf>>,
    /// Temporary files that are deleted regardless of completion.
    temp_files: Mutex<Vec<PathBuf>>,
    /// Set by `mark_complete`; protects the output file from deletion.
    is_complete: AtomicBool,
}

impl ResourceTracker {
    /// Create a new resource tracker with nothing registered and the
    /// operation not yet marked complete.
    #[must_use]
    pub fn new() -> Self {
        Self {
            output_path: Mutex::new(None),
            temp_files: Mutex::new(Vec::new()),
            is_complete: AtomicBool::new(false),
        }
    }

    /// Register the output file path to be cleaned up if the operation doesn't complete.
    ///
    /// The output file will be deleted on drop unless `mark_complete()` is called.
    /// Only one output file can be registered (subsequent calls replace the previous).
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the output file
    pub fn register_output(&self, path: PathBuf) {
        *Self::lock(&self.output_path) = Some(path);
    }

    /// Register a temporary file that should always be deleted on cleanup.
    ///
    /// Temporary files are always deleted on drop, regardless of completion status.
    /// Multiple temporary files can be registered.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the temporary file
    pub fn register_temp_file(&self, path: PathBuf) {
        Self::lock(&self.temp_files).push(path);
    }

    /// Mark the operation as successfully completed, preventing output file deletion.
    ///
    /// Call this after the operation succeeds to keep the output file.
    /// If not called, the output file will be deleted on drop (cleanup on failure).
    /// Temporary files are still deleted either way.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use crush_core::cancel::ResourceTracker;
    /// # use std::path::PathBuf;
    /// let tracker = ResourceTracker::new();
    /// tracker.register_output(PathBuf::from("output.dat"));
    ///
    /// // ... do work ...
    ///
    /// if work_succeeded() {
    ///     tracker.mark_complete(); // Keep the output file
    /// }
    /// // If work failed, drop will delete the output file
    /// # fn work_succeeded() -> bool { true }
    /// ```
    pub fn mark_complete(&self) {
        self.is_complete.store(true, Ordering::SeqCst);
    }

    /// Clean up all tracked resources.
    ///
    /// Deletes every registered temporary file and, unless `mark_complete()`
    /// was called, the registered output file. Every deletion is attempted
    /// even when an earlier one fails, and files that are already gone are
    /// skipped silently. The registrations themselves are kept, so calling
    /// this more than once (or calling it and then dropping the tracker) is
    /// harmless.
    ///
    /// # Errors
    ///
    /// Returns an error if file deletion fails (permissions, file in use, etc.).
    /// Returns the first error encountered if multiple deletions fail.
    pub fn cleanup_all(&self) -> std::io::Result<()> {
        // Always delete temp files. `fold` drives the iterator to the end, so a
        // failure on one path never prevents the remaining paths from being
        // tried; `and` keeps the earliest error.
        let temp_result = Self::lock(&self.temp_files)
            .iter()
            .map(|path| Self::remove_if_present(path))
            .fold(Ok(()), |first_failure: std::io::Result<()>, outcome| {
                first_failure.and(outcome)
            });

        // Delete the output file only if the operation did not complete.
        let output_result = if self.is_complete.load(Ordering::SeqCst) {
            Ok(())
        } else {
            Self::lock(&self.output_path)
                .as_deref()
                .map_or(Ok(()), Self::remove_if_present)
        };

        temp_result.and(output_result)
    }

    /// Lock `mutex`, recovering the guard if another thread panicked while
    /// holding it. The protected data is only a path or a list of paths, so a
    /// poisoned lock cannot leave it in a state that is unsafe to read.
    fn lock<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
        mutex
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Delete `path`, treating "file not found" as success. Removing directly
    /// (instead of checking `exists()` first) avoids a check-then-act race.
    fn remove_if_present(path: &std::path::Path) -> std::io::Result<()> {
        match std::fs::remove_file(path) {
            Err(err) if err.kind() != std::io::ErrorKind::NotFound => Err(err),
            _ => Ok(()),
        }
    }
}

impl Default for ResourceTracker {
    /// Equivalent to [`ResourceTracker::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Drop for ResourceTracker {
    /// Best-effort cleanup: temp files are always removed, the output file
    /// only when the operation was not marked complete (`cleanup_all` makes
    /// that distinction itself).
    fn drop(&mut self) {
        // Errors cannot be reported from Drop - ignore them.
        let _ = self.cleanup_all();
    }
}
260
#[cfg(test)]
#[allow(clippy::panic_in_result_fn)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use std::fs;
    use std::io;
    use tempfile::TempDir;

    /// Creates `name` inside `dir` holding `contents` and hands back its path.
    fn write_fixture(
        dir: &TempDir,
        name: &str,
        contents: &[u8],
    ) -> io::Result<std::path::PathBuf> {
        let path = dir.path().join(name);
        fs::write(&path, contents)?;
        Ok(path)
    }

    // AtomicCancellationToken tests

    #[test]
    fn new_token_not_cancelled() {
        assert!(!AtomicCancellationToken::new().is_cancelled());
    }

    #[test]
    fn cancel_sets_flag() {
        let token = AtomicCancellationToken::new();
        token.cancel();
        assert!(token.is_cancelled());
    }

    #[test]
    fn cancel_is_idempotent() {
        let token = AtomicCancellationToken::new();
        for _ in 0..3 {
            token.cancel();
        }
        assert!(token.is_cancelled());
    }

    #[test]
    fn reset_clears_cancellation() {
        let token = AtomicCancellationToken::new();
        token.cancel();
        assert!(token.is_cancelled());
        token.reset();
        assert!(!token.is_cancelled());
    }

    #[test]
    fn default_creates_uncancelled_token() {
        assert!(!AtomicCancellationToken::default().is_cancelled());
    }

    // ResourceTracker tests

    #[test]
    fn new_resource_tracker_is_empty() {
        // With nothing registered there is nothing to delete, so cleanup succeeds.
        let tracker = ResourceTracker::new();
        assert!(tracker.cleanup_all().is_ok());
    }

    #[test]
    fn resource_tracker_default() {
        let tracker = ResourceTracker::default();
        assert!(tracker.cleanup_all().is_ok());
    }

    #[test]
    fn register_output_and_mark_complete() -> io::Result<()> {
        let dir = TempDir::new()?;
        let output_path = write_fixture(&dir, "output.txt", b"test data")?;
        assert!(output_path.exists());

        let tracker = ResourceTracker::new();
        tracker.register_output(output_path.clone());
        tracker.mark_complete();

        // A completed operation keeps its output through cleanup.
        tracker.cleanup_all()?;
        assert!(
            output_path.exists(),
            "Completed output file should not be deleted"
        );

        Ok(())
    }

    #[test]
    fn register_output_without_complete_deletes() -> io::Result<()> {
        let dir = TempDir::new()?;
        let output_path = write_fixture(&dir, "output.txt", b"test data")?;
        assert!(output_path.exists());

        let tracker = ResourceTracker::new();
        tracker.register_output(output_path.clone());

        // Never marked complete, so cleanup removes the output.
        tracker.cleanup_all()?;
        assert!(
            !output_path.exists(),
            "Incomplete output file should be deleted"
        );

        Ok(())
    }

    #[test]
    fn register_temp_file_always_deletes() -> io::Result<()> {
        let dir = TempDir::new()?;
        let temp_path = write_fixture(&dir, "temp.txt", b"temp data")?;
        assert!(temp_path.exists());

        let tracker = ResourceTracker::new();
        tracker.register_temp_file(temp_path.clone());
        // Completion must not protect temporary files.
        tracker.mark_complete();

        tracker.cleanup_all()?;
        assert!(!temp_path.exists(), "Temp file should always be deleted");

        Ok(())
    }

    #[test]
    fn register_multiple_temp_files() -> io::Result<()> {
        let dir = TempDir::new()?;
        let fixtures: [(&str, &[u8]); 3] = [
            ("temp1.txt", b"temp1"),
            ("temp2.txt", b"temp2"),
            ("temp3.txt", b"temp3"),
        ];

        let mut paths = Vec::with_capacity(fixtures.len());
        for (name, contents) in fixtures {
            paths.push(write_fixture(&dir, name, contents)?);
        }

        let tracker = ResourceTracker::new();
        for path in &paths {
            tracker.register_temp_file(path.clone());
        }

        // Every registered temp file must be gone after cleanup.
        tracker.cleanup_all()?;
        for path in &paths {
            assert!(!path.exists());
        }

        Ok(())
    }

    #[test]
    fn drop_cleans_up_incomplete_output() -> io::Result<()> {
        let dir = TempDir::new()?;
        let output_path = write_fixture(&dir, "output.txt", b"test data")?;
        assert!(output_path.exists());

        let tracker = ResourceTracker::new();
        tracker.register_output(output_path.clone());
        // Dropped without being marked complete.
        drop(tracker);

        assert!(
            !output_path.exists(),
            "Drop should delete incomplete output"
        );

        Ok(())
    }

    #[test]
    fn drop_keeps_completed_output() -> io::Result<()> {
        let dir = TempDir::new()?;
        let output_path = write_fixture(&dir, "output.txt", b"test data")?;

        let tracker = ResourceTracker::new();
        tracker.register_output(output_path.clone());
        tracker.mark_complete();
        // Dropped after completion.
        drop(tracker);

        assert!(output_path.exists(), "Drop should keep completed output");

        Ok(())
    }

    #[test]
    fn cleanup_nonexistent_files_succeeds() {
        let dir = TempDir::new().unwrap();
        let fake_path = dir.path().join("does_not_exist.txt");

        let tracker = ResourceTracker::new();
        tracker.register_output(fake_path.clone());
        tracker.register_temp_file(fake_path);

        // Paths that were never created must not turn cleanup into an error.
        assert!(tracker.cleanup_all().is_ok());
    }

    #[test]
    fn replace_output_registration() -> io::Result<()> {
        let dir = TempDir::new()?;
        let output1 = write_fixture(&dir, "output1.txt", b"data1")?;
        let output2 = write_fixture(&dir, "output2.txt", b"data2")?;

        let tracker = ResourceTracker::new();
        tracker.register_output(output1.clone());
        // The second registration replaces the first.
        tracker.register_output(output2.clone());

        tracker.cleanup_all()?;

        assert!(
            output1.exists(),
            "First output should not be tracked after replacement"
        );
        assert!(!output2.exists(), "Second output should be deleted");

        Ok(())
    }
}