Skip to main content

arcly_stream/engine/
driver.rs

1//! The run loop — `arcly-http`'s `App::launch` analogue for protocol handlers.
2
3use super::Engine;
4use crate::traits::{MediaSource, ProtocolHandler};
5use crate::{PublishRegistry, Result, StreamKey};
6use std::sync::Arc;
7use tokio_util::sync::CancellationToken;
8use tracing::{error, info, warn};
9
10impl Engine {
11    /// Drive the engine: run every protocol handler concurrently until
12    /// `shutdown` fires, then let them observe the cancellation and wind down.
13    ///
14    /// **Coordinated teardown:** when *any* handler returns — cleanly, with an
15    /// error, or by panicking — the shared `shutdown` token is cancelled so the
16    /// remaining handlers wind down too. A single listener dying never leaves
17    /// its siblings orphaned.
18    ///
19    /// Returns the first fatal handler error, if any; otherwise `Ok(())` once
20    /// all handlers have returned.
21    ///
22    /// ```no_run
23    /// # use arcly_stream::prelude::*;
24    /// # async fn run(engine: std::sync::Arc<Engine>, h: Box<dyn ProtocolHandler>) -> arcly_stream::Result<()> {
25    /// engine.serve(vec![h], CancellationToken::new()).await
26    /// # }
27    /// ```
28    pub async fn serve(
29        self: &Arc<Self>,
30        handlers: Vec<Box<dyn ProtocolHandler>>,
31        shutdown: CancellationToken,
32    ) -> Result<()> {
33        let registry: Arc<dyn crate::bus::PublishRegistry> = self.clone();
34        let mut tasks = Vec::with_capacity(handlers.len());
35
36        for handler in handlers {
37            let registry = Arc::clone(&registry);
38            let shutdown = shutdown.clone();
39            let name = handler.name();
40            info!(protocol = name, "Protocol handler starting");
41            tasks.push(tokio::spawn(async move {
42                // Cancel the shared token however this task leaves — so a peer's
43                // exit (or panic, via the drop guard) winds the whole set down.
44                let _guard = shutdown.clone().drop_guard();
45                let result = handler.run(registry, shutdown).await;
46                if let Err(e) = &result {
47                    error!(protocol = name, error = %e, "Protocol handler exited with error");
48                }
49                (name, result)
50            }));
51        }
52
53        let mut first_err = None;
54        for task in tasks {
55            match task.await {
56                Ok((name, Ok(()))) => info!(protocol = name, "Protocol handler stopped"),
57                Ok((_, Err(e))) => {
58                    if first_err.is_none() {
59                        first_err = Some(e);
60                    }
61                }
62                Err(join_err) => {
63                    error!(error = %join_err, "Protocol handler task panicked");
64                }
65            }
66        }
67
68        match first_err {
69            Some(e) => Err(e),
70            None => Ok(()),
71        }
72    }
73
74    /// Like [`serve`](Self::serve), but owns the shutdown trigger: it runs until
75    /// `Ctrl-C` (SIGINT) or, on Unix, `SIGTERM` — the two signals an
76    /// orchestrator (systemd, Kubernetes) uses to stop a process — then cancels
77    /// the handlers and waits for them to drain.
78    ///
79    /// This is the batteries-included entry point for a binary; use
80    /// [`serve`](Self::serve) when the host already owns a cancellation token
81    /// (e.g. to compose the engine into a larger shutdown graph).
82    ///
83    /// ```no_run
84    /// # use arcly_stream::prelude::*;
85    /// # async fn run(engine: std::sync::Arc<Engine>, h: Box<dyn ProtocolHandler>) -> arcly_stream::Result<()> {
86    /// engine.serve_until_signal(vec![h]).await
87    /// # }
88    /// ```
89    pub async fn serve_until_signal(
90        self: &Arc<Self>,
91        handlers: Vec<Box<dyn ProtocolHandler>>,
92    ) -> Result<()> {
93        let shutdown = CancellationToken::new();
94        let signal_token = shutdown.clone();
95        tokio::spawn(async move {
96            wait_for_shutdown_signal().await;
97            info!("Shutdown signal received; cancelling protocol handlers");
98            signal_token.cancel();
99        });
100        self.serve(handlers, shutdown).await
101    }
102
103    /// Drive a [`MediaSource`] into the bus: claim `key`, pump every frame the
104    /// source yields, and end the publish session when the source is exhausted,
105    /// errors, or `shutdown` fires.
106    ///
107    /// This is the first-class runner for the [`MediaSource`] trait — the
108    /// in-process counterpart to a socket-driven [`ProtocolHandler`]. Useful for
109    /// file/loopback ingest, test fixtures, and generated streams.
110    pub async fn pump_source<S: MediaSource>(
111        self: &Arc<Self>,
112        key: &StreamKey,
113        mut source: S,
114        shutdown: CancellationToken,
115    ) -> Result<()> {
116        let handle = self.start_publish(key).await?;
117        let result = loop {
118            tokio::select! {
119                _ = shutdown.cancelled() => break Ok(()),
120                next = source.next_frame() => match next {
121                    Ok(Some(frame)) => {
122                        let _ = handle.publish_frame(frame);
123                    }
124                    Ok(None) => break Ok(()),
125                    Err(e) => {
126                        warn!(stream = %key, error = %e, "MediaSource errored; ending publish");
127                        break Err(e);
128                    }
129                },
130            }
131        };
132        // Always release the publish slot, even on a source error.
133        self.end_publish(key).await?;
134        result
135    }
136
137    /// Reap every publishing stream that has not received a frame within the
138    /// configured [`idle_timeout`](super::EngineConfig::idle_timeout). Returns
139    /// the number of streams reaped. A no-op when no timeout is configured.
140    pub async fn reap_idle(self: &Arc<Self>) -> usize {
141        let Some(timeout) = self.config.idle_timeout else {
142            return 0;
143        };
144        let timeout_ms = timeout.as_millis() as u64;
145        let now = crate::bus::now_ms();
146        let mut reaped = 0;
147
148        // Collect victims first; don't mutate the maps while iterating them.
149        let mut victims = Vec::new();
150        for app in self.apps.iter() {
151            for handle in app.value().active_handles() {
152                if now.saturating_sub(handle.last_frame_ms()) >= timeout_ms {
153                    victims.push(handle.key().clone());
154                }
155            }
156        }
157
158        for key in victims {
159            self.observer.on_stream_reaped(&key);
160            info!(stream = %key, "Reaping idle stream");
161            if self.end_publish(&key).await.is_ok() {
162                reaped += 1;
163            }
164        }
165        reaped
166    }
167
168    /// Spawn a background task that calls [`reap_idle`](Self::reap_idle) every
169    /// `interval` until `shutdown` fires. No-op (returns immediately) when no
170    /// idle timeout is configured.
171    pub fn spawn_idle_reaper(
172        self: &Arc<Self>,
173        interval: std::time::Duration,
174        shutdown: CancellationToken,
175    ) {
176        if self.config.idle_timeout.is_none() {
177            return;
178        }
179        let engine = Arc::clone(self);
180        tokio::spawn(async move {
181            let mut tick = tokio::time::interval(interval);
182            loop {
183                tokio::select! {
184                    _ = shutdown.cancelled() => break,
185                    _ = tick.tick() => {
186                        let n = engine.reap_idle().await;
187                        if n > 0 {
188                            info!(reaped = n, "Idle reaper swept streams");
189                        }
190                    }
191                }
192            }
193        });
194    }
195}
196
197/// Resolves on the first OS shutdown signal: SIGINT everywhere, plus SIGTERM on
198/// Unix. A failure to install a handler is logged and that signal is simply
199/// never observed, rather than aborting the process.
200async fn wait_for_shutdown_signal() {
201    let ctrl_c = async {
202        if let Err(e) = tokio::signal::ctrl_c().await {
203            error!(error = %e, "failed to listen for Ctrl-C");
204            // Park forever so this branch never wins the select.
205            std::future::pending::<()>().await;
206        }
207    };
208
209    #[cfg(unix)]
210    let terminate = async {
211        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
212            Ok(mut sig) => {
213                sig.recv().await;
214            }
215            Err(e) => {
216                error!(error = %e, "failed to install SIGTERM handler");
217                std::future::pending::<()>().await;
218            }
219        }
220    };
221
222    #[cfg(not(unix))]
223    let terminate = std::future::pending::<()>();
224
225    tokio::select! {
226        _ = ctrl_c => {}
227        _ = terminate => {}
228    }
229}