Skip to main content

gpu_histop/
sampler.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::thread::{self, JoinHandle};
4use std::time::{Duration, Instant};
5
6use crossbeam_channel::{Receiver, unbounded};
7
8use crate::backend::GpuBackend;
9use crate::model::GpuSample;
10
11#[derive(Debug)]
12pub enum SamplerEvent {
13    Samples(Vec<GpuSample>),
14    Error(String),
15}
16
/// Owner-side handle for a running sampler thread.
///
/// Dropping the handle only raises the stop flag; it does not wait for the
/// thread. Call [`SamplerHandle::stop`] to also block until the thread exits.
pub struct SamplerHandle {
    /// Flag shared with the worker; `true` means "please exit".
    stop: Arc<AtomicBool>,
    /// Join handle of the worker, taken out once it has been joined.
    join: Option<JoinHandle<()>>,
}

impl SamplerHandle {
    /// Raises the stop flag and blocks until the worker thread has finished.
    ///
    /// The result of joining is discarded, so a panic inside the worker is
    /// not propagated to the caller.
    pub fn stop(mut self) {
        self.request_stop();
        let _ = self.join.take().map(JoinHandle::join);
    }

    /// Sets the shared stop flag without waiting for the worker.
    fn request_stop(&self) {
        self.stop.store(true, Ordering::Relaxed);
    }
}

impl Drop for SamplerHandle {
    /// Signals the worker to exit; the thread is left to wind down on its own.
    fn drop(&mut self) {
        self.request_stop();
    }
}
36
37pub fn spawn_sampler(
38    mut backend: Box<dyn GpuBackend>,
39    interval: Duration,
40) -> (Receiver<SamplerEvent>, SamplerHandle) {
41    let (tx, rx) = unbounded();
42    let stop = Arc::new(AtomicBool::new(false));
43    let thread_stop = Arc::clone(&stop);
44
45    let join = thread::Builder::new()
46        .name("gpu-histop-sampler".to_owned())
47        .spawn(move || {
48            while !thread_stop.load(Ordering::Relaxed) {
49                let started = Instant::now();
50                let event = match backend.sample() {
51                    Ok(samples) => SamplerEvent::Samples(samples),
52                    Err(error) => SamplerEvent::Error(error.to_string()),
53                };
54
55                if tx.send(event).is_err() {
56                    break;
57                }
58
59                sleep_until_next_tick(interval, started, &thread_stop);
60            }
61        })
62        .expect("failed to spawn sampler thread");
63
64    (
65        rx,
66        SamplerHandle {
67            stop,
68            join: Some(join),
69        },
70    )
71}
72
/// Blocks the calling thread until `interval` has elapsed since `started`,
/// or until `stop` is observed as `true`, whichever happens first.
///
/// Returns immediately when the interval has already elapsed or the stop
/// flag is already set.
///
/// The wait is cut into naps of at most 20 ms so that a stop request is
/// noticed promptly. The time still to wait is re-read from the clock on
/// every iteration rather than decremented by the requested nap length:
/// `thread::sleep` may sleep longer than asked, and subtracting only the
/// nominal nap would let those overshoots pile up into a late tick.
fn sleep_until_next_tick(interval: Duration, started: Instant, stop: &AtomicBool) {
    /// Upper bound on a single nap; also the worst-case stop latency.
    const MAX_NAP: Duration = Duration::from_millis(20);

    while !stop.load(Ordering::Relaxed) {
        let remaining = interval.saturating_sub(started.elapsed());
        if remaining.is_zero() {
            break;
        }
        thread::sleep(remaining.min(MAX_NAP));
    }
}
81}