arcly_stream/engine/driver.rs
1//! The run loop — `arcly-http`'s `App::launch` analogue for protocol handlers.
2
3use super::Engine;
4use crate::inbound::{InboundProtocol, IngestContext};
5use crate::traits::MediaSource;
6use crate::{PublishRegistry, Result, StreamKey};
7use std::sync::Arc;
8use tokio_util::sync::CancellationToken;
9use tracing::{error, info, warn};
10
11impl Engine {
12 /// Drive the engine: run every protocol handler concurrently until
13 /// `shutdown` fires, then let them observe the cancellation and wind down.
14 ///
15 /// **Coordinated teardown:** when *any* handler returns — cleanly, with an
16 /// error, or by panicking — the shared `shutdown` token is cancelled so the
17 /// remaining handlers wind down too. A single listener dying never leaves
18 /// its siblings orphaned.
19 ///
20 /// Returns the first fatal handler error, if any; otherwise `Ok(())` once
21 /// all handlers have returned.
22 ///
23 /// Accepts any [`InboundProtocol`] worker — the bundled `RtmpHandler`, a
24 /// legacy [`ProtocolHandler`](crate::ProtocolHandler) (bridged automatically),
25 /// or your own custom protocol. Prefer
26 /// [`EngineBuilder::protocol`](crate::EngineBuilder::protocol) +
27 /// [`serve_registered`](Self::serve_registered) for the builder-driven path.
28 ///
29 /// ```no_run
30 /// # use arcly_stream::prelude::*;
31 /// # async fn run(engine: std::sync::Arc<Engine>, h: Box<dyn InboundProtocol>) -> arcly_stream::Result<()> {
32 /// engine.serve(vec![h], CancellationToken::new()).await
33 /// # }
34 /// ```
35 pub async fn serve(
36 self: &Arc<Self>,
37 handlers: Vec<Box<dyn InboundProtocol>>,
38 shutdown: CancellationToken,
39 ) -> Result<()> {
40 // Every worker reaches the bus through the same ergonomic context.
41 let ctx = IngestContext::new(self.clone());
42 let mut tasks = Vec::with_capacity(handlers.len());
43
44 for handler in handlers {
45 let ctx = ctx.clone();
46 let shutdown = shutdown.clone();
47 let name = handler.name();
48 info!(protocol = name, "Protocol handler starting");
49 tasks.push(tokio::spawn(async move {
50 // Cancel the shared token however this task leaves — so a peer's
51 // exit (or panic, via the drop guard) winds the whole set down.
52 let _guard = shutdown.clone().drop_guard();
53 let result = handler.serve(ctx, shutdown).await;
54 if let Err(e) = &result {
55 error!(protocol = name, error = %e, "Protocol handler exited with error");
56 }
57 (name, result)
58 }));
59 }
60
61 let mut first_err = None;
62 for task in tasks {
63 match task.await {
64 Ok((name, Ok(()))) => info!(protocol = name, "Protocol handler stopped"),
65 Ok((_, Err(e))) => {
66 if first_err.is_none() {
67 first_err = Some(e);
68 }
69 }
70 Err(join_err) => {
71 error!(error = %join_err, "Protocol handler task panicked");
72 }
73 }
74 }
75
76 match first_err {
77 Some(e) => Err(e),
78 None => Ok(()),
79 }
80 }
81
82 /// Like [`serve`](Self::serve), but owns the shutdown trigger: it runs until
83 /// `Ctrl-C` (SIGINT) or, on Unix, `SIGTERM` — the two signals an
84 /// orchestrator (systemd, Kubernetes) uses to stop a process — then cancels
85 /// the handlers and waits for them to drain.
86 ///
87 /// This is the batteries-included entry point for a binary; use
88 /// [`serve`](Self::serve) when the host already owns a cancellation token
89 /// (e.g. to compose the engine into a larger shutdown graph).
90 ///
91 /// ```no_run
92 /// # use arcly_stream::prelude::*;
93 /// # async fn run(engine: std::sync::Arc<Engine>, h: Box<dyn InboundProtocol>) -> arcly_stream::Result<()> {
94 /// engine.serve_until_signal(vec![h]).await
95 /// # }
96 /// ```
97 pub async fn serve_until_signal(
98 self: &Arc<Self>,
99 handlers: Vec<Box<dyn InboundProtocol>>,
100 ) -> Result<()> {
101 let shutdown = CancellationToken::new();
102 let signal_token = shutdown.clone();
103 tokio::spawn(async move {
104 wait_for_shutdown_signal().await;
105 info!("Shutdown signal received; cancelling protocol handlers");
106 signal_token.cancel();
107 });
108 self.serve(handlers, shutdown).await
109 }
110
111 /// Run the protocol workers registered on the [`EngineBuilder`](crate::EngineBuilder) via
112 /// [`protocol`](crate::EngineBuilder::protocol), until `shutdown` fires.
113 ///
114 /// This is the builder-driven counterpart to [`serve`](Self::serve): register
115 /// workers fluently, keep the `Arc<Engine>` for your own use (subscribing,
116 /// packaging), then drive them. The registered workers are consumed on the
117 /// first call; a second call serves an empty set.
118 ///
119 /// ```no_run
120 /// # use arcly_stream::prelude::*;
121 /// # use arcly_stream::protocol::rtmp::RtmpHandler;
122 /// # async fn run() -> arcly_stream::Result<()> {
123 /// let engine = Engine::builder()
124 /// .application(AppSpec::new("live"))
125 /// .protocol(RtmpHandler::new("0.0.0.0:1935".parse().unwrap()))
126 /// .build();
127 /// engine.serve_registered(CancellationToken::new()).await
128 /// # }
129 /// ```
130 pub async fn serve_registered(self: &Arc<Self>, shutdown: CancellationToken) -> Result<()> {
131 let handlers = {
132 let mut guard = self
133 .pending_protocols
134 .lock()
135 .unwrap_or_else(|p| p.into_inner());
136 std::mem::take(&mut *guard)
137 };
138 self.serve(handlers, shutdown).await
139 }
140
141 /// Like [`serve_registered`](Self::serve_registered) but owns the shutdown
142 /// trigger, running the builder-registered workers until `Ctrl-C`/`SIGTERM`.
143 pub async fn serve_registered_until_signal(self: &Arc<Self>) -> Result<()> {
144 let handlers = {
145 let mut guard = self
146 .pending_protocols
147 .lock()
148 .unwrap_or_else(|p| p.into_inner());
149 std::mem::take(&mut *guard)
150 };
151 self.serve_until_signal(handlers).await
152 }
153
154 /// Drive a [`MediaSource`] into the bus: claim `key`, pump every frame the
155 /// source yields, and end the publish session when the source is exhausted,
156 /// errors, or `shutdown` fires.
157 ///
158 /// This is the first-class runner for the [`MediaSource`] trait — the
159 /// in-process counterpart to a socket-driven
160 /// [`ProtocolHandler`](crate::ProtocolHandler). Useful for
161 /// file/loopback ingest, test fixtures, and generated streams.
162 pub async fn pump_source<S: MediaSource>(
163 self: &Arc<Self>,
164 key: &StreamKey,
165 mut source: S,
166 shutdown: CancellationToken,
167 ) -> Result<()> {
168 let handle = self.start_publish(key).await?;
169 let result = loop {
170 tokio::select! {
171 _ = shutdown.cancelled() => break Ok(()),
172 next = source.next_frame() => match next {
173 Ok(Some(frame)) => {
174 let _ = handle.publish_frame(frame);
175 }
176 Ok(None) => break Ok(()),
177 Err(e) => {
178 warn!(stream = %key, error = %e, "MediaSource errored; ending publish");
179 break Err(e);
180 }
181 },
182 }
183 };
184 // Always release the publish slot, even on a source error.
185 self.end_publish(key).await?;
186 result
187 }
188
189 /// Reap every publishing stream that has not received a frame within the
190 /// configured [`idle_timeout`](super::EngineConfig::idle_timeout). Returns
191 /// the number of streams reaped. A no-op when no timeout is configured.
192 pub async fn reap_idle(self: &Arc<Self>) -> usize {
193 let Some(timeout) = self.config.idle_timeout else {
194 return 0;
195 };
196 let timeout_ms = timeout.as_millis() as u64;
197 // Monotonic — matches `StreamHandle::last_frame_ms` and is immune to wall
198 // clock steps that could otherwise reap (or fail to reap) a live stream.
199 let now = crate::bus::mono_ms();
200 let mut reaped = 0;
201
202 // Collect victims first; don't mutate the maps while iterating them.
203 let mut victims = Vec::new();
204 for app in self.apps.iter() {
205 for handle in app.value().active_handles() {
206 if now.saturating_sub(handle.last_frame_ms()) >= timeout_ms {
207 victims.push(handle.key().clone());
208 }
209 }
210 }
211
212 for key in victims {
213 self.observer.on_stream_reaped(&key);
214 info!(stream = %key, "Reaping idle stream");
215 if self.end_publish(&key).await.is_ok() {
216 reaped += 1;
217 }
218 }
219 reaped
220 }
221
222 /// Spawn a background task that calls [`reap_idle`](Self::reap_idle) every
223 /// `interval` until `shutdown` fires. No-op (returns immediately) when no
224 /// idle timeout is configured.
225 pub fn spawn_idle_reaper(
226 self: &Arc<Self>,
227 interval: std::time::Duration,
228 shutdown: CancellationToken,
229 ) {
230 if self.config.idle_timeout.is_none() {
231 return;
232 }
233 let engine = Arc::clone(self);
234 tokio::spawn(async move {
235 let mut tick = tokio::time::interval(interval);
236 loop {
237 tokio::select! {
238 _ = shutdown.cancelled() => break,
239 _ = tick.tick() => {
240 let n = engine.reap_idle().await;
241 if n > 0 {
242 info!(reaped = n, "Idle reaper swept streams");
243 }
244 }
245 }
246 }
247 });
248 }
249}
250
251/// Resolves on the first OS shutdown signal: SIGINT everywhere, plus SIGTERM on
252/// Unix. A failure to install a handler is logged and that signal is simply
253/// never observed, rather than aborting the process.
254async fn wait_for_shutdown_signal() {
255 let ctrl_c = async {
256 if let Err(e) = tokio::signal::ctrl_c().await {
257 error!(error = %e, "failed to listen for Ctrl-C");
258 // Park forever so this branch never wins the select.
259 std::future::pending::<()>().await;
260 }
261 };
262
263 #[cfg(unix)]
264 let terminate = async {
265 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
266 Ok(mut sig) => {
267 sig.recv().await;
268 }
269 Err(e) => {
270 error!(error = %e, "failed to install SIGTERM handler");
271 std::future::pending::<()>().await;
272 }
273 }
274 };
275
276 #[cfg(not(unix))]
277 let terminate = std::future::pending::<()>();
278
279 tokio::select! {
280 _ = ctrl_c => {}
281 _ = terminate => {}
282 }
283}