use std::net::IpAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, PoisonError};
use std::time::Duration;
use cameras::Error;
use cameras::discover::{
self, DiscoverConfig, DiscoverEvent, DiscoveredCamera, Discovery, next_event,
};
use dioxus::prelude::*;
use crate::channel::Channel;
/// Delay between two drains of the event channel by the UI-side polling task.
const POLL_INTERVAL: Duration = Duration::from_millis(50);
/// Value handed to `next_event` on every iteration of the worker loop; a `Timeout` result
/// simply makes the worker loop again, re-checking whether its scan is still current.
const WORKER_TICK: Duration = Duration::from_millis(100);
/// Handle returned by [`use_discovery`]: reactive scan state plus the callbacks that drive it.
///
/// Every field is a `Signal` or `Callback`, so the struct itself is `Copy`.
#[derive(Copy, Clone, PartialEq)]
pub struct UseDiscovery {
/// Cameras reported through `DiscoverEvent::CameraFound`, in arrival order.
pub cameras: Signal<Vec<DiscoveredCamera>>,
/// `(host, server)` pairs taken from `DiscoverEvent::HostUnmatched` events, in arrival order.
pub unmatched_hosts: Signal<Vec<(IpAddr, String)>>,
/// `scanned` value of the most recent `DiscoverEvent::Progress`; reset to 0 by `start`/`clear`.
pub scanned: Signal<usize>,
/// `total` value of the most recent `DiscoverEvent::Progress`; reset to 0 by `start`/`clear`.
pub total: Signal<usize>,
/// `true` from a successful `start` until `Done` is received, or until `cancel`/`clear` is called.
pub running: Signal<bool>,
/// Set when a scan could not be started (discovery setup or worker-thread spawn failed);
/// reset to `None` by `start` and `clear`.
pub error: Signal<Option<Error>>,
/// Starts a new scan with the given configuration, replacing any scan already in progress.
pub start: Callback<DiscoverConfig>,
/// Stops the current scan but keeps the results collected so far.
pub cancel: Callback<()>,
/// Stops the current scan and resets results, progress counters and error.
pub clear: Callback<()>,
}
pub fn use_discovery() -> UseDiscovery {
let mut cameras = use_signal(Vec::<DiscoveredCamera>::new);
let mut unmatched_hosts = use_signal(Vec::<(IpAddr, String)>::new);
let mut scanned = use_signal(|| 0usize);
let mut total = use_signal(|| 0usize);
let mut running = use_signal(|| false);
let mut error = use_signal(|| None::<Error>);
let channel = use_hook(Channel::<DiscoverEvent>::new);
let handle = use_hook(|| Arc::new(Mutex::new(None::<Discovery>)));
let scan_id = use_hook(|| Arc::new(AtomicU64::new(0)));
let poll_channel = channel.clone();
use_hook(move || {
spawn(async move {
loop {
futures_timer::Delay::new(POLL_INTERVAL).await;
for event in poll_channel.drain() {
match event {
DiscoverEvent::CameraFound(camera) => {
cameras.write().push(camera);
}
DiscoverEvent::HostUnmatched { host, server } => {
unmatched_hosts.write().push((host, server));
}
DiscoverEvent::Progress {
scanned: done,
total: totals,
} => {
scanned.set(done);
total.set(totals);
}
DiscoverEvent::Done => {
running.set(false);
}
DiscoverEvent::HostFound { .. } => {}
_ => {}
}
}
}
})
});
let start_tx = channel.sender.clone();
let start_handle = Arc::clone(&handle);
let start_scan_id = Arc::clone(&scan_id);
let start = use_callback(move |config: DiscoverConfig| {
let id = start_scan_id.fetch_add(1, Ordering::SeqCst) + 1;
{
let mut guard = start_handle.lock().unwrap_or_else(PoisonError::into_inner);
guard.take();
}
cameras.set(Vec::new());
unmatched_hosts.set(Vec::new());
scanned.set(0);
total.set(0);
error.set(None);
let discovery = match discover::discover(config) {
Ok(discovery) => discovery,
Err(start_error) => {
error.set(Some(start_error));
running.set(false);
return;
}
};
{
let mut guard = start_handle.lock().unwrap_or_else(PoisonError::into_inner);
*guard = Some(discovery);
}
running.set(true);
let tx = start_tx.clone();
let handle = Arc::clone(&start_handle);
let current = Arc::clone(&start_scan_id);
let spawn_result = std::thread::Builder::new()
.name("cameras-discover-hook".into())
.spawn(move || {
run_discovery(handle, tx, id, current);
});
if let Err(spawn_error) = spawn_result {
let mut guard = start_handle.lock().unwrap_or_else(PoisonError::into_inner);
guard.take();
error.set(Some(Error::Backend {
platform: "discover",
message: format!("worker thread spawn: {spawn_error}"),
}));
running.set(false);
}
});
let cancel_handle = Arc::clone(&handle);
let cancel_scan_id = Arc::clone(&scan_id);
let cancel = use_callback(move |()| {
cancel_scan_id.fetch_add(1, Ordering::SeqCst);
{
let mut guard = cancel_handle.lock().unwrap_or_else(PoisonError::into_inner);
guard.take();
}
running.set(false);
});
let clear_handle = Arc::clone(&handle);
let clear_scan_id = Arc::clone(&scan_id);
let clear = use_callback(move |()| {
clear_scan_id.fetch_add(1, Ordering::SeqCst);
{
let mut guard = clear_handle.lock().unwrap_or_else(PoisonError::into_inner);
guard.take();
}
cameras.set(Vec::new());
unmatched_hosts.set(Vec::new());
scanned.set(0);
total.set(0);
error.set(None);
running.set(false);
});
UseDiscovery {
cameras,
unmatched_hosts,
scanned,
total,
running,
error,
start,
cancel,
clear,
}
}
fn run_discovery(
handle: Arc<Mutex<Option<Discovery>>>,
tx: std::sync::mpsc::Sender<DiscoverEvent>,
id: u64,
current: Arc<AtomicU64>,
) {
loop {
if current.load(Ordering::SeqCst) != id {
return;
}
let event = {
let guard = handle.lock().unwrap_or_else(PoisonError::into_inner);
if current.load(Ordering::SeqCst) != id {
return;
}
let Some(disc) = guard.as_ref() else {
return;
};
next_event(disc, WORKER_TICK)
};
match event {
Ok(DiscoverEvent::Done) => {
if current.load(Ordering::SeqCst) == id {
let _ = tx.send(DiscoverEvent::Done);
let mut guard = handle.lock().unwrap_or_else(PoisonError::into_inner);
guard.take();
}
return;
}
Ok(event) => {
if current.load(Ordering::SeqCst) == id {
let _ = tx.send(event);
}
}
Err(cameras::Error::Timeout) => continue,
Err(_) => {
if current.load(Ordering::SeqCst) == id {
let _ = tx.send(DiscoverEvent::Done);
}
return;
}
}
}
}