1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::thread::{self, JoinHandle};
4use std::time::{Duration, Instant};
5
6use crossbeam_channel::{Receiver, unbounded};
7
8use crate::backend::GpuBackend;
9use crate::model::GpuSample;
10
11#[derive(Debug)]
12pub enum SamplerEvent {
13 Samples(Vec<GpuSample>),
14 Error(String),
15}
16
/// Owner-side handle used to shut down a running sampler thread.
pub struct SamplerHandle {
    /// Flag shared with the sampler thread; the thread exits its loop once
    /// it observes `true`.
    stop: Arc<AtomicBool>,
    /// Join handle of the sampler thread. Held in an `Option` so it can be
    /// taken out through a mutable binding when the thread is joined.
    join: Option<JoinHandle<()>>,
}
21
22impl SamplerHandle {
23 pub fn stop(mut self) {
24 self.stop.store(true, Ordering::Relaxed);
25 if let Some(join) = self.join.take() {
26 let _ = join.join();
27 }
28 }
29}
30
31impl Drop for SamplerHandle {
32 fn drop(&mut self) {
33 self.stop.store(true, Ordering::Relaxed);
34 }
35}
36
37pub fn spawn_sampler(
38 mut backend: Box<dyn GpuBackend>,
39 interval: Duration,
40) -> (Receiver<SamplerEvent>, SamplerHandle) {
41 let (tx, rx) = unbounded();
42 let stop = Arc::new(AtomicBool::new(false));
43 let thread_stop = Arc::clone(&stop);
44
45 let join = thread::Builder::new()
46 .name("gpu-histop-sampler".to_owned())
47 .spawn(move || {
48 while !thread_stop.load(Ordering::Relaxed) {
49 let started = Instant::now();
50 let event = match backend.sample() {
51 Ok(samples) => SamplerEvent::Samples(samples),
52 Err(error) => SamplerEvent::Error(error.to_string()),
53 };
54
55 if tx.send(event).is_err() {
56 break;
57 }
58
59 sleep_until_next_tick(interval, started, &thread_stop);
60 }
61 })
62 .expect("failed to spawn sampler thread");
63
64 (
65 rx,
66 SamplerHandle {
67 stop,
68 join: Some(join),
69 },
70 )
71}
72
/// Blocks until `interval` has passed since `started`, or until `stop`
/// becomes `true`, whichever happens first.
///
/// The wait is split into naps of at most 20 ms so a stop request is
/// noticed promptly. Returns immediately when the interval has already
/// elapsed or `stop` is already set.
///
/// The remaining time is re-measured from `started` after every nap rather
/// than decremented by the requested nap length: `thread::sleep` may sleep
/// longer than asked, and counting only the requested amount would let
/// that overshoot accumulate across naps and stretch the sampling period.
fn sleep_until_next_tick(interval: Duration, started: Instant, stop: &AtomicBool) {
    // Upper bound on a single nap; limits how long a stop request can go
    // unnoticed.
    const MAX_NAP: Duration = Duration::from_millis(20);

    while !stop.load(Ordering::Relaxed) {
        let remaining = interval.saturating_sub(started.elapsed());
        if remaining.is_zero() {
            break;
        }
        thread::sleep(remaining.min(MAX_NAP));
    }
}