use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Sender, SyncSender};
use std::thread::{JoinHandle, sleep};
use std::time::Duration;
use crate::{Camera, DEFAULT_FRAME_TIMEOUT, Error, Frame, next_frame};
// How long the worker thread sleeps between loop iterations while the pump is
// not active (see the paused branch of the loop in `spawn`).
const PAUSED_POLL_INTERVAL: Duration = Duration::from_millis(20);
// Bound handed to `mpsc::sync_channel` for the queue of pending `PumpCommand`s.
const COMMAND_QUEUE_CAPACITY: usize = 16;
/// Handle to the background `cameras-pump` worker thread created by [`spawn`].
///
/// Dropping the handle raises the shutdown flag but does not wait for the
/// worker; call [`stop_and_join`] to block until the thread has exited.
pub struct Pump {
/// Join handle of the worker thread; `None` once `stop_and_join` has taken it.
pub(crate) worker: Option<JoinHandle<()>>,
/// Shared flag the worker loop checks at the top of each iteration; `true` asks it to exit.
pub(crate) shutdown: Arc<AtomicBool>,
/// Shared flag read by the worker: while `false` it sleeps instead of pulling frames.
pub(crate) active: Arc<AtomicBool>,
/// Sending half of the bounded command queue drained by the worker.
pub(crate) commands: SyncSender<PumpCommand>,
}
impl Drop for Pump {
    /// Asks the worker thread to stop, without waiting for it to finish.
    ///
    /// The flag is only raised while this handle still owns the join handle;
    /// when the handle has already been taken (as `stop_and_join` does after
    /// raising the flag itself) there is nothing left to signal.
    fn drop(&mut self) {
        if self.worker.is_none() {
            return;
        }
        // Non-blocking: the worker observes the flag on its next loop iteration.
        self.shutdown.store(true, Ordering::Relaxed);
    }
}
/// Requests sent from a [`Pump`] handle to its worker thread over the command queue.
pub(crate) enum PumpCommand {
/// Grab a single frame and send it back on `reply`.
///
/// The worker replies with `Some(frame)` on success and `None` when grabbing
/// the frame fails.
Capture { reply: Sender<Option<Frame>> },
}
pub fn spawn<F>(camera: Camera, mut on_frame: F) -> Pump
where
F: FnMut(Frame) + Send + 'static,
{
let shutdown = Arc::new(AtomicBool::new(false));
let active = Arc::new(AtomicBool::new(true));
let (command_tx, command_rx) = mpsc::sync_channel::<PumpCommand>(COMMAND_QUEUE_CAPACITY);
let shutdown_for_worker = Arc::clone(&shutdown);
let active_for_worker = Arc::clone(&active);
let worker = std::thread::Builder::new()
.name("cameras-pump".into())
.spawn(move || {
let camera = camera;
loop {
if shutdown_for_worker.load(Ordering::Relaxed) {
break;
}
let mut handled_command = false;
while let Ok(command) = command_rx.try_recv() {
match command {
PumpCommand::Capture { reply } => {
let frame = match next_frame(&camera, DEFAULT_FRAME_TIMEOUT) {
Ok(frame) => {
on_frame(frame.clone());
Some(frame)
}
Err(_) => None,
};
let _ = reply.send(frame);
}
}
handled_command = true;
}
if handled_command {
continue;
}
if !active_for_worker.load(Ordering::Relaxed) {
sleep(PAUSED_POLL_INTERVAL);
continue;
}
match next_frame(&camera, DEFAULT_FRAME_TIMEOUT) {
Ok(frame) => on_frame(frame),
Err(Error::Timeout) => continue,
Err(_) => break,
}
}
})
.expect("failed to spawn cameras pump thread");
Pump {
worker: Some(worker),
shutdown,
active,
commands: command_tx,
}
}
/// Resumes (`true`) or pauses (`false`) frame streaming on the pump's worker thread.
///
/// This only updates a shared flag; the worker reads it on its next loop
/// iteration. Queued capture commands are still served while paused.
pub fn set_active(pump: &Pump, active: bool) {
    let streaming_flag = &pump.active;
    streaming_flag.store(active, Ordering::Relaxed);
}
/// Asks the worker thread for a single frame and blocks until it answers.
///
/// Returns `None` when the request cannot be queued (`try_send` fails), when
/// the reply channel is closed without an answer, or when the worker reports
/// that it could not grab a frame.
pub fn capture_frame(pump: &Pump) -> Option<Frame> {
    let (reply_tx, reply_rx) = mpsc::channel();
    let request = PumpCommand::Capture { reply: reply_tx };

    // Never block on a full queue: give up immediately instead.
    if pump.commands.try_send(request).is_err() {
        return None;
    }

    // A closed reply channel is treated the same as "no frame".
    reply_rx.recv().unwrap_or(None)
}
/// Signals the worker thread to stop and blocks until it has exited.
///
/// Consumes the [`Pump`]. The result of joining (including a worker panic) is
/// discarded.
pub fn stop_and_join(mut pump: Pump) {
    pump.shutdown.store(true, Ordering::Relaxed);
    match pump.worker.take() {
        Some(handle) => {
            let _ = handle.join();
        }
        None => {}
    }
}