1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
mod common;
use camber::{RuntimeError, channel, runtime, spawn};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
/// Cancelling a task between two channel receives must make the second
/// `recv` report `RuntimeError::Cancelled`.
///
/// The spawned task converts the expected `Cancelled` error into `Ok(())`
/// and panics on any other outcome of the second `recv`.
#[test]
fn cancel_stops_task_at_next_io_boundary() {
    let outcome = runtime::run(|| {
        let (tx, rx) = channel::bounded::<i32>(1);

        // Rendezvous barrier on a std channel (not a camber one, so the
        // signal itself is not subject to camber cancellation checks).
        let (ready_tx, ready_rx) = std::sync::mpsc::sync_channel::<()>(0);

        let handle = spawn(move || -> Result<(), RuntimeError> {
            // The main task sends exactly one value, which satisfies this recv.
            let _val = rx.recv()?;

            // Tell the main task that the first recv has finished, so it is
            // now safe to set the cancel flag.
            let _ = ready_tx.send(());

            // The cancel flag is set around here; this recv is expected to
            // surface it as `Cancelled`.
            let other = rx.recv();
            if matches!(other, Err(RuntimeError::Cancelled)) {
                return Ok(());
            }
            panic!("expected Cancelled, got {other:?}")
        });

        // Feed the first recv.
        tx.send(1).unwrap();
        // Block until the task reports that the first recv completed.
        ready_rx.recv().unwrap();
        // Request cancellation; the task's second recv should observe it.
        handle.cancel();

        // Joining must succeed: the task maps `Cancelled` to `Ok(())`
        // instead of panicking.
        let result = handle.join();
        assert!(result.is_ok(), "expected Ok, got {result:?}");
    });
    outcome.unwrap();
}
/// A task cancelled before it reaches its first camber IO call must get
/// `RuntimeError::Cancelled` from that call.
///
/// The task is held back on a std channel gate that is released only after
/// `cancel()` has returned, so the "cancel first, IO second" ordering is
/// guaranteed rather than depending on sleep timing.
#[test]
fn cancel_before_task_starts_io() {
    runtime::run(|| {
        // Gate built on a std channel. Capacity 1 means the release below
        // never blocks the main task, whatever the spawned task is doing.
        let (go_tx, go_rx) = std::sync::mpsc::sync_channel::<()>(1);

        let handle = spawn(move || -> Result<(), RuntimeError> {
            // Park until the main task has requested cancellation. The
            // result is ignored: either outcome means it is time to proceed.
            let _ = go_rx.recv();
            // Expected to return Cancelled; the `?` propagates it as the
            // task's result.
            common::block_on(camber::http::get("http://127.0.0.1:1"))?;
            Ok(())
        });

        // Cancel while the task is still parked on the gate...
        handle.cancel();
        // ...and only then let it continue on to the IO call.
        let _ = go_tx.send(());

        let result = handle.join().unwrap();
        let err = result.unwrap_err();
        assert!(
            matches!(err, RuntimeError::Cancelled),
            "expected Cancelled, got {err:?}"
        );
    })
    .unwrap();
}
/// Cancelling a task that performs no camber IO must not change the value
/// returned by `join`.
///
/// Two scenarios run in separate runtimes: a plain spawn-and-join baseline,
/// and a cancel issued after the task has had time to finish but before it
/// is joined.
#[test]
fn join_after_cancel_returns_result() {
    // Scenario 1: baseline, join without any cancel.
    // Calling cancel() after join() cannot be written, because the handle is
    // moved into join(); scenario 2 covers cancel on a task that is finished
    // but not yet joined.
    let baseline = runtime::run(|| {
        let handle = spawn(|| 42);
        assert_eq!(handle.join().unwrap(), 42);
    });
    baseline.unwrap();

    // Scenario 2: cancel after the task has had time to complete, then join.
    let cancelled_late = runtime::run(|| {
        let handle = spawn(|| {
            thread::sleep(Duration::from_millis(10));
            42
        });

        // Give the task time to finish its 10 ms of work.
        thread::sleep(Duration::from_millis(50));
        handle.cancel();

        // The task body does no camber IO, so it still yields its value.
        assert_eq!(handle.join().unwrap(), 42);
    });
    cancelled_late.unwrap();
}
/// Iterating a channel with `rx.iter()` is expected to stop once the task
/// has been cancelled.
///
/// The task counts every item it receives. After cancellation a single
/// value is sent to release a possibly blocked iteration, so the count may
/// be 0 or 1 but never more.
#[test]
fn channel_iter_respects_cancellation() {
    let outcome = runtime::run(|| {
        let (tx, rx) = channel::bounded::<i32>(10);

        // Shared counter of items the task has pulled from the iterator.
        let count = Arc::new(AtomicUsize::new(0));

        // Std rendezvous channel: the task reports that it is running and
        // about to start iterating.
        let (started_tx, started_rx) = std::sync::mpsc::sync_channel::<()>(0);

        let handle = spawn({
            let count_inner = Arc::clone(&count);
            move || {
                started_tx.send(()).unwrap();
                for _val in rx.iter() {
                    count_inner.fetch_add(1, Ordering::SeqCst);
                }
            }
        });

        // Do not cancel until the task is known to be running.
        started_rx.recv().unwrap();
        handle.cancel();

        // One value to release an iteration that may be blocked waiting.
        // The send result is deliberately ignored.
        let _ = tx.send(99);

        // The loop should end because of the cancellation; `tx` is still
        // alive at this point.
        let result = handle.join();
        assert!(result.is_ok(), "expected Ok, got {result:?}");

        // At most the single release value may have been counted.
        let received = count.load(Ordering::SeqCst);
        assert!(received <= 1, "expected at most 1 item, got {received}");
    });
    outcome.unwrap();
}
/// After one `recv` has completed successfully, a cancellation must be
/// reported by the task's next `recv` as `RuntimeError::Cancelled`.
///
/// A std rendezvous channel tells the main task when the first `recv` has
/// finished, so `cancel()` is never issued too early (which would make the
/// first `recv` fail instead) and the test does not depend on sleep timing.
#[test]
fn cancel_detected_after_io_completes() {
    runtime::run(|| {
        let (tx, rx) = channel::bounded::<i32>(1);

        // Records that the task got past its first recv.
        let reached_second_op = Arc::new(AtomicBool::new(false));
        let reached_inner = Arc::clone(&reached_second_op);

        // Barrier on a std channel, the same pattern the other tests in
        // this file use to order cancel() after a completed recv.
        let (ready_tx, ready_rx) = std::sync::mpsc::sync_channel::<()>(0);

        let handle = spawn(move || -> Result<i32, RuntimeError> {
            // Satisfied by the value the main task sends below.
            let val = rx.recv()?;
            // The first recv succeeded; record it, then notify the main task.
            reached_inner.store(true, Ordering::Release);
            let _ = ready_tx.send(());
            // Nothing more is ever sent, so this recv can only finish by
            // reporting the cancellation, which `?` propagates.
            let second = rx.recv()?;
            Ok(val + second)
        });

        // Send the value that satisfies the first recv.
        tx.send(10).unwrap();

        // Wait for the task to report that the first recv completed. If the
        // task exited early, its sender is dropped and this returns an error;
        // that is ignored here because the flag assertion below reports it.
        let _ = ready_rx.recv();

        // Cancel: the task should detect this on its next IO operation.
        handle.cancel();

        let result = handle.join().unwrap();
        assert!(
            reached_second_op.load(Ordering::Acquire),
            "task should have passed first recv"
        );
        assert!(
            matches!(result, Err(RuntimeError::Cancelled)),
            "expected Cancelled, got {result:?}"
        );
    })
    .unwrap();
}