simvar_utils 0.3.0

Simulator Utils package
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
//! Utility functions for simulation testing and cancellation management.
//!
//! This crate provides utilities for managing worker threads and cancellation tokens
//! in simulation environments. It supports both thread-local and global cancellation,
//! allowing tests to gracefully terminate simulations and async operations.
//!
//! # Features
//!
//! * **Thread Management**: Unique worker thread ID tracking
//! * **Cancellation Tokens**: Thread-local and global cancellation support
//! * **Async Utilities**: Run futures until simulation cancellation
//!
//! # Example
//!
//! ```rust
//! use simvar_utils::{worker_thread_id, run_until_simulation_cancelled};
//!
//! // Get unique thread ID
//! let thread_id = worker_thread_id();
//! println!("Worker thread ID: {}", thread_id);
//!
//! # async fn example() {
//! # async fn simulate_work() -> u32 { 42 }
//! // Run future until cancelled
//! let result = run_until_simulation_cancelled(async {
//!     simulate_work().await
//! }).await;
//!
//! match result {
//!     Some(output) => println!("Completed: {}", output),
//!     None => println!("Cancelled"),
//! }
//! # }
//! ```

#![cfg_attr(feature = "fail-on-warnings", deny(warnings))]
#![warn(clippy::all, clippy::pedantic, clippy::nursery, clippy::cargo)]
#![allow(clippy::multiple_crate_versions)]

use std::{
    cell::RefCell,
    future::Future,
    sync::{LazyLock, RwLock, atomic::AtomicU64},
};

use switchy::unsync::util::CancellationToken;

/// Process-wide source of worker thread IDs; the first ID handed out is 1.
///
/// `AtomicU64::new` is a `const fn`, so the counter can be initialised statically
/// and needs no lazy wrapper.
static WORKER_THREAD_ID_COUNTER: AtomicU64 = AtomicU64::new(1);

thread_local! {
    /// ID claimed by the current thread the first time it is read on that thread.
    ///
    /// The value is never modified after initialisation, so it is stored as a plain
    /// `u64` instead of behind interior mutability.
    static WORKER_THREAD_ID: u64 =
        WORKER_THREAD_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}

/// Returns the unique identifier for the current worker thread.
///
/// Each thread gets a unique, monotonically increasing ID starting from 1. An ID is
/// claimed lazily on a thread's first call to this function, so IDs follow the order
/// of first calls rather than the order in which threads were created. Repeated
/// calls on the same thread always return the same value.
#[must_use]
pub fn worker_thread_id() -> u64 {
    WORKER_THREAD_ID.with(|id| *id)
}

thread_local! {
    /// Cancellation token belonging to the current thread.
    ///
    /// NOTE(review): the inner `RwLock` looks redundant because the surrounding
    /// `RefCell` is thread-local already; confirm before simplifying, since every
    /// accessor in this file goes through both layers.
    static SIMULATOR_CANCELLATION_TOKEN: RefCell<RwLock<CancellationToken>> =
        RefCell::new(RwLock::new(CancellationToken::new()));
}

/// Replaces the current thread's simulation cancellation token with a fresh one.
///
/// The new token starts out non-cancelled, which clears any earlier thread-local
/// cancellation. Call this before starting a new simulation run on the thread.
/// Tokens stored by other threads and the global token are left untouched.
///
/// # Panics
///
/// * If the `SIMULATOR_CANCELLATION_TOKEN` `RwLock` is poisoned and cannot be
///   write-locked
pub fn reset_simulator_cancellation_token() {
    SIMULATOR_CANCELLATION_TOKEN.with_borrow_mut(|lock| {
        // Build the replacement first, then swap it in under the write lock.
        let fresh = CancellationToken::new();
        *lock.write().unwrap() = fresh;
    });
}

/// Reports whether the simulation on the current thread should stop.
///
/// Returns `true` when the global cancellation token or this thread's own token has
/// been triggered. The global token is consulted first; the thread-local token is
/// only read when the global one is not cancelled.
///
/// # Panics
///
/// * If the `GLOBAL_SIMULATOR_CANCELLATION_TOKEN` `RwLock` is poisoned and cannot
///   be read-locked
/// * If the `SIMULATOR_CANCELLATION_TOKEN` `RwLock` is poisoned and cannot be
///   read-locked
#[must_use]
pub fn is_simulator_cancelled() -> bool {
    if is_global_simulator_cancelled() {
        return true;
    }

    SIMULATOR_CANCELLATION_TOKEN.with_borrow(|lock| lock.read().unwrap().is_cancelled())
}

