arcly_stream/traits.rs
1//! The foundational extension traits a downstream user implements.
2//!
3//! These are codec- and runtime-agnostic. Implement the ones you need and the
4//! engine drives them; the engine never assumes a particular protocol,
5//! container, or storage backend.
6
7use crate::bus::PublishRegistry;
8use crate::frame::CodecId;
9use crate::{MediaFrame, Result, StreamError};
10use async_trait::async_trait;
11use bytes::Bytes;
12use std::path::Path;
13use std::sync::Arc;
14
15/// A source that produces [`MediaFrame`]s (e.g. a protocol ingest connection).
16#[async_trait]
17pub trait MediaSource: Send + Sync {
18 /// Returns the next frame, or `Ok(None)` when the source is exhausted.
19 /// An `Err` result indicates a fatal I/O or protocol error.
20 ///
21 /// Using `Result<Option<T>>` rather than `Option<Result<T>>` follows the
22 /// standard Rust I/O convention: `Ok(Some(frame))` = got a frame,
23 /// `Ok(None)` = end of stream, `Err(e)` = unrecoverable error.
24 async fn next_frame(&mut self) -> Result<Option<MediaFrame>>;
25}
26
27/// A sink that consumes [`MediaFrame`]s (e.g. a protocol playback connection,
28/// a transcoding pipeline node, or a recorder).
29#[async_trait]
30pub trait MediaSink: Send + Sync {
31 /// Send a frame to this sink.
32 async fn send_frame(&mut self, frame: MediaFrame) -> Result<()>;
33
34 /// Flush any internal buffers and signal end of stream.
35 async fn flush(&mut self) -> Result<()> {
36 Ok(())
37 }
38}
39
40/// Low-level protocol handler that accepts connections and maps them to
41/// stream identities. Each concrete protocol implementation provides this.
42///
43/// Unlike `stream-center`'s original `ProtocolHandler`, `run` receives a
44/// [`PublishRegistry`] trait object rather than reaching for a concrete
45/// `ApplicationRegistry`, so a handler is reusable against any engine — or a
46/// host's own bus implementation.
47#[async_trait]
48pub trait ProtocolHandler: Send + Sync {
49 /// Human-readable name (e.g. `"rtmp"`, `"hls"`).
50 fn name(&self) -> &'static str;
51
52 /// Start accepting connections. Implementations should run until the
53 /// provided `shutdown` token is cancelled, resolving incoming connections
54 /// to a [`StreamKey`](crate::StreamKey) and forwarding frames through
55 /// `registry`.
56 async fn run(
57 &self,
58 registry: Arc<dyn PublishRegistry>,
59 shutdown: tokio_util::sync::CancellationToken,
60 ) -> Result<()>;
61}
62
63/// Object-storage abstraction used by recording and HLS/DASH packagers.
64#[async_trait]
65pub trait StorageBackend: Send + Sync + 'static {
66 /// Write `data` at `key`. Overwrites if the key exists.
67 async fn put(&self, key: &str, data: Bytes) -> Result<()>;
68
69 /// Read the object stored at `key`.
70 async fn get(&self, key: &str) -> Result<Bytes>;
71
72 /// Delete the object at `key`.
73 async fn delete(&self, key: &str) -> Result<()>;
74
75 /// List all keys with the given `prefix`.
76 async fn list(&self, prefix: &str) -> Result<Vec<String>>;
77
78 /// Returns `true` if the object exists.
79 async fn exists(&self, key: &str) -> Result<bool> {
80 match self.get(key).await {
81 Ok(_) => Ok(true),
82 Err(StreamError::StorageNotFound(_)) => Ok(false),
83 Err(e) => Err(e),
84 }
85 }
86
87 /// Write the file at `path` to `key`, then remove the source file.
88 ///
89 /// The default implementation reads the file into memory and calls [`put`](Self::put).
90 /// Backends that share a filesystem with the temp directory (e.g. disk)
91 /// should override this with a rename to avoid the memory round-trip.
92 async fn put_file(&self, key: &str, path: &Path) -> Result<()> {
93 let data = tokio::fs::read(path)
94 .await
95 .map_err(|e| StreamError::storage(format!("put_file read failed: {e}")))?;
96 self.put(key, Bytes::from(data)).await?;
97 tokio::fs::remove_file(path).await.ok();
98 Ok(())
99 }
100}
101
102/// `Arc<B>` is itself a [`StorageBackend`], so a single backend can be shared
103/// across many packagers/recorders without wrapping boilerplate.
104#[async_trait]
105impl<B: StorageBackend> StorageBackend for Arc<B> {
106 async fn put(&self, key: &str, data: Bytes) -> Result<()> {
107 (**self).put(key, data).await
108 }
109 async fn get(&self, key: &str) -> Result<Bytes> {
110 (**self).get(key).await
111 }
112 async fn delete(&self, key: &str) -> Result<()> {
113 (**self).delete(key).await
114 }
115 async fn list(&self, prefix: &str) -> Result<Vec<String>> {
116 (**self).list(prefix).await
117 }
118 async fn exists(&self, key: &str) -> Result<bool> {
119 (**self).exists(key).await
120 }
121 async fn put_file(&self, key: &str, path: &Path) -> Result<()> {
122 (**self).put_file(key, path).await
123 }
124}
125
126/// Hardware-acceleration backend (NVENC, AMF, QSV, NPU, …).
127///
128/// Implementors are registered at startup based on available hardware.
129pub trait HwAccelBackend: Send + Sync {
130 /// Unique backend identifier (e.g. `"nvenc"`, `"qsv"`).
131 fn id(&self) -> &'static str;
132
133 /// Returns whether the backend is usable on this machine.
134 fn is_available(&self) -> bool;
135
136 /// List codec IDs this backend can encode.
137 fn supported_encoders(&self) -> &[CodecId];
138
139 /// List codec IDs this backend can decode.
140 fn supported_decoders(&self) -> &[CodecId];
141}