use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Sender, SyncSender};
use std::thread::{JoinHandle, sleep};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use crate::source::{CameraSource, open_source};
use crate::types::StreamConfig;
use crate::{Camera, DEFAULT_FRAME_TIMEOUT, Error, Frame, next_frame};
/// How long a worker sleeps between checks while the pump is paused
/// (its `active` flag is `false`).
const PAUSED_POLL_INTERVAL: Duration = Duration::from_millis(20);
/// Capacity of the bounded command channel between callers and the worker thread.
const COMMAND_QUEUE_CAPACITY: usize = 16;
/// Largest shift applied when doubling the reconnect backoff, i.e. the delay
/// grows by at most a factor of `2^MAX_BACKOFF_SHIFT` before `max_backoff` caps it.
const MAX_BACKOFF_SHIFT: u32 = 20;
/// Tunables that control how `spawn_with_policy` reconnects a failed or stalled source.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ReconnectPolicy {
/// Delay used before the first reopen attempt; doubled for every further failed attempt.
pub initial_backoff: Duration,
/// Upper bound applied to the doubled delay (before jitter is applied).
pub max_backoff: Duration,
/// `Some(n)`: give up once `n` reopen attempts have failed. `None`: keep retrying.
pub max_attempts: Option<u32>,
/// Jitter expressed as a fraction of the capped delay; only its absolute value is used.
pub jitter: f32,
/// How long frame timeouts may persist since the last received frame before a
/// reconnect is started.
pub stall_timeout: Duration,
}
impl Default for ReconnectPolicy {
fn default() -> Self {
Self {
initial_backoff: Duration::from_secs(1),
max_backoff: Duration::from_secs(30),
max_attempts: None,
jitter: 0.2,
stall_timeout: Duration::from_secs(15),
}
}
}
/// Connection life-cycle events reported over the optional status channel of
/// `spawn_with_policy`. Events are sent with `try_send`, so they are dropped
/// when the channel is full or disconnected.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum PumpStatus {
/// Emitted right before the source is opened for the first time.
Connecting,
/// Emitted after the source was opened successfully (initially or on reconnect).
Connected,
/// Emitted before each reconnect wait.
Reconnecting {
/// Number of reopen attempts that have failed so far in this reconnect cycle (starts at 0).
attempt: u32,
/// Backoff that will be waited before the next reopen attempt.
next_delay: Duration,
/// Why the reconnect is happening: the trigger of the cycle, or
/// `open_failed:<error>` after a failed reopen.
reason: String,
},
/// Emitted when `max_attempts` is reached and the worker stops.
GaveUp {
/// `max_attempts_reached:` followed by the most recent reconnect reason.
reason: String,
},
}
/// Handle to a background frame-pump worker thread.
pub struct Pump {
/// Join handle of the worker thread; `stop_and_join` takes it out before joining.
pub(crate) worker: Option<JoinHandle<()>>,
/// Set to `true` to ask the worker loop to exit.
pub(crate) shutdown: Arc<AtomicBool>,
/// While `false` the worker is paused and does not pull frames on its own.
pub(crate) active: Arc<AtomicBool>,
/// Bounded channel used to submit commands to the worker.
pub(crate) commands: SyncSender<PumpCommand>,
}
impl Drop for Pump {
    /// Asks the worker to stop when the handle is dropped while still owning
    /// the worker's join handle. The thread is only signalled, not joined.
    fn drop(&mut self) {
        // Nothing to signal once the join handle has been taken out.
        if self.worker.is_none() {
            return;
        }
        self.shutdown.store(true, Ordering::Relaxed);
    }
}
/// Requests that callers can queue for the worker thread.
pub(crate) enum PumpCommand {
/// Ask the worker for one frame; the result (`None` when no frame could be
/// obtained) is sent back through `reply`.
Capture { reply: Sender<Option<Frame>> },
}
pub fn spawn<F>(camera: Camera, mut on_frame: F) -> Pump
where
F: FnMut(Frame) + Send + 'static,
{
let shutdown = Arc::new(AtomicBool::new(false));
let active = Arc::new(AtomicBool::new(true));
let (command_tx, command_rx) = mpsc::sync_channel::<PumpCommand>(COMMAND_QUEUE_CAPACITY);
let shutdown_for_worker = Arc::clone(&shutdown);
let active_for_worker = Arc::clone(&active);
let worker = std::thread::Builder::new()
.name("cameras-pump".into())
.spawn(move || {
let camera = camera;
let mut last_frame_at = Instant::now();
loop {
if shutdown_for_worker.load(Ordering::Relaxed) {
break;
}
if drain_command_queue(&command_rx, &camera, &mut on_frame, &mut last_frame_at) {
continue;
}
if !active_for_worker.load(Ordering::Relaxed) {
sleep(PAUSED_POLL_INTERVAL);
last_frame_at = Instant::now();
continue;
}
match next_frame(&camera, DEFAULT_FRAME_TIMEOUT) {
Ok(frame) => {
last_frame_at = Instant::now();
on_frame(frame);
}
Err(Error::Timeout) => continue,
Err(_) => break,
}
}
})
.expect("failed to spawn cameras pump thread");
Pump {
worker: Some(worker),
shutdown,
active,
commands: command_tx,
}
}
/// Serves every command currently waiting in `command_rx` without blocking on
/// the queue itself.
///
/// For each `Capture` request one frame is fetched (waiting up to
/// `DEFAULT_FRAME_TIMEOUT`). A fetched frame refreshes `last_frame_at`, is
/// passed to `on_frame` as a clone and is then sent to the requester; any
/// fetch error is answered with `None`. Failures to deliver the reply are ignored.
///
/// Returns `true` when at least one command was taken from the queue.
fn drain_command_queue<F>(
    command_rx: &mpsc::Receiver<PumpCommand>,
    camera: &Camera,
    on_frame: &mut F,
    last_frame_at: &mut Instant,
) -> bool
where
    F: FnMut(Frame),
{
    let mut any_served = false;
    for command in command_rx.try_iter() {
        let PumpCommand::Capture { reply } = command;
        let captured = next_frame(camera, DEFAULT_FRAME_TIMEOUT)
            .ok()
            .map(|frame| {
                *last_frame_at = Instant::now();
                on_frame(frame.clone());
                frame
            });
        // The requester may have gone away; that is not an error for the pump.
        let _ = reply.send(captured);
        any_served = true;
    }
    any_served
}
/// Pauses (`active == false`) or resumes (`active == true`) the pump by
/// updating the flag its worker polls.
pub fn set_active(pump: &Pump, active: bool) {
    let flag: &AtomicBool = &pump.active;
    flag.store(active, Ordering::Relaxed);
}
/// Asks the worker for a single frame and blocks until it answers.
///
/// Returns `None` when the request cannot be queued (queue full or worker
/// gone), when the worker answers with `None`, or when the worker drops the
/// request without answering.
pub fn capture_frame(pump: &Pump) -> Option<Frame> {
    let (reply_tx, reply_rx) = mpsc::channel();
    let request = PumpCommand::Capture { reply: reply_tx };
    if pump.commands.try_send(request).is_err() {
        return None;
    }
    match reply_rx.recv() {
        Ok(answer) => answer,
        Err(_) => None,
    }
}
/// Signals the worker to stop and waits for its thread to finish.
///
/// The result of the join (including a worker panic) is discarded. When the
/// pump no longer owns a join handle only the shutdown flag is set.
pub fn stop_and_join(mut pump: Pump) {
    pump.shutdown.store(true, Ordering::Relaxed);
    let Some(handle) = pump.worker.take() else {
        return;
    };
    let _ = handle.join();
}
/// Opens `source` and starts a `cameras-pump` worker thread that feeds frames
/// to `on_frame`, optionally reconnecting when the stream fails or stalls.
///
/// Reconnecting is enabled only when `source_is_rtsp(&source)` holds and a
/// `policy` was supplied. When enabled, a non-timeout frame error, or frame
/// timeouts persisting longer than `stall_timeout` since the last frame, hand
/// control to `run_reconnect_loop`; the worker continues with the new camera
/// or stops when that loop gives up or observes shutdown. When not enabled,
/// timeouts are retried and any other error stops the worker.
///
/// `status`, when present, receives best-effort [`PumpStatus`] events.
///
/// # Errors
///
/// Returns the error from `open_source` when the initial open fails.
///
/// # Panics
///
/// Panics if the operating system refuses to spawn the thread.
pub fn spawn_with_policy<F>(
    source: CameraSource,
    config: StreamConfig,
    mut on_frame: F,
    policy: Option<ReconnectPolicy>,
    status: Option<SyncSender<PumpStatus>>,
) -> Result<Pump, Error>
where
    F: FnMut(Frame) + Send + 'static,
{
    let reconnect_eligible = source_is_rtsp(&source) && policy.is_some();
    let effective_policy = policy.unwrap_or_default();

    emit_status(&status, PumpStatus::Connecting);
    let initial_camera = open_source(source.clone(), config)?;
    emit_status(&status, PumpStatus::Connected);

    let (command_tx, command_rx) = mpsc::sync_channel::<PumpCommand>(COMMAND_QUEUE_CAPACITY);
    let shutdown = Arc::new(AtomicBool::new(false));
    let active = Arc::new(AtomicBool::new(true));
    let stop_flag = Arc::clone(&shutdown);
    let run_flag = Arc::clone(&active);

    let pump_body = move || {
        let mut camera = initial_camera;
        let source = source;
        let config = config;
        let status = status;
        let mut last_frame_at = Instant::now();

        while !stop_flag.load(Ordering::Relaxed) {
            // Commands take priority; after serving any, re-check the shutdown flag.
            if drain_command_queue(&command_rx, &camera, &mut on_frame, &mut last_frame_at) {
                continue;
            }
            // Paused: idle briefly and keep the stall clock from running.
            if !run_flag.load(Ordering::Relaxed) {
                sleep(PAUSED_POLL_INTERVAL);
                last_frame_at = Instant::now();
                continue;
            }

            // Either deliver a frame / retry, or decide why a reconnect is needed.
            let reconnect_reason = match next_frame(&camera, DEFAULT_FRAME_TIMEOUT) {
                Ok(frame) => {
                    last_frame_at = Instant::now();
                    on_frame(frame);
                    continue
                }
                Err(Error::Timeout) => {
                    let stalled = reconnect_eligible
                        && last_frame_at.elapsed() > effective_policy.stall_timeout;
                    if !stalled {
                        continue;
                    }
                    "stall_timeout_exceeded"
                }
                Err(_) if reconnect_eligible => "next_frame_error",
                Err(_) => break,
            };

            let context = ReconnectContext {
                command_rx: &command_rx,
                source: &source,
                config,
                policy: &effective_policy,
                status: &status,
                shutdown: &stop_flag,
                active: &run_flag,
            };
            match run_reconnect_loop(context, reconnect_reason) {
                ReconnectOutcome::Reconnected(new_camera) => {
                    camera = new_camera;
                    last_frame_at = Instant::now();
                }
                ReconnectOutcome::Shutdown | ReconnectOutcome::GaveUp => break,
            }
        }
    };

    let worker = std::thread::Builder::new()
        .name("cameras-pump".into())
        .spawn(pump_body)
        .expect("failed to spawn cameras pump thread");

    Ok(Pump {
        worker: Some(worker),
        shutdown,
        active,
        commands: command_tx,
    })
}
/// Result of one run of `run_reconnect_loop`.
enum ReconnectOutcome {
/// The source was reopened; carries the new camera.
Reconnected(Camera),
/// `max_attempts` was reached without a successful reopen.
GaveUp,
/// The shutdown flag was observed while reconnecting.
Shutdown,
}
/// Everything `run_reconnect_loop` needs from the worker, bundled so it can be
/// passed as a single argument.
struct ReconnectContext<'a> {
/// Command queue; pending captures are answered with `None` while reconnecting.
command_rx: &'a mpsc::Receiver<PumpCommand>,
/// Source to reopen (cloned for every attempt).
source: &'a CameraSource,
/// Stream configuration passed to `open_source` on every attempt.
config: StreamConfig,
/// Backoff and attempt limits.
policy: &'a ReconnectPolicy,
/// Optional channel for `PumpStatus` events.
status: &'a Option<SyncSender<PumpStatus>>,
/// Flag that aborts the reconnect loop when set.
shutdown: &'a AtomicBool,
/// While `false`, no reopen attempts are made.
active: &'a AtomicBool,
}
/// Repeatedly tries to reopen the source until it succeeds, the attempt limit
/// is reached, or shutdown is requested.
///
/// Each round answers pending capture commands with `None`, idles while the
/// pump is paused, reports `Reconnecting` with the upcoming backoff, waits
/// that backoff (still answering commands and watching for shutdown) and then
/// calls `open_source`. A failed open increments the attempt counter and
/// becomes the reason reported in the next round; `initial_reason` is the
/// reason reported in the first one.
fn run_reconnect_loop(context: ReconnectContext<'_>, initial_reason: &str) -> ReconnectOutcome {
    let mut attempt: u32 = 0;
    let mut reason = String::from(initial_reason);

    loop {
        // Read the flag first, then release waiting callers in either case.
        let stopping = context.shutdown.load(Ordering::Relaxed);
        reply_none_to_pending_commands(context.command_rx);
        if stopping {
            return ReconnectOutcome::Shutdown;
        }

        // Paused pumps do not attempt to reconnect; poll again shortly.
        if !context.active.load(Ordering::Relaxed) {
            sleep(PAUSED_POLL_INTERVAL);
            continue;
        }

        let exhausted = matches!(context.policy.max_attempts, Some(limit) if attempt >= limit);
        if exhausted {
            let final_reason = format!("max_attempts_reached:{reason}");
            emit_status(
                context.status,
                PumpStatus::GaveUp {
                    reason: final_reason,
                },
            );
            return ReconnectOutcome::GaveUp;
        }

        let delay = compute_backoff(context.policy, attempt);
        let announcement = PumpStatus::Reconnecting {
            attempt,
            next_delay: delay,
            reason: reason.clone(),
        };
        emit_status(context.status, announcement);

        sleep_responsive(delay, context.shutdown, context.command_rx);
        if context.shutdown.load(Ordering::Relaxed) {
            return ReconnectOutcome::Shutdown;
        }

        match open_source(context.source.clone(), context.config) {
            Ok(new_camera) => {
                emit_status(context.status, PumpStatus::Connected);
                return ReconnectOutcome::Reconnected(new_camera);
            }
            Err(error) => {
                reason = format!("open_failed:{error}");
                attempt = attempt.saturating_add(1);
            }
        }
    }
}
/// Empties the command queue without blocking, answering every capture
/// request with `None`. Requesters that already went away are ignored.
fn reply_none_to_pending_commands(command_rx: &mpsc::Receiver<PumpCommand>) {
    for command in command_rx.try_iter() {
        let PumpCommand::Capture { reply } = command;
        let _ = reply.send(None);
    }
}
/// Sleeps for `total`, in slices of at most 100 ms.
///
/// Before every slice the shutdown flag is checked (returning early when it
/// is set) and pending capture commands are answered with `None`, so callers
/// are not left waiting for the whole backoff.
fn sleep_responsive(
    total: Duration,
    shutdown: &AtomicBool,
    command_rx: &mpsc::Receiver<PumpCommand>,
) {
    const TICK: Duration = Duration::from_millis(100);

    let mut remaining = total;
    loop {
        if remaining.is_zero() || shutdown.load(Ordering::Relaxed) {
            return;
        }
        reply_none_to_pending_commands(command_rx);
        let slice = std::cmp::min(remaining, TICK);
        sleep(slice);
        remaining = remaining.saturating_sub(slice);
    }
}
/// Computes how long to wait before reconnect attempt number `attempt`
/// (0-based).
///
/// The delay is `initial_backoff * 2^min(attempt, MAX_BACKOFF_SHIFT)`, capped
/// at `max_backoff`, then shifted by a pseudo-random offset drawn from
/// `[-m, +m)` where `m = capped_delay * |jitter|`. The result never goes
/// below zero. A jitter of zero (or NaN) yields exactly the capped delay.
///
/// All arithmetic saturates, so extreme durations or jitter values cannot
/// overflow or wrap.
fn compute_backoff(policy: &ReconnectPolicy, attempt: u32) -> Duration {
    // Clamp a duration's nanosecond count into `u64` instead of truncating it.
    fn saturating_nanos(duration: Duration) -> u64 {
        duration.as_nanos().min(u128::from(u64::MAX)) as u64
    }

    // SplitMix64 finalizer: spreads the entropy of the input over all 64 bits
    // so that the modulo below covers its whole range, however large it is.
    fn mix_bits(mut value: u64) -> u64 {
        value = value.wrapping_add(0x9E37_79B9_7F4A_7C15);
        value = (value ^ (value >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        value = (value ^ (value >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        value ^ (value >> 31)
    }

    let base_nanos = saturating_nanos(policy.initial_backoff);
    let max_nanos = saturating_nanos(policy.max_backoff);
    let factor = 1u64
        .checked_shl(attempt.min(MAX_BACKOFF_SHIFT))
        .unwrap_or(u64::MAX);
    let capped_nanos = base_nanos.saturating_mul(factor).min(max_nanos);

    // Float-to-integer `as` casts saturate and turn NaN into 0, so a NaN
    // jitter disables jitter and an infinite one saturates at `u64::MAX`.
    let jitter_magnitude = (capped_nanos as f32 * policy.jitter.abs()) as u64;
    if jitter_magnitude == 0 {
        return Duration::from_nanos(capped_nanos);
    }

    // Only the low bits of the clock matter for seeding, so truncation is fine;
    // mixing in `attempt` keeps seeds distinct even on coarse clocks.
    let clock_nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos() as u64)
        .unwrap_or(0);
    let seed = mix_bits(clock_nanos ^ mix_bits(u64::from(attempt)));

    // `draw` is in `[0, 2m)`; subtracting `m` gives the offset in `[-m, +m)`.
    // The sum is done in `u128` so neither the addition nor the clamp at zero
    // can overflow.
    let span = jitter_magnitude.saturating_mul(2);
    let draw = seed % span;
    let shifted = u128::from(capped_nanos) + u128::from(draw);
    let final_nanos = shifted.saturating_sub(u128::from(jitter_magnitude));
    Duration::from_nanos(final_nanos.min(u128::from(u64::MAX)) as u64)
}
/// Best-effort delivery of `event` to the optional status listener.
///
/// Does nothing without a listener; an event that cannot be queued (channel
/// full or disconnected) is silently dropped so the worker never blocks.
fn emit_status(status: &Option<SyncSender<PumpStatus>>, event: PumpStatus) {
    let Some(listener) = status.as_ref() else {
        return;
    };
    let _ = listener.try_send(event);
}
/// Reports whether `source` is an RTSP source.
///
/// The check is only compiled in when the `rtsp` feature is enabled on macOS
/// or Windows; in every other build configuration the answer is always `false`.
fn source_is_rtsp(source: &CameraSource) -> bool {
    #[cfg(all(feature = "rtsp", any(target_os = "macos", target_os = "windows")))]
    let is_rtsp = matches!(source, CameraSource::Rtsp { .. });

    #[cfg(not(all(feature = "rtsp", any(target_os = "macos", target_os = "windows"))))]
    let is_rtsp = {
        // Keep the parameter "used" in builds without RTSP support.
        let _ = source;
        false
    };

    is_rtsp
}