1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
//! Lifecycle handle for a running scenario.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use crate::schedule::stats::ScenarioStats;
use crate::SondaError;
/// A running scenario's lifecycle handle.
///
/// Returned by [`crate::schedule::launch::launch_scenario`]. Provides shutdown,
/// join, and stats access. Used identically by the CLI, multi_runner, and
/// sonda-server.
///
/// The handle is `Send` because every one of its fields is `Send` (the
/// `Option` wrapper around the `JoinHandle` exists so the thread can be taken
/// out on join, not to provide `Send`). This lets the handle be stored in
/// server state and moved across await points.
pub struct ScenarioHandle {
/// Unique identifier for this scenario instance.
pub id: String,
/// Human-readable scenario name (from config).
pub name: String,
/// Shared shutdown flag. Setting this to `false` signals the runner to exit.
///
/// Note the polarity: despite the field name, `true` means "keep running"
/// and `false` means "stop" (see [`ScenarioHandle::stop`]).
pub shutdown: Arc<AtomicBool>,
/// The OS thread running the scenario. `None` after [`ScenarioHandle::join`] consumes it.
///
/// A `join` call that times out leaves this as `Some`, so the thread can be
/// joined again later.
pub thread: Option<JoinHandle<Result<(), SondaError>>>,
/// Monotonic timestamp (`Instant`) captured when the scenario was launched.
///
/// Being an `Instant`, it is only meaningful for measuring elapsed time; it
/// cannot be converted to a calendar date/time.
pub started_at: Instant,
/// Live statistics shared with the runner thread, which updates them on each tick.
///
/// Readers should prefer [`ScenarioHandle::stats_snapshot`] over locking this directly.
pub stats: Arc<RwLock<ScenarioStats>>,
/// The configured target rate (events per second) from the scenario config.
pub target_rate: f64,
}
impl ScenarioHandle {
    /// Signal the scenario to stop.
    ///
    /// Stores `false` into the shared `shutdown` flag with `SeqCst` ordering
    /// and returns immediately; it does not wait for the thread. The runner
    /// thread is expected to observe the flag on its next tick and exit
    /// cleanly. Call [`ScenarioHandle::join`] to wait for that to happen.
    pub fn stop(&self) {
        self.shutdown.store(false, Ordering::SeqCst);
    }

    /// Check whether the scenario thread is still running.
    ///
    /// Returns `true` if the thread is still alive, `false` if it has exited
    /// or if the handle has already been joined.
    pub fn is_running(&self) -> bool {
        !self.thread_finished()
    }

    /// Join the scenario thread, consuming the stored `JoinHandle`.
    ///
    /// * `timeout == None` blocks until the thread exits.
    /// * `timeout == Some(limit)` polls for at most `limit`. If the thread is
    ///   still running when the limit expires, this returns `Ok(())` and the
    ///   handle keeps owning the thread, so `join` can be called again. Check
    ///   [`ScenarioHandle::is_running`] to tell a timeout from a completed
    ///   join.
    /// * A `limit` so large that the deadline cannot be represented as an
    ///   `Instant` (for example `Duration::MAX`) is treated as "no timeout"
    ///   instead of panicking on overflow.
    ///
    /// Calling `join` after the thread has already been joined is a no-op that
    /// returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns the thread's own [`SondaError`] if the scenario failed, or
    /// `SondaError::Config("scenario thread panicked")` if the thread panicked.
    pub fn join(&mut self, timeout: Option<Duration>) -> Result<(), SondaError> {
        // Upper bound on a single sleep while polling for thread exit.
        const POLL_INTERVAL: Duration = Duration::from_millis(10);

        if self.thread.is_none() {
            // Already joined.
            return Ok(());
        }

        // `JoinHandle` has no timed-join API on stable Rust, so a timeout is
        // implemented by polling `is_finished`. `checked_add` yields `None`
        // when the deadline overflows `Instant`; that case falls through to
        // the blocking join below.
        let deadline = timeout.and_then(|limit| Instant::now().checked_add(limit));
        if let Some(deadline) = deadline {
            while !self.thread_finished() {
                let remaining = deadline.saturating_duration_since(Instant::now());
                if remaining.is_zero() {
                    // Timed out: leave the JoinHandle in place for a later join.
                    return Ok(());
                }
                // Never sleep past the deadline, so short timeouts are honoured.
                std::thread::sleep(remaining.min(POLL_INTERVAL));
            }
        }

        // Take ownership of the JoinHandle; from here on the handle reports
        // itself as not running.
        match self.thread.take() {
            None => Ok(()),
            Some(worker) => match worker.join() {
                Ok(outcome) => outcome,
                Err(_) => Err(SondaError::Config("scenario thread panicked".to_string())),
            },
        }
    }