/// Triggers cancellation of the simulation running on the current thread.
///
/// Only the calling thread's token is cancelled; the global token and the tokens of
/// other threads are not touched. Futures on this thread that are being driven by
/// [`run_until_simulation_cancelled`] with this token will terminate.
///
/// # Panics
///
/// * If the `SIMULATOR_CANCELLATION_TOKEN` `RwLock` is poisoned and cannot be
///   read-locked
pub fn cancel_simulation() {
    SIMULATOR_CANCELLATION_TOKEN.with_borrow(|lock| {
        // A read lock is enough: `cancel` is called through a shared reference.
        let token = lock.read().unwrap();
        token.cancel();
    });
}

/// Cancellation token shared by every thread in the process.
///
/// It sits behind an `RwLock` so that it can be swapped for a fresh token on reset
/// while other threads read it.
static GLOBAL_SIMULATOR_CANCELLATION_TOKEN: LazyLock<RwLock<CancellationToken>> =
    LazyLock::new(|| RwLock::new(CancellationToken::new()));

/// Replaces the global simulation cancellation token with a fresh one.
///
/// The new token starts out non-cancelled, which clears any earlier global
/// cancellation for all threads. Call this before starting a new simulation run.
/// Thread-local tokens are not affected.
///
/// # Panics
///
/// * If the `GLOBAL_SIMULATOR_CANCELLATION_TOKEN` `RwLock` is poisoned and cannot
///   be write-locked
pub fn reset_global_simulator_cancellation_token() {
    // Build the replacement first, then swap it in under the write lock.
    let fresh = CancellationToken::new();
    let mut slot = GLOBAL_SIMULATOR_CANCELLATION_TOKEN.write().unwrap();
    *slot = fresh;
}

/// Reports whether the global simulation cancellation token has been triggered.
///
/// A `true` result applies to every thread, because the token is shared by the
/// whole process. Thread-local cancellation is not considered here; use
/// [`is_simulator_cancelled`] to take both into account.
///
/// # Panics
///
/// * If the `GLOBAL_SIMULATOR_CANCELLATION_TOKEN` `RwLock` is poisoned and cannot
///   be read-locked
#[must_use]
pub fn is_global_simulator_cancelled() -> bool {
    let token = GLOBAL_SIMULATOR_CANCELLATION_TOKEN.read().unwrap();
    token.is_cancelled()
}

/// Triggers cancellation of every simulation in the process.
///
/// Cancels the shared global token. All threads observe this through
/// [`is_simulator_cancelled`], and futures being driven by
/// [`run_until_simulation_cancelled`] with this token terminate regardless of the
/// thread they run on.
///
/// # Panics
///
/// * If the `GLOBAL_SIMULATOR_CANCELLATION_TOKEN` `RwLock` is poisoned and cannot
///   be read-locked
pub fn cancel_global_simulation() {
    // A read lock is enough: `cancel` is called through a shared reference.
    let token = GLOBAL_SIMULATOR_CANCELLATION_TOKEN.read().unwrap();
    token.cancel();
}

/// Drives `fut` to completion unless the simulation is cancelled first.
///
/// Resolves to `Some(output)` when `fut` finishes, and to `None` when either the
/// global token or the current thread's token is cancelled.
///
/// The tokens are cloned from the global and thread-local slots when the returned
/// future is first polled. A reset performed after that point installs new tokens
/// that this call does not watch.
///
/// NOTE(review): if `fut` is ready and a token is already cancelled at the same
/// time, the result depends on the polling order of `switchy::unsync::select!` —
/// confirm against its documentation.
///
/// # Examples
///
/// ```rust
/// use simvar_utils::{reset_global_simulator_cancellation_token, reset_simulator_cancellation_token, run_until_simulation_cancelled};
///
/// # async fn example() {
/// reset_global_simulator_cancellation_token();
/// reset_simulator_cancellation_token();
///
/// let output = run_until_simulation_cancelled(async { 7_u8 }).await;
/// assert_eq!(output, Some(7));
/// # }
/// ```
///
/// # Panics
///
/// * If the `GLOBAL_SIMULATOR_CANCELLATION_TOKEN` `RwLock` is poisoned and cannot
///   be read-locked
/// * If the `SIMULATOR_CANCELLATION_TOKEN` `RwLock` is poisoned and cannot be
///   read-locked
pub async fn run_until_simulation_cancelled<F>(fut: F) -> Option<F::Output>
where
    F: Future,
{
    // Clone both tokens up front so that no lock guard is held across an await.
    let global = GLOBAL_SIMULATOR_CANCELLATION_TOKEN.read().unwrap().clone();
    let local = SIMULATOR_CANCELLATION_TOKEN.with_borrow(|lock| lock.read().unwrap().clone());

    switchy::unsync::select! {
        output = fut => Some(output),
        () = global.cancelled() => None,
        () = local.cancelled() => None,
    }
}

