Skip to main content

dioxus_cameras/
camera_stream.rs

//! High-level Dioxus hook that drives a single preview stream: opens a
//! camera, runs a [`cameras::pump::Pump`], and exposes the lifecycle as
//! signals.
4
5use std::cell::RefCell;
6use std::fmt;
7use std::rc::Rc;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::Duration;
11
12use cameras::pump::{self, Pump};
13use cameras::{CameraSource, Frame, StreamConfig, open_source, source_label};
14use dioxus::prelude::*;
15
16use crate::channel::Channel;
17use crate::registry::{Registry, get_or_create_sink, publish_frame, remove_sink};
18
/// How long the UI-side task in [`use_camera_stream`] sleeps between two
/// drains of the connect-event channel. Connect results therefore reach
/// `status` with up to this much extra delay.
const POLL_INTERVAL: Duration = Duration::from_millis(50);
20
/// The lifecycle state surfaced by [`use_camera_stream`].
///
/// `label` is a human-readable identifier for the source (device name or RTSP
/// URL). Implements [`fmt::Display`] so you can format it directly into a
/// string: `format!("{status}")`.
///
/// Equality is implemented by hand rather than derived: two `Failed` values
/// compare equal when their errors render to the same string.
#[derive(Clone, Debug)]
pub enum StreamStatus {
    /// No source is set.
    Idle,
    /// A source is set; [`cameras::open_source`] is running on a background
    /// thread.
    Connecting {
        /// Human-readable label of the source being opened.
        label: String,
    },
    /// The camera is open and the pump is active (or paused but ready).
    Streaming {
        /// Human-readable label of the active source.
        label: String,
    },
    /// The last connect attempt failed.
    ///
    /// Match on `error` (a typed [`cameras::Error`]) if you need to branch on
    /// the failure kind, for example, distinguishing
    /// [`Error::PermissionDenied`](cameras::Error::PermissionDenied) from
    /// [`Error::DeviceInUse`](cameras::Error::DeviceInUse).
    Failed {
        /// The error returned by [`cameras::open_source`].
        error: cameras::Error,
    },
}
52
53impl PartialEq for StreamStatus {
54    fn eq(&self, other: &Self) -> bool {
55        match (self, other) {
56            (StreamStatus::Idle, StreamStatus::Idle) => true,
57            (StreamStatus::Connecting { label: a }, StreamStatus::Connecting { label: b }) => {
58                a == b
59            }
60            (StreamStatus::Streaming { label: a }, StreamStatus::Streaming { label: b }) => a == b,
61            (StreamStatus::Failed { error: a }, StreamStatus::Failed { error: b }) => {
62                a.to_string() == b.to_string()
63            }
64            _ => false,
65        }
66    }
67}
68
69impl fmt::Display for StreamStatus {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        match self {
72            StreamStatus::Idle => f.write_str("Idle"),
73            StreamStatus::Connecting { label } => write!(f, "Connecting to {label}..."),
74            StreamStatus::Streaming { label } => write!(f, "Streaming: {label}"),
75            StreamStatus::Failed { error } => write!(f, "Open failed: {error}"),
76        }
77    }
78}
79
/// Handle returned by [`use_camera_stream`].
///
/// All fields are public signals / callbacks: data-oriented, no methods,
/// no hidden state. Read `status`, write `active`, call `capture_frame`
/// directly.
#[derive(Copy, Clone, PartialEq)]
pub struct UseCameraStream {
    /// Lifecycle of the stream. See [`StreamStatus`].
    pub status: Signal<StreamStatus>,
    /// Whether the pump actively streams frames to the preview.
    ///
    /// Default `true`. Setting to `false` parks the pump, no more
    /// `cameras::next_frame` calls, no per-frame Rust work, but keeps the
    /// camera handle open so `capture_frame` remains fast. Toggle based on
    /// whether the preview is currently visible to the user.
    pub active: Signal<bool>,
    /// Grab a single fresh frame on demand.
    ///
    /// Works regardless of `active`. Returns `None` if the stream is not
    /// connected or the camera errored.
    ///
    /// # UI-thread blocking
    ///
    /// This callback **blocks the calling thread** until the pump worker
    /// replies, typically one frame interval (~16-33ms at 30-60fps), plus
    /// up to 20ms of wake latency if the pump is paused. When called from an
    /// `onclick` (which runs on the UI thread), expect a brief UI stall.
    ///
    /// For single photo-button captures this is imperceptible. For rapid
    /// back-to-back captures, dispatch the call from a
    /// [`std::thread::spawn`]'d worker or Dioxus
    /// [`spawn`](dioxus::prelude::spawn) task so the UI thread stays
    /// responsive.
    ///
    /// NOTE(review): confirm that a Dioxus `spawn` task really runs off the
    /// UI thread, and that this callback may be invoked from another thread;
    /// if not, the advice above does not remove the stall.
    pub capture_frame: Callback<(), Option<Frame>>,
}
115
116/// Hook that drives a single preview stream end-to-end.
117///
118/// Given a stream `id`, a `source` signal, and a [`StreamConfig`], the hook:
119///
120/// 1. Watches `source` for changes.
121/// 2. Opens the camera on a background thread (so the UI does not block).
122/// 3. Starts a [`cameras::pump::Pump`] that publishes frames to the
123///    [`Registry`] slot for `id`.
124/// 4. Reports progress through [`UseCameraStream::status`].
125/// 5. Tears down the previous camera automatically when `source` changes.
126/// 6. Cleans up the [`Registry`] entry and pump when the component unmounts:
127///    the consumer never has to call [`remove_sink`] manually.
128///
129/// Pair with a [`StreamPreview`](crate::StreamPreview) element bound to the
130/// same `id` to render the frames on-screen. Use
131/// [`UseCameraStream::capture_frame`] for on-demand snapshots, and
132/// [`UseCameraStream::active`] to pause the pump when the preview is hidden.
133///
134/// # Reconnect semantics
135///
136/// **Every write to `source` triggers a reconnect**, even if the new value
137/// equals the current one. Dioxus signals notify subscribers unconditionally.
138/// To avoid redundant reconnects, gate the write yourself:
139/// [`CameraSource`] implements [`PartialEq`]:
140///
141/// ```ignore
142/// if source.peek().as_ref() != Some(&next) {
143///     source.set(Some(next));
144/// }
145/// ```
146///
147/// Rapid back-to-back source changes (A → B → A before any of them finishes
148/// connecting) will spawn one connect thread per change. Cameras doesn't
149/// expose cancellation, so each [`open_source`] call runs to completion; an
150/// internal generation counter then discards stale results so only the
151/// latest source wins. The orphaned threads consume CPU until they finish
152/// but can never corrupt state.
153///
154/// # Panics
155///
156/// Panics at render time if called outside an app wired up with
157/// [`register_with`](crate::register_with).
158pub fn use_camera_stream(
159    id: u32,
160    source: Signal<Option<CameraSource>>,
161    config: StreamConfig,
162) -> UseCameraStream {
163    let registry = try_consume_context::<Registry>().expect(
164        "`use_camera_stream` requires `register_with` to be called at launch, \
165         see the dioxus-cameras crate docs",
166    );
167
168    let mut status = use_signal(|| StreamStatus::Idle);
169    let active = use_signal(|| true);
170
171    let pump_cell = use_hook(|| Rc::new(RefCell::new(None::<Pump>)));
172    let channel = use_hook(Channel::<StreamEvent>::new);
173    let generation = use_hook(|| Arc::new(AtomicU64::new(0)));
174
175    {
176        let registry = registry.clone();
177        use_drop(move || remove_sink(&registry, id));
178    }
179
180    {
181        let pump_cell = Rc::clone(&pump_cell);
182        use_effect(move || {
183            let is_active = *active.read();
184            if let Some(pump_ref) = pump_cell.borrow().as_ref() {
185                pump::set_active(pump_ref, is_active);
186            }
187        });
188    }
189
190    {
191        let channel = channel.clone();
192        let generation = Arc::clone(&generation);
193        let pump_cell = Rc::clone(&pump_cell);
194        use_hook(move || {
195            spawn(async move {
196                loop {
197                    futures_timer::Delay::new(POLL_INTERVAL).await;
198                    let events = channel.drain();
199                    if events.is_empty() {
200                        continue;
201                    }
202                    let current = generation.load(Ordering::Relaxed);
203                    for event in events {
204                        if event.generation != current {
205                            continue;
206                        }
207                        match event.payload {
208                            StreamEventPayload::Connected {
209                                pump: new_pump,
210                                label,
211                            } => {
212                                pump::set_active(&new_pump, *active.peek());
213                                *pump_cell.borrow_mut() = Some(new_pump);
214                                status.set(StreamStatus::Streaming { label });
215                            }
216                            StreamEventPayload::Failed { error } => {
217                                status.set(StreamStatus::Failed { error });
218                            }
219                        }
220                    }
221                }
222            })
223        });
224    }
225
226    let capture_frame = {
227        let pump_cell = Rc::clone(&pump_cell);
228        use_callback(move |()| pump_cell.borrow().as_ref().and_then(pump::capture_frame))
229    };
230
231    {
232        let effect_tx = channel.sender.clone();
233        let effect_generation = Arc::clone(&generation);
234        let pump_cell = Rc::clone(&pump_cell);
235        let registry = registry.clone();
236        use_effect(move || {
237            let requested = source.read().clone();
238            let generation_value = effect_generation.fetch_add(1, Ordering::Relaxed) + 1;
239
240            *pump_cell.borrow_mut() = None;
241
242            let Some(requested) = requested else {
243                status.set(StreamStatus::Idle);
244                return;
245            };
246
247            let label = source_label(&requested);
248            status.set(StreamStatus::Connecting {
249                label: label.clone(),
250            });
251
252            let tx = effect_tx.clone();
253            let registry = registry.clone();
254            let _ = std::thread::Builder::new()
255                .name("cameras-connect".into())
256                .spawn(move || {
257                    let payload = match open_source(requested, config) {
258                        Ok(camera) => {
259                            let sink = get_or_create_sink(&registry, id);
260                            let pump =
261                                pump::spawn(camera, move |frame| publish_frame(&sink, frame));
262                            StreamEventPayload::Connected { pump, label }
263                        }
264                        Err(error) => StreamEventPayload::Failed { error },
265                    };
266                    let _ = tx.send(StreamEvent {
267                        generation: generation_value,
268                        payload,
269                    });
270                });
271        });
272    }
273
274    UseCameraStream {
275        status,
276        active,
277        capture_frame,
278    }
279}
280
/// Message sent by a connect thread to the polling task started in
/// [`use_camera_stream`].
struct StreamEvent {
    /// Value of the generation counter claimed by the connect attempt that
    /// produced this event. The polling task skips events whose generation
    /// differs from the current counter value.
    generation: u64,
    /// Outcome of the connect attempt.
    payload: StreamEventPayload,
}
285
/// Outcome of one [`open_source`] call made on a connect thread.
enum StreamEventPayload {
    /// The source was opened and a pump was spawned for it; `label` is the
    /// human-readable source label shown in [`StreamStatus::Streaming`].
    Connected { pump: Pump, label: String },
    /// [`open_source`] returned `error`; surfaced as [`StreamStatus::Failed`].
    Failed { error: cameras::Error },
}