// dioxus_cameras/camera_stream.rs
//! High-level Dioxus hook that drives a single preview stream: it opens a
//! camera, runs a [`cameras::pump::Pump`], and exposes the lifecycle as
//! signals.
4
5use std::cell::RefCell;
6use std::fmt;
7use std::rc::Rc;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::Duration;
11
12use cameras::pump::{self, Pump};
13use cameras::{CameraSource, Frame, StreamConfig, open_source, source_label};
14use dioxus::prelude::*;
15
16use crate::channel::Channel;
17use crate::registry::{Registry, get_or_create_sink, publish_frame, remove_sink};
18
/// Delay between wake-ups of the background task spawned by
/// [`use_camera_stream`], which drains connect results from its channel after
/// each wait.
const POLL_INTERVAL: Duration = Duration::from_millis(50);
20
/// The lifecycle state surfaced by [`use_camera_stream`].
///
/// `label` is a human-readable identifier for the source (device name or RTSP
/// URL). The type implements [`fmt::Display`], so it can be formatted directly
/// into a string: `format!("{status}")`.
///
/// Equality is implemented by hand: two `Failed` values compare equal when
/// their errors render to the same `Display` text.
#[derive(Clone, Debug)]
pub enum StreamStatus {
    /// No source is set.
    Idle,
    /// A source is set; [`cameras::open_source`] is running on a background
    /// thread.
    Connecting {
        /// Human-readable label of the source being opened.
        label: String,
    },
    /// The camera is open and the pump is active (or paused but ready).
    Streaming {
        /// Human-readable label of the active source.
        label: String,
    },
    /// The last connect attempt failed.
    ///
    /// Match on `error` (a typed [`cameras::Error`]) if you need to branch on
    /// the failure kind, for example to distinguish
    /// [`Error::PermissionDenied`](cameras::Error::PermissionDenied) from
    /// [`Error::DeviceInUse`](cameras::Error::DeviceInUse).
    Failed {
        /// The error returned by [`cameras::open_source`].
        error: cameras::Error,
    },
}
52
53impl PartialEq for StreamStatus {
54 fn eq(&self, other: &Self) -> bool {
55 match (self, other) {
56 (StreamStatus::Idle, StreamStatus::Idle) => true,
57 (StreamStatus::Connecting { label: a }, StreamStatus::Connecting { label: b }) => {
58 a == b
59 }
60 (StreamStatus::Streaming { label: a }, StreamStatus::Streaming { label: b }) => a == b,
61 (StreamStatus::Failed { error: a }, StreamStatus::Failed { error: b }) => {
62 a.to_string() == b.to_string()
63 }
64 _ => false,
65 }
66 }
67}
68
69impl fmt::Display for StreamStatus {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 match self {
72 StreamStatus::Idle => f.write_str("Idle"),
73 StreamStatus::Connecting { label } => write!(f, "Connecting to {label}..."),
74 StreamStatus::Streaming { label } => write!(f, "Streaming: {label}"),
75 StreamStatus::Failed { error } => write!(f, "Open failed: {error}"),
76 }
77 }
78}
79
/// Handle returned by [`use_camera_stream`].
///
/// Every field is a public signal or callback: the handle is plain data with
/// no methods and no hidden state. Read `status`, write `active`, and call
/// `capture_frame` directly.
#[derive(Copy, Clone, PartialEq)]
pub struct UseCameraStream {
    /// Lifecycle of the stream. See [`StreamStatus`].
    pub status: Signal<StreamStatus>,
    /// Whether the pump actively streams frames to the preview.
    ///
    /// Default `true`. Setting it to `false` parks the pump (no more
    /// `cameras::next_frame` calls, no per-frame Rust work) but keeps the
    /// camera handle open so `capture_frame` remains fast. Toggle it based on
    /// whether the preview is currently visible to the user.
    pub active: Signal<bool>,
    /// Grab a single fresh frame on demand.
    ///
    /// Works regardless of `active`. Returns `None` if the stream is not
    /// connected or the camera errored.
    ///
    /// # UI-thread blocking
    ///
    /// This callback **blocks the calling thread** until the pump worker
    /// replies: typically one frame interval (~16-33ms at 30-60fps), plus
    /// up to 20ms of wake latency if the pump is paused. When called from an
    /// `onclick` (which runs on the UI thread), expect a brief UI stall.
    ///
    /// For single photo-button captures this is imperceptible. For rapid
    /// back-to-back captures, dispatch the call from a worker thread (for
    /// example one started with [`std::thread::spawn`]) or a Dioxus
    /// [`spawn`](dioxus::prelude::spawn) task so the UI thread stays
    /// responsive.
    ///
    /// NOTE(review): confirm that this callback may be invoked off the UI
    /// thread, and that a blocking call inside a Dioxus `spawn` task does not
    /// itself stall the UI.
    pub capture_frame: Callback<(), Option<Frame>>,
}
115
116/// Hook that drives a single preview stream end-to-end.
117///
118/// Given a stream `id`, a `source` signal, and a [`StreamConfig`], the hook:
119///
120/// 1. Watches `source` for changes.
121/// 2. Opens the camera on a background thread (so the UI does not block).
122/// 3. Starts a [`cameras::pump::Pump`] that publishes frames to the
123/// [`Registry`] slot for `id`.
124/// 4. Reports progress through [`UseCameraStream::status`].
125/// 5. Tears down the previous camera automatically when `source` changes.
126/// 6. Cleans up the [`Registry`] entry and pump when the component unmounts:
127/// the consumer never has to call [`remove_sink`] manually.
128///
129/// Pair with a [`StreamPreview`](crate::StreamPreview) element bound to the
130/// same `id` to render the frames on-screen. Use
131/// [`UseCameraStream::capture_frame`] for on-demand snapshots, and
132/// [`UseCameraStream::active`] to pause the pump when the preview is hidden.
133///
134/// # Reconnect semantics
135///
136/// **Every write to `source` triggers a reconnect**, even if the new value
137/// equals the current one. Dioxus signals notify subscribers unconditionally.
138/// To avoid redundant reconnects, gate the write yourself:
139/// [`CameraSource`] implements [`PartialEq`]:
140///
141/// ```ignore
142/// if source.peek().as_ref() != Some(&next) {
143/// source.set(Some(next));
144/// }
145/// ```
146///
147/// Rapid back-to-back source changes (A → B → A before any of them finishes
148/// connecting) will spawn one connect thread per change. Cameras doesn't
149/// expose cancellation, so each [`open_source`] call runs to completion; an
150/// internal generation counter then discards stale results so only the
151/// latest source wins. The orphaned threads consume CPU until they finish
152/// but can never corrupt state.
153///
154/// # Panics
155///
156/// Panics at render time if called outside an app wired up with
157/// [`register_with`](crate::register_with).
158pub fn use_camera_stream(
159 id: u32,
160 source: Signal<Option<CameraSource>>,
161 config: StreamConfig,
162) -> UseCameraStream {
163 let registry = try_consume_context::<Registry>().expect(
164 "`use_camera_stream` requires `register_with` to be called at launch, \
165 see the dioxus-cameras crate docs",
166 );
167
168 let mut status = use_signal(|| StreamStatus::Idle);
169 let active = use_signal(|| true);
170
171 let pump_cell = use_hook(|| Rc::new(RefCell::new(None::<Pump>)));
172 let channel = use_hook(Channel::<StreamEvent>::new);
173 let generation = use_hook(|| Arc::new(AtomicU64::new(0)));
174
175 {
176 let registry = registry.clone();
177 use_drop(move || remove_sink(®istry, id));
178 }
179
180 {
181 let pump_cell = Rc::clone(&pump_cell);
182 use_effect(move || {
183 let is_active = *active.read();
184 if let Some(pump_ref) = pump_cell.borrow().as_ref() {
185 pump::set_active(pump_ref, is_active);
186 }
187 });
188 }
189
190 {
191 let channel = channel.clone();
192 let generation = Arc::clone(&generation);
193 let pump_cell = Rc::clone(&pump_cell);
194 use_hook(move || {
195 spawn(async move {
196 loop {
197 futures_timer::Delay::new(POLL_INTERVAL).await;
198 let events = channel.drain();
199 if events.is_empty() {
200 continue;
201 }
202 let current = generation.load(Ordering::Relaxed);
203 for event in events {
204 if event.generation != current {
205 continue;
206 }
207 match event.payload {
208 StreamEventPayload::Connected {
209 pump: new_pump,
210 label,
211 } => {
212 pump::set_active(&new_pump, *active.peek());
213 *pump_cell.borrow_mut() = Some(new_pump);
214 status.set(StreamStatus::Streaming { label });
215 }
216 StreamEventPayload::Failed { error } => {
217 status.set(StreamStatus::Failed { error });
218 }
219 }
220 }
221 }
222 })
223 });
224 }
225
226 let capture_frame = {
227 let pump_cell = Rc::clone(&pump_cell);
228 use_callback(move |()| pump_cell.borrow().as_ref().and_then(pump::capture_frame))
229 };
230
231 {
232 let effect_tx = channel.sender.clone();
233 let effect_generation = Arc::clone(&generation);
234 let pump_cell = Rc::clone(&pump_cell);
235 let registry = registry.clone();
236 use_effect(move || {
237 let requested = source.read().clone();
238 let generation_value = effect_generation.fetch_add(1, Ordering::Relaxed) + 1;
239
240 *pump_cell.borrow_mut() = None;
241
242 let Some(requested) = requested else {
243 status.set(StreamStatus::Idle);
244 return;
245 };
246
247 let label = source_label(&requested);
248 status.set(StreamStatus::Connecting {
249 label: label.clone(),
250 });
251
252 let tx = effect_tx.clone();
253 let registry = registry.clone();
254 let _ = std::thread::Builder::new()
255 .name("cameras-connect".into())
256 .spawn(move || {
257 let payload = match open_source(requested, config) {
258 Ok(camera) => {
259 let sink = get_or_create_sink(®istry, id);
260 let pump =
261 pump::spawn(camera, move |frame| publish_frame(&sink, frame));
262 StreamEventPayload::Connected { pump, label }
263 }
264 Err(error) => StreamEventPayload::Failed { error },
265 };
266 let _ = tx.send(StreamEvent {
267 generation: generation_value,
268 payload,
269 });
270 });
271 });
272 }
273
274 UseCameraStream {
275 status,
276 active,
277 capture_frame,
278 }
279}
280
/// Result of one connect attempt, sent from a connect thread to the poll
/// task inside [`use_camera_stream`].
struct StreamEvent {
    /// Value of the hook's generation counter when the attempt started; the
    /// poll task skips events whose generation is no longer current.
    generation: u64,
    /// Outcome of the attempt.
    payload: StreamEventPayload,
}
285
/// Outcome carried by a [`StreamEvent`].
enum StreamEventPayload {
    /// The source opened; `pump` is the pump spawned for the camera and
    /// `label` is the source's human-readable label.
    Connected { pump: Pump, label: String },
    /// [`open_source`] returned `error`.
    Failed { error: cameras::Error },
}