#[cfg(test)]
mod tests {
    use serial_test::serial;

    use super::*;

    // Note: every test in this module is marked #[serial]. The global token
    // (GLOBAL_SIMULATOR_CANCELLATION_TOKEN) and the worker thread ID counter are shared
    // by the whole test process, so tests that cancel or reset the global token would
    // race with each other if they ran in parallel. The per-thread token
    // (SIMULATOR_CANCELLATION_TOKEN) is thread-local, but `is_simulator_cancelled` also
    // reads the global token, so those tests need the same isolation.
    // The serial_test crate ensures these tests run one at a time.

    /// Puts the global token and the calling thread's local token back into the
    /// non-cancelled state so that a test starts from a known baseline.
    fn reset_all_cancellation_state() {
        reset_global_simulator_cancellation_token();
        reset_simulator_cancellation_token();
    }

    /// Despite its name, this checks that the ID is stable within a single thread.
    #[test_log::test]
    #[serial]
    fn test_worker_thread_id_returns_unique_ids() {
        let first = worker_thread_id();
        let second = worker_thread_id();
        // Same thread should return same ID
        assert_eq!(first, second);
    }

    #[test_log::test]
    #[serial]
    fn test_worker_thread_id_uniqueness_across_threads() {
        let here = worker_thread_id();
        let there = std::thread::spawn(worker_thread_id).join().unwrap();
        // Different threads should have different IDs
        assert_ne!(here, there);
    }

    #[test_log::test]
    #[serial]
    fn test_local_cancellation_isolated_between_threads() {
        reset_all_cancellation_state();

        // Cancel local simulation on this thread
        cancel_simulation();
        assert!(is_simulator_cancelled());

        // Spawn a new thread and check its local state is NOT cancelled
        let handle = std::thread::spawn(|| {
            // This thread has its own thread-local token which should NOT be cancelled
            reset_simulator_cancellation_token();
            is_simulator_cancelled()
        });

        let other_thread_cancelled = handle.join().unwrap();
        // The other thread's local cancellation state should be false
        // (since we reset it and only cancelled on the main thread)
        assert!(
            !other_thread_cancelled,
            "Local cancellation should not affect other threads"
        );
    }

    #[test_log::test]
    #[serial]
    fn test_reset_simulator_cancellation_token() {
        reset_all_cancellation_state();

        // Cancel the token
        cancel_simulation();
        assert!(is_simulator_cancelled());

        // Reset should clear cancellation
        reset_simulator_cancellation_token();
        assert!(!is_simulator_cancelled());
    }

    #[test_log::test]
    #[serial]
    fn test_cancel_simulation_sets_cancelled_state() {
        reset_all_cancellation_state();

        assert!(!is_simulator_cancelled());

        cancel_simulation();
        assert!(is_simulator_cancelled());
    }

    #[test_log::test]
    #[serial]
    fn test_is_simulator_cancelled_respects_global_cancellation() {
        reset_all_cancellation_state();

        assert!(!is_simulator_cancelled());

        cancel_global_simulation();
        // Local cancellation should detect global cancellation
        assert!(is_simulator_cancelled());
    }

    #[test_log::test]
    #[serial]
    fn test_global_cancellation_independent_from_local() {
        reset_all_cancellation_state();

        cancel_simulation();
        // Local cancelled but not global directly
        assert!(!is_global_simulator_cancelled());
        assert!(is_simulator_cancelled());
    }

    #[test_log::test]
    #[serial]
    fn test_reset_global_simulator_cancellation_token() {
        reset_all_cancellation_state();

        cancel_global_simulation();
        assert!(is_global_simulator_cancelled());

        reset_global_simulator_cancellation_token();
        assert!(!is_global_simulator_cancelled());
    }

    #[test_log::test(switchy_async::test)]
    #[serial]
    async fn test_run_until_simulation_cancelled_completes_normally() {
        reset_all_cancellation_state();

        let result = run_until_simulation_cancelled(async { 42 }).await;
        assert_eq!(result, Some(42));
    }

    #[test_log::test(switchy_async::test)]
    #[serial]
    async fn test_run_until_simulation_cancelled_with_local_cancellation() {
        reset_all_cancellation_state();

        // Cancel before the work is ever polled
        cancel_simulation();

        let work_task = async {
            // This will never complete
            std::future::pending::<()>().await;
            42
        };

        let result = run_until_simulation_cancelled(work_task).await;
        assert_eq!(result, None);
    }

