Skip to main content

arcly_stream/engine/
driver.rs

1//! The run loop — `arcly-http`'s `App::launch` analogue for protocol handlers.
2
3use super::Engine;
4use crate::inbound::{InboundProtocol, IngestContext};
5use crate::traits::MediaSource;
6use crate::{PublishRegistry, Result, StreamKey};
7use std::sync::Arc;
8use tokio_util::sync::CancellationToken;
9use tracing::{error, info, warn};
10
11impl Engine {
12    /// Drive the engine: run every protocol handler concurrently until
13    /// `shutdown` fires, then let them observe the cancellation and wind down.
14    ///
15    /// **Coordinated teardown:** when *any* handler returns — cleanly, with an
16    /// error, or by panicking — the shared `shutdown` token is cancelled so the
17    /// remaining handlers wind down too. A single listener dying never leaves
18    /// its siblings orphaned.
19    ///
20    /// Returns the first fatal handler error, if any; otherwise `Ok(())` once
21    /// all handlers have returned.
22    ///
23    /// Accepts any [`InboundProtocol`] worker — the bundled `RtmpHandler`, a
24    /// legacy [`ProtocolHandler`](crate::ProtocolHandler) (bridged automatically),
25    /// or your own custom protocol. Prefer
26    /// [`EngineBuilder::protocol`](crate::EngineBuilder::protocol) +
27    /// [`serve_registered`](Self::serve_registered) for the builder-driven path.
28    ///
29    /// ```no_run
30    /// # use arcly_stream::prelude::*;
31    /// # async fn run(engine: std::sync::Arc<Engine>, h: Box<dyn InboundProtocol>) -> arcly_stream::Result<()> {
32    /// engine.serve(vec![h], CancellationToken::new()).await
33    /// # }
34    /// ```
35    pub async fn serve(
36        self: &Arc<Self>,
37        handlers: Vec<Box<dyn InboundProtocol>>,
38        shutdown: CancellationToken,
39    ) -> Result<()> {
40        // Every worker reaches the bus through the same ergonomic context.
41        let ctx = IngestContext::new(self.clone());
42        let mut tasks = Vec::with_capacity(handlers.len());
43
44        for handler in handlers {
45            let ctx = ctx.clone();
46            let shutdown = shutdown.clone();
47            let name = handler.name();
48            info!(protocol = name, "Protocol handler starting");
49            tasks.push(tokio::spawn(async move {
50                // Cancel the shared token however this task leaves — so a peer's
51                // exit (or panic, via the drop guard) winds the whole set down.
52                let _guard = shutdown.clone().drop_guard();
53                let result = handler.serve(ctx, shutdown).await;
54                if let Err(e) = &result {
55                    error!(protocol = name, error = %e, "Protocol handler exited with error");
56                }
57                (name, result)
58            }));
59        }
60
61        let mut first_err = None;
62        for task in tasks {
63            match task.await {
64                Ok((name, Ok(()))) => info!(protocol = name, "Protocol handler stopped"),
65                Ok((_, Err(e))) => {
66                    if first_err.is_none() {
67                        first_err = Some(e);
68                    }
69                }
70                Err(join_err) => {
71                    error!(error = %join_err, "Protocol handler task panicked");
72                }
73            }
74        }
75
76        match first_err {
77            Some(e) => Err(e),
78            None => Ok(()),
79        }
80    }
81
82    /// Like [`serve`](Self::serve), but owns the shutdown trigger: it runs until
83    /// `Ctrl-C` (SIGINT) or, on Unix, `SIGTERM` — the two signals an
84    /// orchestrator (systemd, Kubernetes) uses to stop a process — then cancels
85    /// the handlers and waits for them to drain.
86    ///
87    /// This is the batteries-included entry point for a binary; use
88    /// [`serve`](Self::serve) when the host already owns a cancellation token
89    /// (e.g. to compose the engine into a larger shutdown graph).
90    ///
91    /// ```no_run
92    /// # use arcly_stream::prelude::*;
93    /// # async fn run(engine: std::sync::Arc<Engine>, h: Box<dyn InboundProtocol>) -> arcly_stream::Result<()> {
94    /// engine.serve_until_signal(vec![h]).await
95    /// # }
96    /// ```
97    pub async fn serve_until_signal(
98        self: &Arc<Self>,
99        handlers: Vec<Box<dyn InboundProtocol>>,
100    ) -> Result<()> {
101        let shutdown = CancellationToken::new();
102        let signal_token = shutdown.clone();
103        tokio::spawn(async move {
104            wait_for_shutdown_signal().await;
105            info!("Shutdown signal received; cancelling protocol handlers");
106            signal_token.cancel();
107        });
108        self.serve(handlers, shutdown).await
109    }
110
111    /// Run the protocol workers registered on the [`EngineBuilder`](crate::EngineBuilder) via
112    /// [`protocol`](crate::EngineBuilder::protocol), until `shutdown` fires.
113    ///
114    /// This is the builder-driven counterpart to [`serve`](Self::serve): register
115    /// workers fluently, keep the `Arc<Engine>` for your own use (subscribing,
116    /// packaging), then drive them. The registered workers are consumed on the
117    /// first call; a second call serves an empty set.
118    ///
119    /// ```no_run
120    /// # use arcly_stream::prelude::*;
121    /// # use arcly_stream::protocol::rtmp::RtmpHandler;
122    /// # async fn run() -> arcly_stream::Result<()> {
123    /// let engine = Engine::builder()
124    ///     .application(AppSpec::new("live"))
125    ///     .protocol(RtmpHandler::new("0.0.0.0:1935".parse().unwrap()))
126    ///     .build();
127    /// engine.serve_registered(CancellationToken::new()).await
128    /// # }
129    /// ```
130    pub async fn serve_registered(self: &Arc<Self>, shutdown: CancellationToken) -> Result<()> {
131        let handlers = {
132            let mut guard = self
133                .pending_protocols
134                .lock()
135                .unwrap_or_else(|p| p.into_inner());
136            std::mem::take(&mut *guard)
137        };
138        self.serve(handlers, shutdown).await
139    }
140
141    /// Like [`serve_registered`](Self::serve_registered) but owns the shutdown
142    /// trigger, running the builder-registered workers until `Ctrl-C`/`SIGTERM`.
143    pub async fn serve_registered_until_signal(self: &Arc<Self>) -> Result<()> {
144        let handlers = {
145            let mut guard = self
146                .pending_protocols
147                .lock()
148                .unwrap_or_else(|p| p.into_inner());
149            std::mem::take(&mut *guard)
150        };
151        self.serve_until_signal(handlers).await
152    }
153
154    /// Drive a [`MediaSource`] into the bus: claim `key`, pump every frame the
155    /// source yields, and end the publish session when the source is exhausted,
156    /// errors, or `shutdown` fires.
157    ///
158    /// This is the first-class runner for the [`MediaSource`] trait — the
159    /// in-process counterpart to a socket-driven
160    /// [`ProtocolHandler`](crate::ProtocolHandler). Useful for
161    /// file/loopback ingest, test fixtures, and generated streams.
162    pub async fn pump_source<S: MediaSource>(
163        self: &Arc<Self>,
164        key: &StreamKey,
165        mut source: S,
166        shutdown: CancellationToken,
167    ) -> Result<()> {
168        let handle = self.start_publish(key).await?;
169        let result = loop {
170            tokio::select! {
171                _ = shutdown.cancelled() => break Ok(()),
172                next = source.next_frame() => match next {
173                    Ok(Some(frame)) => {
174                        let _ = handle.publish_frame(frame);
175                    }
176                    Ok(None) => break Ok(()),
177                    Err(e) => {
178                        warn!(stream = %key, error = %e, "MediaSource errored; ending publish");
179                        break Err(e);
180                    }
181                },
182            }
183        };
184        // Always release the publish slot, even on a source error.
185        self.end_publish(key).await?;
186        result
187    }
188
189    /// Reap every publishing stream that has not received a frame within the
190    /// configured [`idle_timeout`](super::EngineConfig::idle_timeout). Returns
191    /// the number of streams reaped. A no-op when no timeout is configured.
192    pub async fn reap_idle(self: &Arc<Self>) -> usize {
193        let Some(timeout) = self.config.idle_timeout else {
194            return 0;
195        };
196        let timeout_ms = timeout.as_millis() as u64;
197        // Monotonic — matches `StreamHandle::last_frame_ms` and is immune to wall
198        // clock steps that could otherwise reap (or fail to reap) a live stream.
199        let now = crate::bus::mono_ms();
200        let mut reaped = 0;
201
202        // Collect victims first; don't mutate the maps while iterating them.
203        let mut victims = Vec::new();
204        for app in self.apps.iter() {
205            for handle in app.value().active_handles() {
206                if now.saturating_sub(handle.last_frame_ms()) >= timeout_ms {
207                    victims.push(handle.key().clone());
208                }
209            }
210        }
211
212        for key in victims {
213            self.observer.on_stream_reaped(&key);
214            info!(stream = %key, "Reaping idle stream");
215            if self.end_publish(&key).await.is_ok() {
216                reaped += 1;
217            }
218        }
219        reaped
220    }
221
222    /// Spawn a background task that calls [`reap_idle`](Self::reap_idle) every
223    /// `interval` until `shutdown` fires. No-op (returns immediately) when no
224    /// idle timeout is configured.
225    pub fn spawn_idle_reaper(
226        self: &Arc<Self>,
227        interval: std::time::Duration,
228        shutdown: CancellationToken,
229    ) {
230        if self.config.idle_timeout.is_none() {
231            return;
232        }
233        let engine = Arc::clone(self);
234        tokio::spawn(async move {
235            let mut tick = tokio::time::interval(interval);
236            loop {
237                tokio::select! {
238                    _ = shutdown.cancelled() => break,
239                    _ = tick.tick() => {
240                        let n = engine.reap_idle().await;
241                        if n > 0 {
242                            info!(reaped = n, "Idle reaper swept streams");
243                        }
244                    }
245                }
246            }
247        });
248    }
249}
250
251/// Resolves on the first OS shutdown signal: SIGINT everywhere, plus SIGTERM on
252/// Unix. A failure to install a handler is logged and that signal is simply
253/// never observed, rather than aborting the process.
254async fn wait_for_shutdown_signal() {
255    let ctrl_c = async {
256        if let Err(e) = tokio::signal::ctrl_c().await {
257            error!(error = %e, "failed to listen for Ctrl-C");
258            // Park forever so this branch never wins the select.
259            std::future::pending::<()>().await;
260        }
261    };
262
263    #[cfg(unix)]
264    let terminate = async {
265        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
266            Ok(mut sig) => {
267                sig.recv().await;
268            }
269            Err(e) => {
270                error!(error = %e, "failed to install SIGTERM handler");
271                std::future::pending::<()>().await;
272            }
273        }
274    };
275
276    #[cfg(not(unix))]
277    let terminate = std::future::pending::<()>();
278
279    tokio::select! {
280        _ = ctrl_c => {}
281        _ = terminate => {}
282    }
283}