Skip to main content

arcly_stream/
traits.rs

1//! The foundational extension traits a downstream user implements.
2//!
3//! These are codec- and runtime-agnostic. Implement the ones you need and the
4//! engine drives them; the engine never assumes a particular protocol,
5//! container, or storage backend.
6
7use crate::bus::PublishRegistry;
8use crate::frame::CodecId;
9use crate::{MediaFrame, Result, StreamError};
10use async_trait::async_trait;
11use bytes::Bytes;
12use std::path::Path;
13use std::sync::Arc;
14
15/// A source that produces [`MediaFrame`]s (e.g. a protocol ingest connection).
16#[async_trait]
17pub trait MediaSource: Send + Sync {
18    /// Returns the next frame, or `Ok(None)` when the source is exhausted.
19    /// An `Err` result indicates a fatal I/O or protocol error.
20    ///
21    /// Using `Result<Option<T>>` rather than `Option<Result<T>>` follows the
22    /// standard Rust I/O convention: `Ok(Some(frame))` = got a frame,
23    /// `Ok(None)` = end of stream, `Err(e)` = unrecoverable error.
24    async fn next_frame(&mut self) -> Result<Option<MediaFrame>>;
25}
26
27/// A sink that consumes [`MediaFrame`]s (e.g. a protocol playback connection,
28/// a transcoding pipeline node, or a recorder).
29#[async_trait]
30pub trait MediaSink: Send + Sync {
31    /// Send a frame to this sink.
32    async fn send_frame(&mut self, frame: MediaFrame) -> Result<()>;
33
34    /// Flush any internal buffers and signal end of stream.
35    async fn flush(&mut self) -> Result<()> {
36        Ok(())
37    }
38}
39
40/// Low-level protocol handler that accepts connections and maps them to
41/// stream identities.  Each concrete protocol implementation provides this.
42///
43/// Unlike `stream-center`'s original `ProtocolHandler`, `run` receives a
44/// [`PublishRegistry`] trait object rather than reaching for a concrete
45/// `ApplicationRegistry`, so a handler is reusable against any engine — or a
46/// host's own bus implementation.
47#[async_trait]
48pub trait ProtocolHandler: Send + Sync {
49    /// Human-readable name (e.g. `"rtmp"`, `"hls"`).
50    fn name(&self) -> &'static str;
51
52    /// Start accepting connections.  Implementations should run until the
53    /// provided `shutdown` token is cancelled, resolving incoming connections
54    /// to a [`StreamKey`](crate::StreamKey) and forwarding frames through
55    /// `registry`.
56    async fn run(
57        &self,
58        registry: Arc<dyn PublishRegistry>,
59        shutdown: tokio_util::sync::CancellationToken,
60    ) -> Result<()>;
61}
62
63/// Object-storage abstraction used by recording and HLS/DASH packagers.
64#[async_trait]
65pub trait StorageBackend: Send + Sync + 'static {
66    /// Write `data` at `key`.  Overwrites if the key exists.
67    async fn put(&self, key: &str, data: Bytes) -> Result<()>;
68
69    /// Read the object stored at `key`.
70    async fn get(&self, key: &str) -> Result<Bytes>;
71
72    /// Delete the object at `key`.
73    async fn delete(&self, key: &str) -> Result<()>;
74
75    /// List all keys with the given `prefix`.
76    async fn list(&self, prefix: &str) -> Result<Vec<String>>;
77
78    /// Returns `true` if the object exists.
79    async fn exists(&self, key: &str) -> Result<bool> {
80        match self.get(key).await {
81            Ok(_) => Ok(true),
82            Err(StreamError::StorageNotFound(_)) => Ok(false),
83            Err(e) => Err(e),
84        }
85    }
86
87    /// Write the file at `path` to `key`, then remove the source file.
88    ///
89    /// The default implementation reads the file into memory and calls [`put`](Self::put).
90    /// Backends that share a filesystem with the temp directory (e.g. disk)
91    /// should override this with a rename to avoid the memory round-trip.
92    async fn put_file(&self, key: &str, path: &Path) -> Result<()> {
93        let data = tokio::fs::read(path)
94            .await
95            .map_err(|e| StreamError::storage(format!("put_file read failed: {e}")))?;
96        self.put(key, Bytes::from(data)).await?;
97        tokio::fs::remove_file(path).await.ok();
98        Ok(())
99    }
100}
101
102/// `Arc<B>` is itself a [`StorageBackend`], so a single backend can be shared
103/// across many packagers/recorders without wrapping boilerplate.
104#[async_trait]
105impl<B: StorageBackend> StorageBackend for Arc<B> {
106    async fn put(&self, key: &str, data: Bytes) -> Result<()> {
107        (**self).put(key, data).await
108    }
109    async fn get(&self, key: &str) -> Result<Bytes> {
110        (**self).get(key).await
111    }
112    async fn delete(&self, key: &str) -> Result<()> {
113        (**self).delete(key).await
114    }
115    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
116        (**self).list(prefix).await
117    }
118    async fn exists(&self, key: &str) -> Result<bool> {
119        (**self).exists(key).await
120    }
121    async fn put_file(&self, key: &str, path: &Path) -> Result<()> {
122        (**self).put_file(key, path).await
123    }
124}
125
126/// Hardware-acceleration backend (NVENC, AMF, QSV, NPU, …).
127///
128/// Implementors are registered at startup based on available hardware.
129pub trait HwAccelBackend: Send + Sync {
130    /// Unique backend identifier (e.g. `"nvenc"`, `"qsv"`).
131    fn id(&self) -> &'static str;
132
133    /// Returns whether the backend is usable on this machine.
134    fn is_available(&self) -> bool;
135
136    /// List codec IDs this backend can encode.
137    fn supported_encoders(&self) -> &[CodecId];
138
139    /// List codec IDs this backend can decode.
140    fn supported_decoders(&self) -> &[CodecId];
141}