    #[test_log::test(switchy_async::test)]
    #[serial]
    async fn test_run_until_simulation_cancelled_with_global_cancellation() {
        reset_all_cancellation_state();

        // Cancel before the work is ever polled
        cancel_global_simulation();

        let work_task = async {
            // This will never complete
            std::future::pending::<()>().await;
            42
        };

        let result = run_until_simulation_cancelled(work_task).await;
        assert_eq!(result, None);
    }

    #[test_log::test]
    #[serial]
    fn test_global_cancellation_affects_other_threads() {
        reset_all_cancellation_state();

        // Verify not cancelled initially
        assert!(!is_global_simulator_cancelled());

        // Cancel globally from main thread
        cancel_global_simulation();

        // Verify another thread sees the global cancellation
        let handle = std::thread::spawn(|| {
            // Reset this thread's local token (should not affect global)
            reset_simulator_cancellation_token();
            // This should still return true because global is cancelled
            is_simulator_cancelled()
        });

        let other_thread_sees_cancellation = handle.join().unwrap();
        assert!(
            other_thread_sees_cancellation,
            "Global cancellation should be visible to all threads"
        );
    }

    #[test_log::test]
    #[serial]
    fn test_worker_thread_ids_are_monotonically_increasing() {
        // Each thread is joined before the next one is spawned, so IDs are claimed in
        // spawn order and the collected sequence must be strictly increasing. Strictly
        // increasing also implies that all IDs are unique.
        let ids: Vec<u64> = (0..5)
            .map(|_| std::thread::spawn(worker_thread_id).join().unwrap())
            .collect();

        // All IDs should be >= 1 (IDs start at 1)
        assert!(ids.iter().all(|&id| id >= 1), "All IDs should be >= 1");
        assert!(
            ids.windows(2).all(|pair| pair[0] < pair[1]),
            "Thread IDs should be strictly increasing: {ids:?}"
        );
    }

    #[test_log::test]
    #[serial]
    fn test_is_simulator_cancelled_with_both_local_and_global_cancelled() {
        reset_all_cancellation_state();

        // Cancel both local and global
        cancel_simulation();
        cancel_global_simulation();

        // is_simulator_cancelled should return true (tests the OR logic when both are true)
        assert!(is_simulator_cancelled());
        assert!(is_global_simulator_cancelled());

        // Reset only global, local should still keep it cancelled
        reset_global_simulator_cancellation_token();
        assert!(is_simulator_cancelled());
        assert!(!is_global_simulator_cancelled());

        // Reset local too, now should be false
        reset_simulator_cancellation_token();
        assert!(!is_simulator_cancelled());
    }

    #[test_log::test]
    #[serial]
    fn test_global_cancellation_from_multiple_threads_is_thread_safe() {
        reset_all_cancellation_state();

        // Spawn multiple threads that all try to cancel globally
        let handles: Vec<_> = (0..10)
            .map(|_| {
                std::thread::spawn(|| {
                    cancel_global_simulation();
                    is_global_simulator_cancelled()
                })
            })
            .collect();

        // All threads should see the cancellation
        for handle in handles {
            let saw_cancellation = handle.join().unwrap();
            assert!(
                saw_cancellation,
                "All threads should see global cancellation"
            );
        }

        // Main thread should also see it
        assert!(is_global_simulator_cancelled());

        // Do not leave the process-wide token cancelled for whatever runs next
        reset_global_simulator_cancellation_token();
    }

    #[test_log::test(switchy_async::test)]
    #[serial]
    async fn test_run_until_simulation_cancelled_with_concurrent_cancellation() {
        use std::sync::{
            Arc,
            atomic::{AtomicBool, Ordering},
        };

        reset_all_cancellation_state();

        let work_started = Arc::new(AtomicBool::new(false));
        let work_started_clone = Arc::clone(&work_started);

        // Create a task that signals when it starts and then waits forever
        let work_task = async move {
            work_started_clone.store(true, Ordering::SeqCst);
            std::future::pending::<()>().await;
            42
        };

        // Race the guarded work against a branch that cancels once the work has started
        let result = switchy::unsync::select! {
            result = run_until_simulation_cancelled(work_task) => result,
            () = async {
                // Wait until work has started
                while !work_started.load(Ordering::SeqCst) {
                    switchy::unsync::task::yield_now().await;
                }
                // Now cancel
                cancel_simulation();
            } => None,
        };

        assert_eq!(result, None, "Task should be cancelled");
    }
}