use super::Engine;
use crate::inbound::{InboundProtocol, IngestContext};
use crate::traits::MediaSource;
use crate::{PublishRegistry, Result, StreamKey};
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
impl Engine {
pub async fn serve(
self: &Arc<Self>,
handlers: Vec<Box<dyn InboundProtocol>>,
shutdown: CancellationToken,
) -> Result<()> {
let ctx = IngestContext::new(self.clone());
let mut tasks = Vec::with_capacity(handlers.len());
for handler in handlers {
let ctx = ctx.clone();
let shutdown = shutdown.clone();
let name = handler.name();
info!(protocol = name, "Protocol handler starting");
tasks.push(tokio::spawn(async move {
let _guard = shutdown.clone().drop_guard();
let result = handler.serve(ctx, shutdown).await;
if let Err(e) = &result {
error!(protocol = name, error = %e, "Protocol handler exited with error");
}
(name, result)
}));
}
let mut first_err = None;
for task in tasks {
match task.await {
Ok((name, Ok(()))) => info!(protocol = name, "Protocol handler stopped"),
Ok((_, Err(e))) => {
if first_err.is_none() {
first_err = Some(e);
}
}
Err(join_err) => {
error!(error = %join_err, "Protocol handler task panicked");
}
}
}
match first_err {
Some(e) => Err(e),
None => Ok(()),
}
}
/// Serves `handlers` until the process receives a shutdown signal or the
/// handlers stop on their own.
///
/// A fresh [`CancellationToken`] is created and handed to [`Engine::serve`].
/// A background task waits for Ctrl-C (and SIGTERM on Unix) and cancels that
/// token when one arrives.
///
/// The background task also watches the token itself, and the token is
/// cancelled once `serve` has returned. This way the signal watcher ends
/// together with this call instead of lingering until a signal eventually
/// arrives (for example when a handler fails early or `handlers` is empty).
///
/// # Errors
///
/// Propagates whatever [`Engine::serve`] returns.
pub async fn serve_until_signal(
    self: &Arc<Self>,
    handlers: Vec<Box<dyn InboundProtocol>>,
) -> Result<()> {
    let shutdown = CancellationToken::new();
    let signal_token = shutdown.clone();
    tokio::spawn(async move {
        tokio::select! {
            _ = wait_for_shutdown_signal() => {
                info!("Shutdown signal received; cancelling protocol handlers");
                signal_token.cancel();
            }
            // Serving is already over (or being torn down); nothing left to cancel.
            _ = signal_token.cancelled() => {}
        }
    });

    let result = self.serve(handlers, shutdown.clone()).await;
    // All handlers have finished at this point, so cancelling is harmless for
    // them; it only releases the signal-watcher task above. This matters when
    // no handler ever ran and therefore nothing else cancelled the token.
    shutdown.cancel();
    result
}
/// Serves the protocol handlers previously queued in `pending_protocols`.
///
/// The queued handlers are moved out of the engine (leaving the pending list
/// empty) and passed to [`Engine::serve`] together with `shutdown`. A
/// poisoned lock is recovered rather than treated as an error.
///
/// # Errors
///
/// Propagates whatever [`Engine::serve`] returns.
pub async fn serve_registered(self: &Arc<Self>, shutdown: CancellationToken) -> Result<()> {
    // The lock guard is a temporary of this statement, so it is released
    // before the `.await` below.
    let handlers = std::mem::take(
        &mut *self
            .pending_protocols
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner()),
    );
    self.serve(handlers, shutdown).await
}
/// Serves the protocol handlers previously queued in `pending_protocols`
/// until a shutdown signal arrives.
///
/// The queued handlers are moved out of the engine (leaving the pending list
/// empty) and passed to [`Engine::serve_until_signal`]. A poisoned lock is
/// recovered rather than treated as an error.
///
/// # Errors
///
/// Propagates whatever [`Engine::serve_until_signal`] returns.
pub async fn serve_registered_until_signal(self: &Arc<Self>) -> Result<()> {
    let handlers = {
        // Scope the guard so the lock is released before awaiting.
        let mut pending = match self.pending_protocols.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        std::mem::take(&mut *pending)
    };
    self.serve_until_signal(handlers).await
}
pub async fn pump_source<S: MediaSource>(
self: &Arc<Self>,
key: &StreamKey,
mut source: S,
shutdown: CancellationToken,
) -> Result<()> {
let handle = self.start_publish(key).await?;
let result = loop {
tokio::select! {
_ = shutdown.cancelled() => break Ok(()),
next = source.next_frame() => match next {
Ok(Some(frame)) => {
let _ = handle.publish_frame(frame);
}
Ok(None) => break Ok(()),
Err(e) => {
warn!(stream = %key, error = %e, "MediaSource errored; ending publish");
break Err(e);
}
},
}
};
self.end_publish(key).await?;
result
}
/// Ends every active stream whose last frame is at least `idle_timeout` old.
///
/// Returns the number of streams for which `end_publish` succeeded. When no
/// idle timeout is configured this is a no-op that returns `0`.
///
/// The observer's `on_stream_reaped` hook is invoked for every idle stream
/// before `end_publish` is attempted, regardless of whether ending the
/// stream then succeeds.
pub async fn reap_idle(self: &Arc<Self>) -> usize {
    let Some(timeout) = self.config.idle_timeout else {
        return 0;
    };
    // `as_millis` yields a u128. Saturate instead of truncating with `as`:
    // a very large timeout must stay "effectively never", whereas truncation
    // could wrap it to a tiny value and reap every stream.
    let timeout_ms = u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX);
    let now = crate::bus::now_ms();
    let mut reaped = 0;

    // Collect the keys first and end the streams afterwards, so iteration
    // over `apps` is finished before the first `.await`.
    let mut victims = Vec::new();
    for app in self.apps.iter() {
        for handle in app.value().active_handles() {
            // `saturating_sub` guards against a last-frame timestamp that is
            // ahead of `now`.
            if now.saturating_sub(handle.last_frame_ms()) >= timeout_ms {
                victims.push(handle.key().clone());
            }
        }
    }

    for key in victims {
        self.observer.on_stream_reaped(&key);
        info!(stream = %key, "Reaping idle stream");
        if self.end_publish(&key).await.is_ok() {
            reaped += 1;
        }
    }
    reaped
}
pub fn spawn_idle_reaper(
self: &Arc<Self>,
interval: std::time::Duration,
shutdown: CancellationToken,
) {
if self.config.idle_timeout.is_none() {
return;
}
let engine = Arc::clone(self);
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
loop {
tokio::select! {
_ = shutdown.cancelled() => break,
_ = tick.tick() => {
let n = engine.reap_idle().await;
if n > 0 {
info!(reaped = n, "Idle reaper swept streams");
}
}
}
}
});
}
}
/// Completes when the process receives Ctrl-C or, on Unix, SIGTERM.
///
/// If a listener cannot be installed the failure is logged and that source
/// is treated as never firing, so the other source can still end the wait.
/// On non-Unix targets only Ctrl-C is observed.
async fn wait_for_shutdown_signal() {
    let interrupt = async {
        match tokio::signal::ctrl_c().await {
            Ok(()) => {}
            Err(e) => {
                error!(error = %e, "failed to listen for Ctrl-C");
                // Never resolve: a broken listener must not look like a signal.
                std::future::pending::<()>().await;
            }
        }
    };

    #[cfg(unix)]
    let terminate = async {
        use tokio::signal::unix::{signal, SignalKind};

        match signal(SignalKind::terminate()) {
            Err(e) => {
                error!(error = %e, "failed to install SIGTERM handler");
                // Never resolve: a broken listener must not look like a signal.
                std::future::pending::<()>().await;
            }
            Ok(mut sigterm) => {
                sigterm.recv().await;
            }
        }
    };

    // No SIGTERM outside Unix; use a future that never completes.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    // Whichever source fires first ends the wait.
    tokio::select! {
        _ = interrupt => {}
        _ = terminate => {}
    }
}