use crate::bus::{PublishRegistry, StreamHandle};
use crate::{MediaFrame, Result, StreamKey};
use async_trait::async_trait;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
#[async_trait]
pub trait InboundProtocol: Send + Sync + 'static {
fn name(&self) -> &'static str;
async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()>;
}
#[async_trait]
impl<T: crate::traits::ProtocolHandler + 'static> InboundProtocol for T {
fn name(&self) -> &'static str {
crate::traits::ProtocolHandler::name(self)
}
async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
crate::traits::ProtocolHandler::run(self, Arc::clone(ctx.registry()), shutdown).await
}
}
#[derive(Clone)]
pub struct IngestContext {
registry: Arc<dyn PublishRegistry>,
}
impl IngestContext {
pub fn new(registry: Arc<dyn PublishRegistry>) -> Self {
Self { registry }
}
pub async fn open_publish(&self, key: StreamKey) -> Result<PublishSession> {
let handle = self.registry.start_publish(&key).await?;
Ok(PublishSession {
handle,
registry: Arc::clone(&self.registry),
key,
released: false,
})
}
pub fn registry(&self) -> &Arc<dyn PublishRegistry> {
&self.registry
}
}
pub struct PublishSession {
handle: StreamHandle,
registry: Arc<dyn PublishRegistry>,
key: StreamKey,
released: bool,
}
impl PublishSession {
pub fn key(&self) -> &StreamKey {
&self.key
}
pub fn handle(&self) -> &StreamHandle {
&self.handle
}
pub fn publish_frame(&self, frame: MediaFrame) -> Result<usize> {
self.handle.publish_frame(frame)
}
pub async fn finish(mut self) -> Result<()> {
self.released = true;
self.registry.end_publish(&self.key).await
}
}
impl Drop for PublishSession {
fn drop(&mut self) {
if self.released {
return;
}
if let Ok(rt) = tokio::runtime::Handle::try_current() {
let registry = Arc::clone(&self.registry);
let key = self.key.clone();
rt.spawn(async move {
let _ = registry.end_publish(&key).await;
});
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::bus::PlaybackRegistry;
use crate::{AppSpec, CodecId, Engine, FrameFlags};
use bytes::Bytes;
struct DemoProtocol {
key: StreamKey,
}
#[async_trait]
impl InboundProtocol for DemoProtocol {
fn name(&self) -> &'static str {
"demo"
}
async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
let session = ctx.open_publish(self.key.clone()).await?;
let mut cfg =
MediaFrame::new_video(0, 0, Bytes::from_static(b"sps"), CodecId::H264, false);
cfg.flags |= FrameFlags::CONFIG;
session.publish_frame(cfg)?;
session.publish_frame(MediaFrame::new_video(
0,
0,
Bytes::from_static(b"idr"),
CodecId::H264,
true,
))?;
shutdown.cancelled().await;
session.finish().await
}
}
#[tokio::test]
async fn custom_protocol_publishes_through_ingest_context() {
let engine = Engine::builder()
.application(AppSpec::new("live").gop_cache(8))
.build();
let key = StreamKey::new("live", "cam");
let ctx = IngestContext::new(engine.clone());
let proto = DemoProtocol { key: key.clone() };
let shutdown = CancellationToken::new();
let worker = {
let shutdown = shutdown.clone();
tokio::spawn(async move { proto.serve(ctx, shutdown).await })
};
let handle = loop {
if let Ok(h) = engine.get_stream(&key) {
if h.replay_buffer().iter().any(|f| f.is_keyframe()) {
break h;
}
}
tokio::task::yield_now().await;
};
let (vcfg, _) = handle.cached_configs();
assert!(vcfg.is_some(), "config frame cached via PublishSession");
shutdown.cancel();
worker.await.unwrap().unwrap();
assert!(
engine.get_stream(&key).is_err(),
"session released on finish"
);
}
#[tokio::test]
async fn dropping_session_releases_the_slot() {
let engine = Engine::builder().application(AppSpec::new("live")).build();
let key = StreamKey::new("live", "drop-test");
let ctx = IngestContext::new(engine.clone());
{
let _session = ctx.open_publish(key.clone()).await.unwrap();
assert!(engine.get_stream(&key).is_ok(), "stream live while held");
}
for _ in 0..16 {
if engine.get_stream(&key).is_err() {
break;
}
tokio::task::yield_now().await;
}
assert!(engine.get_stream(&key).is_err(), "slot released on drop");
}
}