arcly_stream/engine/driver.rs
1//! The run loop — `arcly-http`'s `App::launch` analogue for protocol handlers.
2
3use super::Engine;
4use crate::traits::{MediaSource, ProtocolHandler};
5use crate::{PublishRegistry, Result, StreamKey};
6use std::sync::Arc;
7use tokio_util::sync::CancellationToken;
8use tracing::{error, info, warn};
9
10impl Engine {
11 /// Drive the engine: run every protocol handler concurrently until
12 /// `shutdown` fires, then let them observe the cancellation and wind down.
13 ///
14 /// **Coordinated teardown:** when *any* handler returns — cleanly, with an
15 /// error, or by panicking — the shared `shutdown` token is cancelled so the
16 /// remaining handlers wind down too. A single listener dying never leaves
17 /// its siblings orphaned.
18 ///
19 /// Returns the first fatal handler error, if any; otherwise `Ok(())` once
20 /// all handlers have returned.
21 ///
22 /// ```no_run
23 /// # use arcly_stream::prelude::*;
24 /// # async fn run(engine: std::sync::Arc<Engine>, h: Box<dyn ProtocolHandler>) -> arcly_stream::Result<()> {
25 /// engine.serve(vec![h], CancellationToken::new()).await
26 /// # }
27 /// ```
28 pub async fn serve(
29 self: &Arc<Self>,
30 handlers: Vec<Box<dyn ProtocolHandler>>,
31 shutdown: CancellationToken,
32 ) -> Result<()> {
33 let registry: Arc<dyn crate::bus::PublishRegistry> = self.clone();
34 let mut tasks = Vec::with_capacity(handlers.len());
35
36 for handler in handlers {
37 let registry = Arc::clone(®istry);
38 let shutdown = shutdown.clone();
39 let name = handler.name();
40 info!(protocol = name, "Protocol handler starting");
41 tasks.push(tokio::spawn(async move {
42 // Cancel the shared token however this task leaves — so a peer's
43 // exit (or panic, via the drop guard) winds the whole set down.
44 let _guard = shutdown.clone().drop_guard();
45 let result = handler.run(registry, shutdown).await;
46 if let Err(e) = &result {
47 error!(protocol = name, error = %e, "Protocol handler exited with error");
48 }
49 (name, result)
50 }));
51 }
52
53 let mut first_err = None;
54 for task in tasks {
55 match task.await {
56 Ok((name, Ok(()))) => info!(protocol = name, "Protocol handler stopped"),
57 Ok((_, Err(e))) => {
58 if first_err.is_none() {
59 first_err = Some(e);
60 }
61 }
62 Err(join_err) => {
63 error!(error = %join_err, "Protocol handler task panicked");
64 }
65 }
66 }
67
68 match first_err {
69 Some(e) => Err(e),
70 None => Ok(()),
71 }
72 }
73
74 /// Like [`serve`](Self::serve), but owns the shutdown trigger: it runs until
75 /// `Ctrl-C` (SIGINT) or, on Unix, `SIGTERM` — the two signals an
76 /// orchestrator (systemd, Kubernetes) uses to stop a process — then cancels
77 /// the handlers and waits for them to drain.
78 ///
79 /// This is the batteries-included entry point for a binary; use
80 /// [`serve`](Self::serve) when the host already owns a cancellation token
81 /// (e.g. to compose the engine into a larger shutdown graph).
82 ///
83 /// ```no_run
84 /// # use arcly_stream::prelude::*;
85 /// # async fn run(engine: std::sync::Arc<Engine>, h: Box<dyn ProtocolHandler>) -> arcly_stream::Result<()> {
86 /// engine.serve_until_signal(vec![h]).await
87 /// # }
88 /// ```
89 pub async fn serve_until_signal(
90 self: &Arc<Self>,
91 handlers: Vec<Box<dyn ProtocolHandler>>,
92 ) -> Result<()> {
93 let shutdown = CancellationToken::new();
94 let signal_token = shutdown.clone();
95 tokio::spawn(async move {
96 wait_for_shutdown_signal().await;
97 info!("Shutdown signal received; cancelling protocol handlers");
98 signal_token.cancel();
99 });
100 self.serve(handlers, shutdown).await
101 }
102
103 /// Drive a [`MediaSource`] into the bus: claim `key`, pump every frame the
104 /// source yields, and end the publish session when the source is exhausted,
105 /// errors, or `shutdown` fires.
106 ///
107 /// This is the first-class runner for the [`MediaSource`] trait — the
108 /// in-process counterpart to a socket-driven [`ProtocolHandler`]. Useful for
109 /// file/loopback ingest, test fixtures, and generated streams.
110 pub async fn pump_source<S: MediaSource>(
111 self: &Arc<Self>,
112 key: &StreamKey,
113 mut source: S,
114 shutdown: CancellationToken,
115 ) -> Result<()> {
116 let handle = self.start_publish(key).await?;
117 let result = loop {
118 tokio::select! {
119 _ = shutdown.cancelled() => break Ok(()),
120 next = source.next_frame() => match next {
121 Ok(Some(frame)) => {
122 let _ = handle.publish_frame(frame);
123 }
124 Ok(None) => break Ok(()),
125 Err(e) => {
126 warn!(stream = %key, error = %e, "MediaSource errored; ending publish");
127 break Err(e);
128 }
129 },
130 }
131 };
132 // Always release the publish slot, even on a source error.
133 self.end_publish(key).await?;
134 result
135 }
136
137 /// Reap every publishing stream that has not received a frame within the
138 /// configured [`idle_timeout`](super::EngineConfig::idle_timeout). Returns
139 /// the number of streams reaped. A no-op when no timeout is configured.
140 pub async fn reap_idle(self: &Arc<Self>) -> usize {
141 let Some(timeout) = self.config.idle_timeout else {
142 return 0;
143 };
144 let timeout_ms = timeout.as_millis() as u64;
145 let now = crate::bus::now_ms();
146 let mut reaped = 0;
147
148 // Collect victims first; don't mutate the maps while iterating them.
149 let mut victims = Vec::new();
150 for app in self.apps.iter() {
151 for handle in app.value().active_handles() {
152 if now.saturating_sub(handle.last_frame_ms()) >= timeout_ms {
153 victims.push(handle.key().clone());
154 }
155 }
156 }
157
158 for key in victims {
159 self.observer.on_stream_reaped(&key);
160 info!(stream = %key, "Reaping idle stream");
161 if self.end_publish(&key).await.is_ok() {
162 reaped += 1;
163 }
164 }
165 reaped
166 }
167
168 /// Spawn a background task that calls [`reap_idle`](Self::reap_idle) every
169 /// `interval` until `shutdown` fires. No-op (returns immediately) when no
170 /// idle timeout is configured.
171 pub fn spawn_idle_reaper(
172 self: &Arc<Self>,
173 interval: std::time::Duration,
174 shutdown: CancellationToken,
175 ) {
176 if self.config.idle_timeout.is_none() {
177 return;
178 }
179 let engine = Arc::clone(self);
180 tokio::spawn(async move {
181 let mut tick = tokio::time::interval(interval);
182 loop {
183 tokio::select! {
184 _ = shutdown.cancelled() => break,
185 _ = tick.tick() => {
186 let n = engine.reap_idle().await;
187 if n > 0 {
188 info!(reaped = n, "Idle reaper swept streams");
189 }
190 }
191 }
192 }
193 });
194 }
195}
196
197/// Resolves on the first OS shutdown signal: SIGINT everywhere, plus SIGTERM on
198/// Unix. A failure to install a handler is logged and that signal is simply
199/// never observed, rather than aborting the process.
200async fn wait_for_shutdown_signal() {
201 let ctrl_c = async {
202 if let Err(e) = tokio::signal::ctrl_c().await {
203 error!(error = %e, "failed to listen for Ctrl-C");
204 // Park forever so this branch never wins the select.
205 std::future::pending::<()>().await;
206 }
207 };
208
209 #[cfg(unix)]
210 let terminate = async {
211 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
212 Ok(mut sig) => {
213 sig.recv().await;
214 }
215 Err(e) => {
216 error!(error = %e, "failed to install SIGTERM handler");
217 std::future::pending::<()>().await;
218 }
219 }
220 };
221
222 #[cfg(not(unix))]
223 let terminate = std::future::pending::<()>();
224
225 tokio::select! {
226 _ = ctrl_c => {}
227 _ = terminate => {}
228 }
229}