    /// Elapsed time since the scenario started.
    ///
    /// Measured on the monotonic clock from the `Instant` recorded in
    /// `started_at`. It continues to grow even after the scenario stops.
    pub fn elapsed(&self) -> Duration {
        self.started_at.elapsed()
    }

    /// Read the latest stats snapshot.
    ///
    /// Acquires the read lock briefly, clones the stats, and returns. Does not
    /// block writers for longer than the clone operation.
    ///
    /// If a writer panicked while holding the lock, the lock is poisoned; the
    /// last written stats are still returned rather than panicking in the
    /// caller. The panic itself is reported by [`ScenarioHandle::join`].
    pub fn stats_snapshot(&self) -> ScenarioStats {
        self.stats
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .clone()
    }

    /// `true` when there is no thread left to wait for: it was already joined
    /// or it has exited.
    fn thread_finished(&self) -> bool {
        self.thread.as_ref().map_or(true, |t| t.is_finished())
    }
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, RwLock};
    use std::thread;
    use std::time::{Duration, Instant};

    use super::*;
    use crate::schedule::stats::ScenarioStats;
    use crate::SondaError;

    // ---- Helper: build a ScenarioHandle backed by a trivial thread ----------

    /// Build a `ScenarioHandle` whose thread counts to a limit, then exits.
    ///
    /// The thread runs at most `events` iterations. Each iteration checks the
    /// shutdown flag (exiting early once it is `false`), sleeps for
    /// `interval`, then adds 1 to `total_events` and 64 to `bytes_emitted` on
    /// the shared stats, so tests can observe live stat updates without
    /// involving the full runner pipeline.
    fn make_handle(id: &str, name: &str, events: u64, interval: Duration) -> ScenarioHandle {
        let shutdown = Arc::new(AtomicBool::new(true));
        let stats = Arc::new(RwLock::new(ScenarioStats::default()));
        let shutdown_thread = Arc::clone(&shutdown);
        let stats_thread = Arc::clone(&stats);
        let thread = thread::Builder::new()
            .name(format!("test-{name}"))
            .spawn(move || -> Result<(), SondaError> {
                for _ in 0..events {
                    if !shutdown_thread.load(Ordering::SeqCst) {
                        break;
                    }
                    thread::sleep(interval);
                    if let Ok(mut st) = stats_thread.write() {
                        st.total_events += 1;
                        st.bytes_emitted += 64;
                    }
                }
                Ok(())
            })
            .expect("thread must spawn");
        ScenarioHandle {
            id: id.to_string(),
            name: name.to_string(),
            shutdown,
            thread: Some(thread),
            started_at: Instant::now(),
            stats,
            target_rate: 100.0,
        }
    }

    // ---- is_running: true before stop, false after join ---------------------

    /// A freshly spawned handle must report is_running() == true.
    #[test]
    fn is_running_returns_true_for_live_thread() {
        let mut handle = make_handle("test-1", "live", 50, Duration::from_millis(10));
        assert!(
            handle.is_running(),
            "is_running must return true for a live thread"
        );
        // Clean up.
        handle.stop();
        handle
            .join(Some(Duration::from_secs(2)))
            .expect("join must succeed");
    }

    /// After stop() + join(), is_running() must return false.
    #[test]
    fn is_running_returns_false_after_stop_and_join() {
        let mut handle = make_handle("test-2", "stopped", 1000, Duration::from_millis(5));
        handle.stop();
        handle
            .join(Some(Duration::from_secs(2)))
            .expect("join must succeed");
        assert!(
            !handle.is_running(),
            "is_running must return false after the thread has been joined"
        );
    }

    /// A handle whose JoinHandle has been consumed (None) returns false.
    #[test]
    fn is_running_returns_false_when_thread_is_none() {
        let mut handle = make_handle("test-3", "none", 1, Duration::from_millis(1));
        // Take the JoinHandle out to mimic a post-join state. No sleep is
        // needed: the result must be `false` whether or not the worker is
        // still alive.
        let worker = handle
            .thread
            .take()
            .expect("freshly built handle owns its thread");
        assert!(
            !handle.is_running(),
            "is_running must return false when thread is None"
        );
        // Join the worker so the test does not leave a detached thread behind.
        worker
            .join()
            .expect("worker must not panic")
            .expect("worker must succeed");
    }

    // ---- stop(): sets the shutdown flag to false ----------------------------

    /// stop() must store false in the shared AtomicBool.
    #[test]
    fn stop_sets_shutdown_flag_to_false() {
        let mut handle = make_handle("test-4", "stop_flag", 1000, Duration::from_millis(10));
        assert!(
            handle.shutdown.load(Ordering::SeqCst),
            "shutdown flag must be true before stop"
        );
        handle.stop();
        assert!(
            !handle.shutdown.load(Ordering::SeqCst),
            "shutdown flag must be false after stop"
        );
        // Dropping the handle would only detach the worker; join it so the
        // thread is really gone when the test ends.
        handle
            .join(Some(Duration::from_secs(2)))
            .expect("join must succeed");
    }

    // ---- join(): blocks until thread exits and returns Ok -------------------

    /// join() with None timeout blocks until the thread finishes and returns Ok.
    #[test]
    fn join_none_timeout_waits_for_thread_and_returns_ok() {
        let mut handle = make_handle("test-5", "join_none", 3, Duration::from_millis(10));
        let result = handle.join(None);
        assert!(
            result.is_ok(),
            "join must return Ok when the thread succeeds: {result:?}"
        );
    }

    /// join() is idempotent: calling it a second time returns Ok immediately.
    #[test]
    fn join_is_idempotent_after_first_call() {
        let mut handle = make_handle("test-6", "idempotent", 1, Duration::from_millis(1));
        handle.join(None).expect("first join must succeed");
        let result = handle.join(None);
        assert!(
            result.is_ok(),
            "second join on an already-joined handle must return Ok: {result:?}"
        );
    }

    /// join() with a timeout that expires while thread is still running returns
    /// Ok without consuming the thread (handle still owns it).
    #[test]
    fn join_with_short_timeout_returns_ok_without_consuming_thread() {
        // Thread runs for 10 events × 50ms each = 500ms total.
        let mut handle = make_handle("test-7", "timeout", 10, Duration::from_millis(50));
        // Join with a very short timeout — thread will still be running.
        let result = handle.join(Some(Duration::from_millis(10)));
        assert!(
            result.is_ok(),
            "join with expired timeout must still return Ok"
        );
        // The JoinHandle must NOT have been consumed — is_running may still be true.
        assert!(
            handle.thread.is_some(),
            "thread must not be consumed when timeout expired before thread finished"
        );
        // Clean up.
        handle.stop();
        handle.join(None).ok();
    }

    // ---- elapsed(): grows over time -----------------------------------------

    /// elapsed() must return a strictly positive Duration once time has passed.
    #[test]
    fn elapsed_returns_positive_duration() {
        let mut handle = make_handle("test-8", "elapsed", 1, Duration::from_millis(1));
        // `Duration` is unsigned, so a `>= ZERO` check could never fail. Let
        // a little time pass and require a strictly positive reading instead.
        thread::sleep(Duration::from_millis(5));
        let d = handle.elapsed();
        assert!(d > Duration::ZERO, "elapsed must be positive: {d:?}");
        handle.join(None).ok();
    }

    /// elapsed() measured after a sleep must exceed the value measured before it.
    #[test]
    fn elapsed_grows_monotonically_after_sleep() {
        let mut handle = make_handle("test-9", "monotonic", 100, Duration::from_millis(5));
        let before = handle.elapsed();
        thread::sleep(Duration::from_millis(100));
        let after = handle.elapsed();
        assert!(
            after > before,
            "elapsed must grow over time: before={before:?}, after={after:?}"
        );
        handle.stop();
        handle.join(None).ok();
    }

    // ---- stats_snapshot(): returns the current stats atomically -------------

    /// stats_snapshot() on a fresh handle returns all-zero stats.
    #[test]
    fn stats_snapshot_on_fresh_handle_returns_zeros() {
        // Zero events: the worker never touches the stats.
        let mut handle = make_handle("test-10", "fresh_stats", 0, Duration::ZERO);
        let snap = handle.stats_snapshot();
        assert_eq!(snap.total_events, 0);
        assert_eq!(snap.bytes_emitted, 0);
        assert_eq!(snap.errors, 0);
        handle.join(None).ok();
    }

    /// After the worker thread emits events, stats_snapshot() reflects the updates.
    #[test]
    fn stats_snapshot_returns_nonzero_total_events_after_running() {
        // 5 events × 10ms each = ~50ms of work.
        let mut handle = make_handle("test-11", "nonzero_stats", 5, Duration::from_millis(10));
        // Wait for the worker to finish instead of sleeping for a guessed
        // duration, so the final counts are deterministic.
        handle.join(None).expect("join must succeed");
        let snap = handle.stats_snapshot();
        assert_eq!(
            snap.total_events, 5,
            "stats_snapshot must reflect every emitted event"
        );
        assert_eq!(
            snap.bytes_emitted,
            5 * 64,
            "stats_snapshot must reflect 64 bytes per emitted event"
        );
    }

    // ---- Contract: ScenarioHandle is Send -----------------------------------

    /// ScenarioHandle must be Send so it can be stored in server state and
    /// moved across tokio await points.
    #[test]
    fn scenario_handle_is_send() {
        fn assert_send<T: Send>() {}
        assert_send::<ScenarioHandle>();
    }
}