1pub mod memory_fs;
12pub use memory_fs::{FsOp, MemoryFileSystem};
13
14use std::any::Any;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{Arc, Mutex};
17use std::time::{Duration, Instant};
18
19use seams_rs_core::{Clock, JoinError, JoinHandle, Sleeper, Spawner};
20
21#[derive(Debug, Clone)]
25pub struct ManualClock {
26 now_ns: Arc<Mutex<u64>>,
27 start: Arc<Instant>,
28 initial_ns: u64,
29}
30
31impl ManualClock {
32 pub fn new() -> Self {
34 Self::from_ns(0)
35 }
36
37 pub fn from_ns(ns: u64) -> Self {
39 Self {
40 now_ns: Arc::new(Mutex::new(ns)),
41 start: Arc::new(Instant::now()),
42 initial_ns: ns,
43 }
44 }
45
46 pub fn advance(&self, d: Duration) {
48 let mut g = self.now_ns.lock().unwrap();
49 *g = g.saturating_add(d.as_nanos() as u64);
50 }
51
52 pub fn set_now_ns(&self, ns: u64) {
54 *self.now_ns.lock().unwrap() = ns;
55 }
56}
57
58impl Default for ManualClock {
59 fn default() -> Self {
60 Self::new()
61 }
62}
63
64impl Clock for ManualClock {
65 fn now_ns(&self) -> u64 {
66 *self.now_ns.lock().unwrap()
67 }
68
69 fn now_instant(&self) -> Instant {
70 let offset = self.now_ns().saturating_sub(self.initial_ns);
71 *self.start + Duration::from_nanos(offset)
72 }
73}
74
75#[derive(Debug, Default, Clone)]
79pub struct InstantSleeper {
80 calls: Arc<Mutex<Vec<(Duration, bool)>>>,
81}
82
83impl InstantSleeper {
84 pub fn new() -> Self {
86 Self::default()
87 }
88
89 pub fn calls(&self) -> Vec<(Duration, bool)> {
91 self.calls.lock().unwrap().clone()
92 }
93}
94
95impl Sleeper for InstantSleeper {
96 fn sleep(&self, duration: Duration) {
97 self.calls.lock().unwrap().push((duration, false));
98 }
99
100 fn sleep_responsive(&self, total: Duration, shutdown: &AtomicBool) -> bool {
101 let flag = shutdown.load(Ordering::SeqCst);
102 self.calls.lock().unwrap().push((total, flag));
103 flag
104 }
105}
106
107fn panic_msg(payload: &(dyn Any + Send)) -> String {
110 if let Some(s) = payload.downcast_ref::<&'static str>() {
111 (*s).to_string()
112 } else if let Some(s) = payload.downcast_ref::<String>() {
113 s.clone()
114 } else {
115 "panic".to_string()
116 }
117}
118
119#[derive(Debug, Default, Clone, Copy)]
121pub struct CurrentThreadSpawner;
122
123impl CurrentThreadSpawner {
124 pub fn new() -> Self {
126 Self
127 }
128}
129
130struct CurrentThreadHandle<T> {
131 result: Result<T, JoinError>,
132}
133
134impl<T: Send> JoinHandle<T> for CurrentThreadHandle<T> {
135 fn join(self: Box<Self>) -> Result<T, JoinError> {
136 self.result
137 }
138}
139
140impl Spawner for CurrentThreadSpawner {
141 fn spawn_blocking<F, T>(&self, f: F) -> Box<dyn JoinHandle<T>>
142 where
143 F: FnOnce() -> T + Send + 'static,
144 T: Send + 'static,
145 {
146 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f))
147 .map_err(|payload| JoinError::Panicked(panic_msg(&*payload)));
148 Box::new(CurrentThreadHandle { result })
149 }
150}
151
152type Thunk = Box<dyn FnOnce() + Send + 'static>;
153type ThunkSlot = Arc<Mutex<Option<Thunk>>>;
154
155#[derive(Default)]
158pub struct DeferredSpawner {
159 pending: Arc<Mutex<Vec<ThunkSlot>>>,
160}
161
162impl DeferredSpawner {
163 pub fn new() -> Self {
165 Self::default()
166 }
167
168 pub fn run_pending(&self) {
170 let taken: Vec<ThunkSlot> = std::mem::take(&mut *self.pending.lock().unwrap());
171 for slot in taken {
172 if let Some(t) = slot.lock().unwrap().take() {
173 t();
174 }
175 }
176 }
177
178 pub fn join_all(&self) {
180 self.run_pending();
181 }
182
183 pub fn pending_count(&self) -> usize {
185 self.pending.lock().unwrap().len()
186 }
187}
188
189struct DeferredHandle<T> {
190 own_thunk: ThunkSlot,
191 slot: Arc<Mutex<Option<Result<T, JoinError>>>>,
192}
193
194impl<T: Send + 'static> JoinHandle<T> for DeferredHandle<T> {
195 fn join(self: Box<Self>) -> Result<T, JoinError> {
196 if let Some(t) = self.own_thunk.lock().unwrap().take() {
197 t();
198 }
199 match self.slot.lock().unwrap().take() {
200 Some(r) => r,
201 None => Err(JoinError::Cancelled),
202 }
203 }
204}
205
206impl Spawner for DeferredSpawner {
207 fn spawn_blocking<F, T>(&self, f: F) -> Box<dyn JoinHandle<T>>
208 where
209 F: FnOnce() -> T + Send + 'static,
210 T: Send + 'static,
211 {
212 let slot: Arc<Mutex<Option<Result<T, JoinError>>>> = Arc::new(Mutex::new(None));
213 let slot_writer = slot.clone();
214 let thunk: Thunk = Box::new(move || {
215 let r = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f))
216 .map_err(|payload| JoinError::Panicked(panic_msg(&*payload)));
217 *slot_writer.lock().unwrap() = Some(r);
218 });
219 let own: ThunkSlot = Arc::new(Mutex::new(Some(thunk)));
220 self.pending.lock().unwrap().push(own.clone());
221 Box::new(DeferredHandle {
222 own_thunk: own,
223 slot,
224 })
225 }
226}
227
228#[cfg(test)]
231mod tests {
232 use super::*;
233 use seams_rs_core::contract_tests as ct;
234 use std::time::Duration;
235
236 #[test]
237 fn manual_clock_now_ns() {
238 ct::clock_now_ns_monotonic(&ManualClock::new());
239 }
240
241 #[test]
242 fn manual_clock_now_instant() {
243 ct::clock_now_instant_monotonic(&ManualClock::new());
244 }
245
246 #[test]
247 fn instant_sleeper_sleep() {
248 ct::sleeper_sleep_waits(
249 &InstantSleeper::new(),
250 Duration::ZERO,
251 Duration::from_millis(50),
252 );
253 }
254
255 #[test]
256 fn instant_sleeper_shutdown_before() {
257 ct::sleeper_responsive_shutdown_before(&InstantSleeper::new());
258 }
259
260 #[test]
261 fn instant_sleeper_shutdown_during() {
262 ct::sleeper_responsive_shutdown_during(&InstantSleeper::new());
263 }
264
265 #[test]
266 fn instant_sleeper_no_shutdown() {
267 ct::sleeper_responsive_no_shutdown(&InstantSleeper::new());
268 }
269
270 #[test]
271 fn current_thread_spawner_value() {
272 ct::spawner_returns_value(&CurrentThreadSpawner::new());
273 }
274
275 #[test]
276 fn current_thread_spawner_panic() {
277 ct::spawner_propagates_panic(&CurrentThreadSpawner::new());
278 }
279
280 #[test]
281 fn deferred_spawner_value() {
282 ct::spawner_returns_value(&DeferredSpawner::new());
283 }
284
285 #[test]
286 fn deferred_spawner_panic() {
287 ct::spawner_propagates_panic(&DeferredSpawner::new());
288 }
289
290 #[test]
291 fn manual_clock_advance() {
292 let c = ManualClock::from_ns(1000);
293 c.advance(Duration::from_nanos(500));
294 assert_eq!(c.now_ns(), 1500);
295 }
296
297 #[test]
298 fn manual_clock_set() {
299 let c = ManualClock::new();
300 c.set_now_ns(42);
301 assert_eq!(c.now_ns(), 42);
302 }
303
304 #[test]
305 fn instant_sleeper_records_calls() {
306 let s = InstantSleeper::new();
307 s.sleep(Duration::from_millis(10));
308 assert_eq!(s.calls().len(), 1);
309 assert_eq!(s.calls()[0].0, Duration::from_millis(10));
310 }
311
312 #[test]
313 fn deferred_spawner_defers() {
314 let s = DeferredSpawner::new();
315 let h = s.spawn_blocking(|| 7);
316 assert_eq!(s.pending_count(), 1);
317 s.run_pending();
318 assert_eq!(s.pending_count(), 0);
319 assert_eq!(h.join().unwrap(), 7);
320 }
321
322 #[test]
323 fn deferred_spawner_pending_count_starts_zero() {
324 let s = DeferredSpawner::new();
325 assert_eq!(s.pending_count(), 0);
326 }
327
328 #[test]
329 fn deferred_spawner_join_all_drains() {
330 let s = DeferredSpawner::new();
331 let h = s.spawn_blocking(|| 99);
332 assert_eq!(s.pending_count(), 1);
333 s.join_all();
334 assert_eq!(s.pending_count(), 0);
335 assert_eq!(h.join().unwrap(), 99);
336 }
337
338 #[test]
339 fn manual_clock_now_instant_tracks_advance() {
340 let c = ManualClock::new();
341 let t0 = c.now_instant();
342 c.advance(Duration::from_millis(50));
343 let t1 = c.now_instant();
344 assert_eq!(t1.saturating_duration_since(t0), Duration::from_millis(50));
345 }
346}