use std::cell::RefCell;
use std::fmt;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use chimeras::pump::{self, Pump};
use chimeras::{CameraSource, Frame, StreamConfig, open_source, source_label};
use dioxus::prelude::*;
use crate::channel::Channel;
use crate::registry::{Registry, get_or_create_sink, publish_frame, remove_sink};
/// Delay between successive drains of the stream-event channel performed by
/// the polling task that `use_camera_stream` spawns.
const POLL_INTERVAL: Duration = Duration::from_millis(50);
/// Connection state reported by `use_camera_stream` through its `status`
/// signal.
#[derive(Clone, Debug)]
pub enum StreamStatus {
/// Initial state, and the state set whenever the source signal holds `None`.
Idle,
/// A source has been requested and is being opened.
Connecting {
/// Label produced by `source_label` for the requested source.
label: String,
},
/// The source was opened and a pump for it has been installed.
Streaming {
/// Label produced by `source_label` for the opened source.
label: String,
},
/// Opening the requested source returned an error.
Failed {
/// The error returned by `open_source`.
error: chimeras::Error,
},
}
impl PartialEq for StreamStatus {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(StreamStatus::Idle, StreamStatus::Idle) => true,
(StreamStatus::Connecting { label: a }, StreamStatus::Connecting { label: b }) => {
a == b
}
(StreamStatus::Streaming { label: a }, StreamStatus::Streaming { label: b }) => a == b,
(StreamStatus::Failed { error: a }, StreamStatus::Failed { error: b }) => {
a.to_string() == b.to_string()
}
_ => false,
}
}
}
impl fmt::Display for StreamStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
StreamStatus::Idle => f.write_str("Idle"),
StreamStatus::Connecting { label } => write!(f, "Connecting to {label}..."),
StreamStatus::Streaming { label } => write!(f, "Streaming: {label}"),
StreamStatus::Failed { error } => write!(f, "Open failed: {error}"),
}
}
}
/// Handle returned by `use_camera_stream`.
#[derive(Copy, Clone, PartialEq)]
pub struct UseCameraStream {
/// Current connection state; updated by the hook as the source is opened.
pub status: Signal<StreamStatus>,
/// Flag forwarded to `pump::set_active` for the current pump; starts as
/// `true`.
pub active: Signal<bool>,
/// Calls `pump::capture_frame` on the current pump; yields `None` when no
/// pump is installed.
pub capture_frame: Callback<(), Option<Frame>>,
}
/// Dioxus hook that opens the camera source held in `source` on a background
/// thread and exposes the resulting stream's state.
///
/// Whenever `source` changes, any current pump is dropped and, if a source is
/// present, a `chimeras-connect` thread is spawned to call `open_source`. The
/// thread reports back through a channel that a UI-side task drains every
/// [`POLL_INTERVAL`]. Each request carries a generation number so that results
/// of superseded requests are ignored.
///
/// # Parameters
///
/// * `id` - key used with `get_or_create_sink` / `remove_sink` to identify the
///   registry sink that receives frames.
/// * `source` - signal holding the requested source; `None` resets the status
///   to [`StreamStatus::Idle`].
/// * `config` - stream configuration passed to `open_source`.
///
/// # Returns
///
/// A [`UseCameraStream`] with the status signal, the `active` signal and a
/// frame-capture callback.
///
/// # Panics
///
/// Panics if no `Registry` is available in the Dioxus context.
pub fn use_camera_stream(
    id: u32,
    source: Signal<Option<CameraSource>>,
    config: StreamConfig,
) -> UseCameraStream {
    let registry = try_consume_context::<Registry>().expect(
        "`use_camera_stream` requires `register_with` to be called at launch, \
         see the dioxus-chimeras crate docs",
    );
    let mut status = use_signal(|| StreamStatus::Idle);
    let active = use_signal(|| true);
    // Holds the pump of the currently connected source, if any.
    let pump_cell = use_hook(|| Rc::new(RefCell::new(None::<Pump>)));
    // Carries connect results from the background thread to the polling task.
    let channel = use_hook(Channel::<StreamEvent>::new);
    // Incremented for every connect request; used to discard stale results.
    let generation = use_hook(|| Arc::new(AtomicU64::new(0)));

    // Remove this hook's sink from the registry when the component is dropped.
    {
        let registry = registry.clone();
        use_drop(move || remove_sink(&registry, id));
    }

    // Forward changes of `active` to the current pump.
    {
        let pump_cell = Rc::clone(&pump_cell);
        use_effect(move || {
            let is_active = *active.read();
            if let Some(pump_ref) = pump_cell.borrow().as_ref() {
                pump::set_active(pump_ref, is_active);
            }
        });
    }

    // Polling task: periodically drain the channel and apply events that
    // belong to the most recent connect request.
    {
        let channel = channel.clone();
        let generation = Arc::clone(&generation);
        let pump_cell = Rc::clone(&pump_cell);
        use_hook(move || {
            spawn(async move {
                loop {
                    futures_timer::Delay::new(POLL_INTERVAL).await;
                    let events = channel.drain();
                    if events.is_empty() {
                        continue;
                    }
                    let current = generation.load(Ordering::Relaxed);
                    for event in events {
                        // Results of superseded requests are dropped here.
                        if event.generation != current {
                            continue;
                        }
                        match event.payload {
                            StreamEventPayload::Connected {
                                pump: new_pump,
                                label,
                            } => {
                                // `peek` reads without subscribing; `active`
                                // changes are handled by the effect above.
                                pump::set_active(&new_pump, *active.peek());
                                *pump_cell.borrow_mut() = Some(new_pump);
                                status.set(StreamStatus::Streaming { label });
                            }
                            StreamEventPayload::Failed { error } => {
                                status.set(StreamStatus::Failed { error });
                            }
                        }
                    }
                }
            })
        });
    }

    let capture_frame = {
        let pump_cell = Rc::clone(&pump_cell);
        use_callback(move |()| pump_cell.borrow().as_ref().and_then(pump::capture_frame))
    };

    // Connect effect: re-runs whenever `source` changes.
    {
        let effect_tx = channel.sender.clone();
        let effect_generation = Arc::clone(&generation);
        let pump_cell = Rc::clone(&pump_cell);
        let registry = registry.clone();
        use_effect(move || {
            let requested = source.read().clone();
            // Bump the generation first so any in-flight result becomes stale.
            let generation_value = effect_generation.fetch_add(1, Ordering::Relaxed) + 1;
            *pump_cell.borrow_mut() = None;
            let Some(requested) = requested else {
                status.set(StreamStatus::Idle);
                return;
            };
            let label = source_label(&requested);
            status.set(StreamStatus::Connecting {
                label: label.clone(),
            });
            let tx = effect_tx.clone();
            let registry = registry.clone();
            // NOTE(review): the result of `spawn` is discarded, so a failure
            // to start the thread would leave `status` at `Connecting` —
            // confirm whether that should be surfaced.
            let _ = std::thread::Builder::new()
                .name("chimeras-connect".into())
                .spawn(move || {
                    let payload = match open_source(requested, config) {
                        Ok(camera) => {
                            let sink = get_or_create_sink(&registry, id);
                            let pump =
                                pump::spawn(camera, move |frame| publish_frame(&sink, frame));
                            StreamEventPayload::Connected { pump, label }
                        }
                        Err(error) => StreamEventPayload::Failed { error },
                    };
                    // A send error is deliberately ignored.
                    let _ = tx.send(StreamEvent {
                        generation: generation_value,
                        payload,
                    });
                });
        });
    }

    UseCameraStream {
        status,
        active,
        capture_frame,
    }
}
/// Message sent from a connect thread to the polling task in
/// `use_camera_stream`.
struct StreamEvent {
/// Generation of the connect request that produced this event; the polling
/// task skips events whose generation is not the current one.
generation: u64,
/// Outcome of the connect attempt.
payload: StreamEventPayload,
}
/// Outcome of a connect attempt made by `use_camera_stream`.
enum StreamEventPayload {
/// `open_source` succeeded; carries the spawned pump and the source label.
Connected { pump: Pump, label: String },
/// `open_source` returned an error.
Failed { error: chimeras::Error },
}