kimun_notes/util/
single_slot_task.rs1use std::future::Future;
20use std::time::Duration;
21use tokio::task::{AbortHandle, JoinError, JoinHandle};
22
23pub struct SingleSlotTask<T> {
24 handle: Option<JoinHandle<T>>,
25 abort: Option<AbortHandle>,
26}
27
28impl<T> SingleSlotTask<T> {
29 pub fn empty() -> Self {
30 Self {
31 handle: None,
32 abort: None,
33 }
34 }
35
36 pub fn is_in_flight(&self) -> bool {
39 self.handle
40 .as_ref()
41 .map(|h| !h.is_finished())
42 .unwrap_or(false)
43 }
44
45 pub fn abort(&mut self) {
52 if let Some(h) = self.abort.take() {
53 h.abort();
54 }
55 self.handle = None;
56 }
57}
58
59impl<T: Send + 'static> SingleSlotTask<T> {
60 pub fn spawn<F>(&mut self, fut: F) -> AbortHandle
65 where
66 F: Future<Output = T> + Send + 'static,
67 {
68 if let Some(prev) = self.abort.take() {
69 prev.abort();
70 }
71 let handle = tokio::spawn(fut);
72 let abort_handle = handle.abort_handle();
73 self.abort = Some(abort_handle.clone());
74 self.handle = Some(handle);
75 abort_handle
76 }
77
78 pub async fn await_with_timeout(&mut self, dur: Duration) -> Option<Result<T, JoinError>> {
84 let handle = self.handle.as_mut()?;
85 match tokio::time::timeout(dur, handle).await {
86 Ok(res) => {
87 self.handle = None;
89 self.abort = None;
90 Some(res)
91 }
92 Err(_) => None,
93 }
94 }
95}
96
97impl<T> Drop for SingleSlotTask<T> {
98 fn drop(&mut self) {
99 self.abort();
100 }
101}
102
103#[cfg(test)]
104mod tests {
105 use super::*;
106 use std::sync::Arc;
107 use std::sync::atomic::{AtomicBool, Ordering};
108 use tokio::sync::Notify;
109
110 #[tokio::test]
111 async fn empty_slot_reports_idle() {
112 let slot: SingleSlotTask<()> = SingleSlotTask::empty();
113 assert!(!slot.is_in_flight());
114 }
115
116 #[tokio::test]
117 async fn spawn_runs_to_completion() {
118 let mut slot: SingleSlotTask<u32> = SingleSlotTask::empty();
119 slot.spawn(async { 42 });
120 let out = slot.await_with_timeout(Duration::from_secs(1)).await;
121 assert_eq!(out.expect("must complete").expect("no panic"), 42);
122 assert!(!slot.is_in_flight());
123 }
124
125 #[tokio::test]
126 async fn single_slot_task_spawn_aborts_previous() {
127 let canary = Arc::new(AtomicBool::new(false));
128 let mut slot: SingleSlotTask<()> = SingleSlotTask::empty();
129
130 let canary_clone = canary.clone();
131 slot.spawn(async move {
132 tokio::time::sleep(Duration::from_secs(60)).await;
133 canary_clone.store(true, Ordering::SeqCst);
134 });
135
136 slot.spawn(async {});
139 let _ = slot.await_with_timeout(Duration::from_secs(1)).await;
140
141 tokio::time::sleep(Duration::from_millis(50)).await;
142 assert!(
143 !canary.load(Ordering::SeqCst),
144 "previous task must have been aborted"
145 );
146 }
147
148 #[tokio::test]
149 async fn single_slot_task_drop_aborts_in_flight() {
150 let canary = Arc::new(AtomicBool::new(false));
151 {
152 let mut slot: SingleSlotTask<()> = SingleSlotTask::empty();
153 let canary_clone = canary.clone();
154 slot.spawn(async move {
155 tokio::time::sleep(Duration::from_secs(60)).await;
156 canary_clone.store(true, Ordering::SeqCst);
157 });
158 }
160 tokio::time::sleep(Duration::from_millis(50)).await;
161 assert!(
162 !canary.load(Ordering::SeqCst),
163 "drop must abort the spawned task"
164 );
165 }
166
167 #[tokio::test]
168 async fn single_slot_task_timeout_returns_none_keeps_handle() {
169 let mut slot: SingleSlotTask<()> = SingleSlotTask::empty();
170 slot.spawn(async {
171 tokio::time::sleep(Duration::from_secs(60)).await;
172 });
173 let out = slot.await_with_timeout(Duration::from_millis(50)).await;
174 assert!(out.is_none(), "long task must time out");
175 assert!(
176 slot.is_in_flight(),
177 "handle should survive a timeout so caller can decide to abort"
178 );
179 slot.abort();
181 assert!(!slot.is_in_flight());
182 }
183
184 #[tokio::test]
185 async fn explicit_abort_clears_slot() {
186 let mut slot: SingleSlotTask<()> = SingleSlotTask::empty();
187 let notify = Arc::new(Notify::new());
188 let n = notify.clone();
189 slot.spawn(async move {
190 n.notified().await;
191 });
192 assert!(slot.is_in_flight());
193 slot.abort();
194 assert!(!slot.is_in_flight());
195 }
196}