1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
/// A token that can be used to signal cancellation to an in-progress workflow execution.
///
/// Clones share the same underlying state via `Arc`; cancelling any clone cancels all.
/// Designed to be cheap to clone and thread-safe.
#[derive(Clone, Debug)]
pub struct CancellationToken {
cancelled: Arc<AtomicBool>,
}
impl Default for CancellationToken {
fn default() -> Self {
Self::new()
}
}
impl CancellationToken {
/// Create a new, uncancelled token.
#[must_use]
pub fn new() -> Self {
Self {
cancelled: Arc::new(AtomicBool::new(false)),
}
}
/// Signal cancellation. After this call, `is_cancelled()` returns `true` on all clones.
pub fn cancel(&self) {
self.cancelled.store(true, Ordering::Release);
}
/// Returns `true` if `cancel()` has been called on this token or any of its clones.
#[must_use]
pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Acquire)
}
/// Returns a future that resolves once the token has been cancelled.
///
/// Polls every 50 ms. Acceptable latency for interactive cancellation of long-running LLM calls.
pub async fn cancelled(&self) {
loop {
if self.is_cancelled() {
return;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new_token_is_not_cancelled() {
// Given: a freshly created CancellationToken
let token = CancellationToken::new();
// Then: it is not cancelled
assert!(!token.is_cancelled());
}
#[test]
fn test_cancel_marks_token_as_cancelled() {
// Given: a new, uncancelled token
let token = CancellationToken::new();
// When: cancel() is called
token.cancel();
// Then: is_cancelled() returns true
assert!(token.is_cancelled());
}
#[test]
fn test_clone_cancel_affects_original() {
// Given: a token and its clone
let token = CancellationToken::new();
let clone = token.clone();
// When: the clone is cancelled
clone.cancel();
// Then: the original also sees the cancellation (Arc-shared state)
assert!(token.is_cancelled());
}
#[test]
fn test_cancel_original_affects_clone() {
// Given: a token and its clone
let token = CancellationToken::new();
let clone = token.clone();
// When: the original is cancelled
token.cancel();
// Then: the clone also reports cancelled
assert!(clone.is_cancelled());
}
#[test]
fn test_multiple_cancel_calls_are_idempotent() {
// Given: a token
let token = CancellationToken::new();
// When: cancel() is called multiple times
token.cancel();
token.cancel();
token.cancel();
// Then: still simply cancelled, no panic
assert!(token.is_cancelled());
}
#[test]
fn test_cancel_from_another_thread() {
// Given: a token shared with another thread
let token = CancellationToken::new();
let token_in_thread = token.clone();
// When: a separate thread calls cancel()
let handle = std::thread::spawn(move || {
token_in_thread.cancel();
});
handle.join().unwrap_or_else(|_| panic!("thread panicked"));
// Then: the main thread sees the cancellation
assert!(token.is_cancelled());
}
#[test]
fn test_default_is_not_cancelled() {
// Given: a token created via Default trait
let token = CancellationToken::default();
// Then: it is not cancelled
assert!(!token.is_cancelled());
}
}