Skip to main content

kimun_notes/util/
single_slot_task.rs

1//! Single-slot cancellable tokio task.
2//!
3//! Encodes the "one task at a time; aborting an old one cancels its
4//! spawned future" pattern that the autocomplete controller and the
5//! editor's autosave both reinvent. Drop aborts — so the spawned
6//! future cannot outlive the parent that owned the slot.
7//!
8//! ```ignore
9//! let mut slot: SingleSlotTask<String> = SingleSlotTask::empty();
10//! slot.spawn(async { "hello".to_string() });
11//! // Spawning again aborts the prior task.
12//! slot.spawn(async { "world".to_string() });
13//! ```
14//!
15//! `JoinHandle` alone does NOT cancel its task on drop — it merely
16//! detaches. This type holds an `AbortHandle` alongside and cancels
17//! through it in `abort()` and `Drop`.
18
19use std::future::Future;
20use std::time::Duration;
21use tokio::task::{AbortHandle, JoinError, JoinHandle};
22
23pub struct SingleSlotTask<T> {
24    handle: Option<JoinHandle<T>>,
25    abort: Option<AbortHandle>,
26}
27
28impl<T> SingleSlotTask<T> {
29    pub fn empty() -> Self {
30        Self {
31            handle: None,
32            abort: None,
33        }
34    }
35
36    /// True between `spawn` and the moment the task's future finishes
37    /// (cancellation OR normal return). Cheap; no awaiting.
38    pub fn is_in_flight(&self) -> bool {
39        self.handle
40            .as_ref()
41            .map(|h| !h.is_finished())
42            .unwrap_or(false)
43    }
44
45    /// Abort the in-flight task (if any) and clear the slot. No-op on
46    /// an empty slot. `abort()` is asynchronous — the spawned future
47    /// is cancelled at its next await point, so a result already
48    /// sent on a channel may still arrive at its consumer. Consumers
49    /// are responsible for filtering stale results (e.g. via revision
50    /// match).
51    pub fn abort(&mut self) {
52        if let Some(h) = self.abort.take() {
53            h.abort();
54        }
55        self.handle = None;
56    }
57}
58
59impl<T: Send + 'static> SingleSlotTask<T> {
60    /// Aborts any in-flight task, then spawns `fut`. Returns the new
61    /// task's `AbortHandle` so callers that want to cancel from
62    /// somewhere other than the slot's owner (e.g. an event handler
63    /// holding a clone) can do so. Usually ignored.
64    pub fn spawn<F>(&mut self, fut: F) -> AbortHandle
65    where
66        F: Future<Output = T> + Send + 'static,
67    {
68        if let Some(prev) = self.abort.take() {
69            prev.abort();
70        }
71        let handle = tokio::spawn(fut);
72        let abort_handle = handle.abort_handle();
73        self.abort = Some(abort_handle.clone());
74        self.handle = Some(handle);
75        abort_handle
76    }
77
78    /// `Some(result)` if the task completed within the deadline.
79    /// `None` if the deadline expired — the slot keeps the handle so
80    /// the caller can decide whether to `abort()` explicitly. The
81    /// returned `Result` is `JoinHandle`'s own result type, so a
82    /// panic in the task surfaces as `Err(JoinError)`.
83    pub async fn await_with_timeout(&mut self, dur: Duration) -> Option<Result<T, JoinError>> {
84        let handle = self.handle.as_mut()?;
85        match tokio::time::timeout(dur, handle).await {
86            Ok(res) => {
87                // Task completed — clear the slot.
88                self.handle = None;
89                self.abort = None;
90                Some(res)
91            }
92            Err(_) => None,
93        }
94    }
95}
96
97impl<T> Drop for SingleSlotTask<T> {
98    fn drop(&mut self) {
99        self.abort();
100    }
101}
102
103#[cfg(test)]
104mod tests {
105    use super::*;
106    use std::sync::Arc;
107    use std::sync::atomic::{AtomicBool, Ordering};
108    use tokio::sync::Notify;
109
110    #[tokio::test]
111    async fn empty_slot_reports_idle() {
112        let slot: SingleSlotTask<()> = SingleSlotTask::empty();
113        assert!(!slot.is_in_flight());
114    }
115
116    #[tokio::test]
117    async fn spawn_runs_to_completion() {
118        let mut slot: SingleSlotTask<u32> = SingleSlotTask::empty();
119        slot.spawn(async { 42 });
120        let out = slot.await_with_timeout(Duration::from_secs(1)).await;
121        assert_eq!(out.expect("must complete").expect("no panic"), 42);
122        assert!(!slot.is_in_flight());
123    }
124
125    #[tokio::test]
126    async fn single_slot_task_spawn_aborts_previous() {
127        let canary = Arc::new(AtomicBool::new(false));
128        let mut slot: SingleSlotTask<()> = SingleSlotTask::empty();
129
130        let canary_clone = canary.clone();
131        slot.spawn(async move {
132            tokio::time::sleep(Duration::from_secs(60)).await;
133            canary_clone.store(true, Ordering::SeqCst);
134        });
135
136        // Replace with a fast task. The 60s sleeper must NEVER run to
137        // completion, so the canary stays false.
138        slot.spawn(async {});
139        let _ = slot.await_with_timeout(Duration::from_secs(1)).await;
140
141        tokio::time::sleep(Duration::from_millis(50)).await;
142        assert!(
143            !canary.load(Ordering::SeqCst),
144            "previous task must have been aborted"
145        );
146    }
147
148    #[tokio::test]
149    async fn single_slot_task_drop_aborts_in_flight() {
150        let canary = Arc::new(AtomicBool::new(false));
151        {
152            let mut slot: SingleSlotTask<()> = SingleSlotTask::empty();
153            let canary_clone = canary.clone();
154            slot.spawn(async move {
155                tokio::time::sleep(Duration::from_secs(60)).await;
156                canary_clone.store(true, Ordering::SeqCst);
157            });
158            // slot drops here.
159        }
160        tokio::time::sleep(Duration::from_millis(50)).await;
161        assert!(
162            !canary.load(Ordering::SeqCst),
163            "drop must abort the spawned task"
164        );
165    }
166
167    #[tokio::test]
168    async fn single_slot_task_timeout_returns_none_keeps_handle() {
169        let mut slot: SingleSlotTask<()> = SingleSlotTask::empty();
170        slot.spawn(async {
171            tokio::time::sleep(Duration::from_secs(60)).await;
172        });
173        let out = slot.await_with_timeout(Duration::from_millis(50)).await;
174        assert!(out.is_none(), "long task must time out");
175        assert!(
176            slot.is_in_flight(),
177            "handle should survive a timeout so caller can decide to abort"
178        );
179        // Explicit cleanup.
180        slot.abort();
181        assert!(!slot.is_in_flight());
182    }
183
184    #[tokio::test]
185    async fn explicit_abort_clears_slot() {
186        let mut slot: SingleSlotTask<()> = SingleSlotTask::empty();
187        let notify = Arc::new(Notify::new());
188        let n = notify.clone();
189        slot.spawn(async move {
190            n.notified().await;
191        });
192        assert!(slot.is_in_flight());
193        slot.abort();
194        assert!(!slot.is_in_flight());
195    }
196}