<div align="center">
# arcly-stream
**A high-performance live-media streaming kernel for Rust.**
Lock-free, zero-copy frame fan-out · instant-start GOP cache · pluggable HLS &
recording · publish/play authorization · live QoS — with **zero** baked-in
config, metrics singletons, or HTTP runtime.
[crates.io](https://crates.io/crates/arcly-stream) ·
[docs.rs](https://docs.rs/arcly-stream) ·
[License](#license) ·
[MSRV](#minimum-supported-rust-version)
</div>
---
## Why arcly-stream?
Building a streaming server means solving the same hard core every time:
fanning one publisher's frames out to thousands of late-joining subscribers,
**without copying payloads, without locks on the hot path, and without dropping
the slow ones silently.** Everything else — which wire protocol you ingest, how
you package egress, where you store segments, how you authorize and observe — is
deployment-specific.
`arcly-stream` is exactly that hard core, extracted and hardened into a reusable
**kernel**. You implement a handful of small traits for the parts unique to your
system; the kernel owns the lock-free pub/sub bus, instant-start replay,
back-pressure handling, lifecycle, and quality-of-service measurement.
It deliberately ships **no opinion** about your runtime: no global registries, no
TOML schema, no axum server. Telemetry, authorization, and audit are *injected*
traits with safe defaults — never singletons.
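To make "injected, never singletons" concrete, here is a minimal std-only sketch of the pattern. The names `Telemetry`, `NoopTelemetry`, `ByteCounter`, and `MiniEngine` are hypothetical stand-ins invented for illustration; they are not the crate's `Observer` API. The point is only that the hook lives in a field with a no-op default, so two engines in one process never share state.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical telemetry seam: every hook has a no-op default body.
trait Telemetry: Send + Sync {
    fn frame_published(&self, _bytes: usize) {}
    fn subscriber_evicted(&self, _stream: &str) {}
}

/// The safe default: observes nothing.
struct NoopTelemetry;
impl Telemetry for NoopTelemetry {}

/// A host-supplied implementation that counts published bytes.
#[derive(Default)]
struct ByteCounter {
    bytes: AtomicU64,
}

impl Telemetry for ByteCounter {
    fn frame_published(&self, bytes: usize) {
        self.bytes.fetch_add(bytes as u64, Ordering::Relaxed);
    }
}

/// Hypothetical engine: telemetry is a field, not a global.
struct MiniEngine {
    telemetry: Arc<dyn Telemetry>,
}

impl MiniEngine {
    fn new() -> Self {
        Self { telemetry: Arc::new(NoopTelemetry) }
    }

    /// Inject a custom implementation, replacing the no-op default.
    fn with_telemetry(mut self, telemetry: Arc<dyn Telemetry>) -> Self {
        self.telemetry = telemetry;
        self
    }

    fn publish(&self, payload: &[u8]) {
        self.telemetry.frame_published(payload.len());
    }
}

fn main() {
    let counter = Arc::new(ByteCounter::default());
    let observed = MiniEngine::new().with_telemetry(counter.clone());
    let silent = MiniEngine::new(); // keeps the no-op default

    observed.publish(b"abcd");
    silent.publish(b"ignored by the counter");

    // Only the engine that was handed the counter reports into it.
    println!("observed bytes: {}", counter.bytes.load(Ordering::Relaxed)); // prints 4
}
```

Because nothing is global, a test can build as many engines as it likes, each with its own (or no) telemetry.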
## Architecture
Three tiers: a small always-on **kernel**, a feature-gated **pure-Rust media
plane**, and **wire protocols deferred to traits** (so the kernel never drags in
a native codec or transport you didn't ask for).
```text
                     ┌─────────────────────────────────────────────────┐
                     │                   CORE KERNEL                   │
                     │                (always compiled)                │
publisher ─────────▶ │ Engine ─ StreamHandle                           │ ──────▶ subscribers
(your protocol)      │  • broadcast fan-out (zero-copy Arc<Frame>)     │         (packager / SFU /
                     │  • GOP cache → instant-start replay             │          recorder / edge)
                     │  • live QoS (bitrate / fps)                     │
                     │ PublishRegistry · PlaybackRegistry · EventBus   │
                     │ Observer · StreamAuthenticator · AuditSink      │
                     │ HealthRegistry · MediaFrame · StreamError       │
                     └─────────────────────────────────────────────────┘
                           ▲ implements traits        ▲ feature-gated, pure Rust
┌──────────────────────────┴────────────┐ ┌───────────┴───────────────────────────┐
│        WIRE PROTOCOLS & MUXERS        │ │              MEDIA PLANE              │
│   (rtmp/mpegts ready · rest traits)   │ │           (opt-in features)           │
│ rtmp             RTMP publish/play ✅ │ │ codec   H.264/H.265/AV1/VP9/VVC · AAC │
│ mpegts           MPEG-TS muxer ✅     │ │ hls     Packager + keyframe segmenter │
│ InboundProtocol  SRT · WHIP · RTSP    │ │ record  RecordingSink (VOD/DVR)       │
│ Transcoder/ClusterRelay (traits)      │ │ ingest  TCP accept loop · rate limit  │
└───────────────────────────────────────┘ └───────────────────────────────────────┘
```
The `rtmp` handler (publish **and** play over real RTMP) and the `mpegts` muxer
(playable `.ts` segments) ship operational. The other transports (SRT, WHIP,
RTSP) and the ABR/cluster backends are trait seams for the host to fill in.
## Feature matrix
| Capability | Status | Entry point |
|---|---|---|
| Zero-copy broadcast fan-out | ✅ **Core** | `StreamHandle::subscribe_resilient` |
| Instant-start GOP replay | ✅ **Core** | `AppSpec::gop_cache(frames)` + `replay_buffer()` |
| Publish/play authorization | ✅ **Core** | `StreamAuthenticator` (permit-all default) |
| Live QoS (bitrate / fps) | ✅ **Core** | `StreamHandle::qos()` / `metadata_snapshot()` |
| Slow-subscriber eviction | ✅ **Core** | `Subscription::max_lag` + observer hooks |
| Idle-stream reaper | ✅ **Core** | `EngineBuilder::idle_timeout` |
| Coordinated graceful shutdown | ✅ **Core** | `Engine::serve` / `serve_until_signal` |
| Health & audit | ✅ **Core** | `HealthRegistry`, `AuditPipeline`/`AuditSink` |
| **RTMP publish / play** | ✅ `rtmp` | `protocol::rtmp::RtmpHandler` — real handshake, chunks, AMF0, FLV |
| **MPEG-TS muxing (playable)** | ✅ `mpegts` | `packager::MpegTsMuxer` — PAT/PMT/PES/PCR `.ts` |
| H.264 / H.265 / AV1 / VP9 / VVC / AAC parsing | 🧩 `codec*` | `codec::{h264,h265,av1,vp9,vvc,aac}` (pure Rust) |
| HLS / LL-HLS packaging | 🧩 `hls` | `HlsSegmenter` + `HlsPlaylist` → `StorageBackend` |
| Recording (VOD / DVR) | 🧩 `record` | `RecordingSink` |
| Filesystem object storage | 🧩 `storage-fs` | `FsStorage` |
| Prometheus metrics | 🧩 `metrics` | `PrometheusObserver` |
| Ingest scaffolding | 🧩 `ingest` | TCP accept loop, keyframe gate, NAL scan |
| SRT / WebRTC (WHIP) / RTSP ingest | 🧭 **Deferred** | implement `InboundProtocol` |
| fMP4 / CMAF muxing | 🧭 **Deferred** | implement `Muxer` |
| Hardware transcoding / ABR | 🧭 **Deferred** | implement `Transcoder` |
| Edge federation | 🧭 **Deferred** | implement `ClusterRelay` |
> ✅ `rtmp` + `mpegts` are operational, native-Rust, and `unsafe`-free: you can
> ingest from OBS/FFmpeg and write playable HLS today.
> 🧭 **Deferred** means the *seam* exists as a documented trait. SRT, WebRTC,
> RTSP, fMP4/CMAF, and FFmpeg/NVENC transcoding need heavier native stacks and
> live in downstream crates or your application — the kernel stays lean.
## Install
```toml
[dependencies]
arcly-stream = "0.1"
# Opt into the pure-Rust media plane as needed:
# arcly-stream = { version = "0.1", features = ["hls", "record", "codec"] }
# A full RTMP-in → HLS-out origin (everything you need to ingest + package):
# arcly-stream = { version = "0.1", features = ["rtmp", "mpegts", "hls", "storage-fs"] }
```
## Quick start
### Basic — publish and subscribe
```rust
use arcly_stream::prelude::*;

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        .max_publishers(1_000)
        .application(AppSpec::new("live"))
        .build();
    let key = StreamKey::new("live", "demo");

    // A publisher claims the stream and pumps frames (zero-copy fan-out):
    let handle = engine.start_publish(&key).await?;

    // A subscriber (packager / recorder / SFU) drains them. `subscribe_resilient`
    // resynchronizes past `broadcast` lag instead of dying on a slow tick.
    let mut sub = engine.get_stream(&key)?.subscribe_resilient();
    tokio::spawn(async move {
        while let Some(frame) = sub.recv().await {
            // packetize `frame` …
            let _ = frame;
        }
    });

    handle.publish_frame(MediaFrame::new_video(
        0, 0, bytes::Bytes::from_static(b"…"), CodecId::H264, true,
    ))?;

    engine.end_publish(&key).await?;
    Ok(())
}
```
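What "resynchronizes past lag" means can be modelled without any async machinery. The sketch below is a std-only illustration, not the crate's implementation: `Ring` and `Reader` are hypothetical names, and the ring stands in for a bounded broadcast channel that overwrites its oldest slot. A reader that falls behind jumps forward to the oldest retained item and records how many it missed, rather than failing.

```rust
use std::collections::VecDeque;

/// Fixed-capacity log that tracks the sequence number of each entry.
struct Ring<T> {
    buf: VecDeque<T>,
    capacity: usize,
    next_seq: u64, // sequence number the next pushed item will receive
}

impl<T: Clone> Ring<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { buf: VecDeque::with_capacity(capacity), capacity, next_seq: 0 }
    }

    fn push(&mut self, item: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front(); // the oldest item is overwritten
        }
        self.buf.push_back(item);
        self.next_seq += 1;
    }

    /// Sequence number of the oldest item still retained.
    fn oldest_seq(&self) -> u64 {
        self.next_seq - self.buf.len() as u64
    }
}

/// A reader cursor; `skipped` accumulates items lost to lag.
struct Reader {
    cursor: u64,
    skipped: u64,
}

impl Reader {
    fn recv<T: Clone>(&mut self, ring: &Ring<T>) -> Option<T> {
        let oldest = ring.oldest_seq();
        if self.cursor < oldest {
            // Fell behind: resynchronize instead of returning an error.
            self.skipped += oldest - self.cursor;
            self.cursor = oldest;
        }
        if self.cursor == ring.next_seq {
            return None; // caught up with the writer
        }
        let item = ring.buf[(self.cursor - oldest) as usize].clone();
        self.cursor += 1;
        Some(item)
    }
}

fn main() {
    let mut ring = Ring::new(4);
    let mut reader = Reader { cursor: 0, skipped: 0 };

    // The writer gets ten frames ahead of a reader that never polled.
    for frame in 0..10u32 {
        ring.push(frame);
    }

    let mut seen = Vec::new();
    while let Some(frame) = reader.recv(&ring) {
        seen.push(frame);
    }
    println!("seen {:?}, skipped {}", seen, reader.skipped); // seen [6, 7, 8, 9], skipped 6
}
```

The skipped count is what a host would typically surface through its telemetry hook, so slow consumers are never dropped silently.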
### Advanced — instant-start, authorization, and HLS packaging
```rust,ignore
use arcly_stream::prelude::*;
use arcly_stream::packager::{HlsSegmenter, Packager};
use arcly_stream::storage::FsStorage;

// 1. A custom authorizer: only the right stream key may publish.
struct KeyAuth { secret: String }

#[async_trait]
impl StreamAuthenticator for KeyAuth {
    async fn authorize_publish(&self, _key: &StreamKey, creds: &Credentials) -> arcly_stream::Result<()> {
        match creds.token.as_deref() {
            Some(t) if t == self.secret => Ok(()),
            _ => Err(StreamError::Unauthorized("bad publish key".into())),
        }
    }
}

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        // 120-frame keyframe-anchored GOP cache → sub-second join times:
        .application(AppSpec::new("live").gop_cache(120))
        .authenticator(KeyAuth { secret: "s3cr3t".into() })
        .idle_timeout(std::time::Duration::from_secs(30))
        .build();

    let key = StreamKey::new("live", "cam");
    let handle = engine
        .start_publish_authorized(&key, &Credentials::token("s3cr3t"))
        .await?;

    // 2. Package the live stream to HLS, writing segments + playlist to disk.
    //    `MpegTsMuxer` (feature `mpegts`) produces playable `.ts` segments.
    let mut hls = HlsSegmenter::new(
        arcly_stream::packager::MpegTsMuxer::new(),
        FsStorage::new("/var/hls"),
        "live/cam",
        /* target seconds  */ 4,
        /* playlist window */ 6,
    );

    let mut sub = engine.get_stream(&key)?.subscribe_resilient();
    tokio::spawn(async move {
        while let Some(frame) = sub.recv().await {
            let _ = hls.push(&frame).await;
        }
        let _ = hls.finish().await;
    });

    // … feed `handle.publish_frame(..)` from your protocol parser …
    let _ = handle;
    Ok(())
}
```
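For intuition about what a keyframe-anchored GOP cache holds, here is a small std-only model. It is a sketch under stated assumptions, not the crate's internals: `Frame` and `GopCache` are hypothetical types, and the policy of "stop caching once the cap is reached, until the next keyframe" is one plausible choice among several. The invariant it illustrates is that a replay always starts on a keyframe, so a late joiner can decode immediately.

```rust
/// Hypothetical stand-in for a media frame (no payload, for brevity).
#[derive(Clone, Debug, PartialEq)]
struct Frame {
    pts: u64,
    keyframe: bool,
}

/// Holds the current group of pictures, always starting at a keyframe.
struct GopCache {
    frames: Vec<Frame>,
    max_frames: usize,
}

impl GopCache {
    fn new(max_frames: usize) -> Self {
        Self { frames: Vec::new(), max_frames }
    }

    fn push(&mut self, frame: Frame) {
        if frame.keyframe {
            // A new GOP begins: everything older is no longer needed.
            self.frames.clear();
        } else if self.frames.is_empty() {
            // No keyframe seen yet; a delta frame alone is undecodable.
            return;
        }
        // Assumed policy: once full, keep the head and drop the overflow.
        if self.frames.len() < self.max_frames {
            self.frames.push(frame);
        }
    }

    /// Frames a late-joining subscriber receives before going live.
    fn replay(&self) -> &[Frame] {
        &self.frames
    }
}

fn main() {
    let mut cache = GopCache::new(120);
    let input = [(0, true), (1, false), (2, false), (3, true), (4, false), (5, false)];
    for (pts, keyframe) in input {
        cache.push(Frame { pts, keyframe });
    }

    let pts: Vec<u64> = cache.replay().iter().map(|f| f.pts).collect();
    println!("replay starts at a keyframe: {:?}", pts); // [3, 4, 5]
}
```

A subscriber that joins at pts 5 is handed frames 3 to 5 first, then continues on the live feed.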
### Operational — a real RTMP-in → HLS-out origin
No hand-rolled protocol parser needed: the `rtmp` handler ingests from OBS or
FFmpeg, and the `mpegts` muxer writes playable HLS. This is a complete live
origin in ~20 lines.
```rust,ignore
use arcly_stream::prelude::*;
use arcly_stream::protocol::rtmp::RtmpHandler;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        .application(AppSpec::new("live").gop_cache(120)) // instant-start replay
        .build();

    // `.with_playback(engine.clone())` enables RTMP play (egress) too.
    let rtmp = RtmpHandler::new("0.0.0.0:1935".parse().unwrap())
        .with_playback(engine.clone());

    // Publish:  ffmpeg -re -i in.mp4 -c copy -f flv rtmp://localhost/live/cam
    // Play:     ffplay rtmp://localhost/live/cam
    // (point an HlsSegmenter + MpegTsMuxer at the same stream for HLS egress)
    engine.serve(vec![Box::new(rtmp)], CancellationToken::new()).await
}
```
Run the bundled, fully-wired version (RTMP ingest **and** disk HLS) with
[`examples/rtmp_to_hls`](examples/rtmp_to_hls):
```bash
cargo run -p rtmp-to-hls
# then: ffmpeg -re -i input.mp4 -c:v libx264 -c:a aac -f flv rtmp://localhost:1935/live/cam
# and play the HLS it writes under ./hls/live/cam/index.m3u8
```
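The two numbers an HLS packager is usually configured with, a target segment duration and a playlist window, interact in a simple way that the following std-only sketch tries to show. It is an illustration only: `Segmenter` here is a hypothetical toy, not the crate's `HlsSegmenter`, and it tracks timestamps rather than muxing bytes. Segments are cut only on keyframes once the target duration has elapsed, and the playlist keeps just the newest `window` entries.

```rust
use std::collections::VecDeque;

/// Toy keyframe segmenter: tracks segment boundaries and a sliding playlist.
struct Segmenter {
    target_ms: u64,
    window: usize,
    segment_start: Option<u64>, // pts of the keyframe that opened the current segment
    next_index: u64,
    playlist: VecDeque<(u64, u64)>, // (segment index, duration in ms)
}

impl Segmenter {
    fn new(target_secs: u64, window: usize) -> Self {
        Self {
            target_ms: target_secs * 1000,
            window,
            segment_start: None,
            next_index: 0,
            playlist: VecDeque::new(),
        }
    }

    /// Feed one video frame. Returns the index of a segment if this frame closed one.
    fn push(&mut self, pts_ms: u64, keyframe: bool) -> Option<u64> {
        match self.segment_start {
            None => {
                // Wait for the first keyframe before opening a segment.
                if keyframe {
                    self.segment_start = Some(pts_ms);
                }
                None
            }
            Some(start) if keyframe && pts_ms.saturating_sub(start) >= self.target_ms => {
                let index = self.next_index;
                self.next_index += 1;
                self.playlist.push_back((index, pts_ms - start));
                if self.playlist.len() > self.window {
                    self.playlist.pop_front(); // slide the live window
                }
                self.segment_start = Some(pts_ms); // this keyframe opens the next segment
                Some(index)
            }
            Some(_) => None,
        }
    }
}

fn main() {
    // 25 fps (40 ms per frame), a keyframe every 2 s, 4 s target, 3-entry window.
    let mut seg = Segmenter::new(4, 3);
    for pts in (0..=20_000u64).step_by(40) {
        if let Some(index) = seg.push(pts, pts % 2000 == 0) {
            println!("closed segment {index} at {pts} ms");
        }
    }
    let live: Vec<u64> = seg.playlist.iter().map(|(i, _)| *i).collect();
    println!("playlist window: {:?}", live); // [2, 3, 4]
}
```

Note that the keyframe at 2 s does not cut a segment because the target has not been reached; this is why encoder keyframe interval and target duration should divide evenly.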
## Extending the Kernel: Implementing Custom Protocols
`arcly-stream` is a **multi-protocol ingestion architecture**: teaching the engine
a new inbound wire protocol (RTSP, SRT, WebRTC WHIP/WHEP, your own) is one trait
impl in *your* crate — the engine kernel never changes. The bundled `RtmpHandler`
is just the first reference implementation of the same public trait you'll use.
Three public types ([`inbound`](https://docs.rs/arcly-stream/latest/arcly_stream/inbound/index.html)) form the seam:
| Type | Role |
|---|---|
| `InboundProtocol` | The worker contract you implement: own a listener, accept connections, bridge frames onto the bus. `Send + Sync + 'static`. |
| `IngestContext` | The cloneable handle your worker uses to reach the engine bus; hands out publish sessions. |
| `PublishSession` | An RAII token for one live stream — every frame lands in the GOP cache + live QoS; dropping it frees the publish slot. |
```text
   your crate                         arcly-stream kernel
┌────────────────────────┐          ┌───────────────────────────────┐
│ struct MyRtspHandler   │  serve   │ Engine (lock-free bus)        │
│ impl InboundProtocol { │◀─────────│  • broadcast fan-out          │
│  async fn serve(ctx){  │          │  • GOP cache (instant start)  │
│   ctx.open_publish()───┼────────▶ │  • live QoS counters          │
│    .publish_frame()    │  frames  │ PublishRegistry               │
│ } }                    │          │                               │
└────────────────────────┘          └───────────────────────────────┘
```
Implement the trait, then register it on the builder **next to** the native RTMP
handler — both run concurrently under one coordinated shutdown:
```rust,ignore
use arcly_stream::prelude::*;
use arcly_stream::inbound::{InboundProtocol, IngestContext};
use arcly_stream::protocol::rtmp::RtmpHandler;
use tokio_util::sync::CancellationToken;

struct MyProtocol { /* listener config, handshake state, … */ }

#[async_trait]
impl InboundProtocol for MyProtocol {
    fn name(&self) -> &'static str { "my-proto" }

    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken)
        -> arcly_stream::Result<()>
    {
        // 1. bind your listener, 2. accept + handshake, 3. resolve a StreamKey…
        let session = ctx.open_publish(StreamKey::new("live", "cam")).await?;

        // 4. bridge decoded access units → MediaFrame and publish:
        //    session.publish_frame(frame)?;   // → fan-out · GOP cache · QoS

        shutdown.cancelled().await;            // 5. wind down on shutdown
        session.finish().await                 // (Drop releases it otherwise)
    }
}

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        .application(AppSpec::new("live").gop_cache(120))
        .protocol(RtmpHandler::new("0.0.0.0:1935".parse().unwrap())) // native
        .protocol(MyProtocol { /* … */ })                            // yours
        .build();

    engine.serve_registered(CancellationToken::new()).await
}
```
**Runtime contract:** workers are shared across tasks (`Send + Sync + 'static`),
must observe `shutdown` and return promptly, and run on a Tokio runtime. Returning
`Err` trips the engine's coordinated teardown, winding sibling workers down too.
Existing `ProtocolHandler` impls keep working unchanged — a blanket bridge makes
every one an `InboundProtocol` automatically.
A complete, runnable version (a custom protocol registered alongside RTMP) lives
in [`examples/custom_protocol_plugin`](examples/custom_protocol_plugin):
```bash
cargo run -p custom-protocol-plugin
```
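The "blanket bridge" mentioned in the runtime contract is a standard Rust idiom, and a tiny std-only sketch may help if you have not met it before. The traits `LegacyHandler` and `Inbound` below are hypothetical, synchronous stand-ins (the real `ProtocolHandler` and `InboundProtocol` signatures are on docs.rs); only the shape of the generic `impl` is the point.

```rust
/// Hypothetical older contract that existing handlers already implement.
trait LegacyHandler {
    fn protocol_name(&self) -> &'static str;
    fn run(&self) -> Result<(), String>;
}

/// Hypothetical newer contract that the engine actually drives.
trait Inbound {
    fn name(&self) -> &'static str;
    fn serve(&self) -> Result<(), String>;
}

/// Blanket bridge: every `LegacyHandler` is an `Inbound` for free.
impl<T: LegacyHandler> Inbound for T {
    fn name(&self) -> &'static str {
        self.protocol_name()
    }

    fn serve(&self) -> Result<(), String> {
        self.run()
    }
}

/// An existing handler, written against the old trait only.
struct OldRtmp;

impl LegacyHandler for OldRtmp {
    fn protocol_name(&self) -> &'static str {
        "rtmp"
    }

    fn run(&self) -> Result<(), String> {
        Ok(())
    }
}

fn main() {
    // The registry only knows about `Inbound`, yet accepts the legacy type unchanged.
    let workers: Vec<Box<dyn Inbound>> = vec![Box::new(OldRtmp)];
    for worker in &workers {
        println!("{} -> {:?}", worker.name(), worker.serve()); // rtmp -> Ok(())
    }
}
```

The trade-off of this idiom is that a type implementing the old trait cannot also provide its own hand-written impl of the new one, since the two would overlap.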
## Extension points (the traits you implement)
| Trait | Purpose |
|---|---|
| `InboundProtocol` | **Teach the engine a new wire protocol** — own a listener, bridge frames (RTMP **shipped**; RTSP, SRT, WHIP… one trait impl away) |
| `ProtocolHandler` | Legacy registry-only handler; auto-bridged to `InboundProtocol` |
| `MediaSource` / `MediaSink` | Produce / consume frames (driven by `Engine::pump_source`) |
| `StorageBackend` | Object storage for segments & recordings |
| `Muxer` | Container byte-format for HLS segments (MPEG-TS **shipped**; fMP4/CMAF DIY) |
| `Transcoder` | Decode/scale/encode into an ABR ladder |
| `StreamAuthenticator` | Authorize publish / play (permit-all by default) |
| `Observer` / `AuditSink` / `HealthCheck` | Telemetry, audit trail, readiness |
| `ClusterRelay` | Origin/edge discovery & stream federation |
| `PublishRegistry` / `PlaybackRegistry` / `EventBus` | Swap the engine for your own bus |
## Cargo features
| Feature | Enables |
|---|---|
| *(none)* | Pure kernel: frame model, bus (GOP cache, live QoS), auth/audit/health/cluster contracts |
| `codec` | H.264 NAL/SPS + AAC ADTS bitstream parsers |
| `storage-fs` | Filesystem `StorageBackend` adapter |
| `ingest` | Shared TCP accept loop, keyframe gate, rate limiter, NAL scan |
| `hls` | HLS/LL-HLS packager: `Muxer`/`Packager`, segmenter, playlists |
| `mpegts` | Native MPEG-TS `Muxer` (`MpegTsMuxer`) — playable `.ts` segments |
| `rtmp` | Working RTMP publish/play `ProtocolHandler` (`RtmpHandler`) |
| `record` | Segment/recording sink over a `StorageBackend` |
| `metrics` | Prometheus `Observer` implementation |
| `macros` | `#[derive(MediaSink)]`, `#[protocol("…")]` ergonomics |
| `full` | Everything above |
## Guarantees
- **Zero-copy** — one publish clones an `Arc<MediaFrame>` pointer per subscriber,
never the payload (`bytes::Bytes`).
- **Lock-free hot path** — `tokio::broadcast` fan-out; `ArcSwap` + atomics for
cached config, GOP head, and QoS counters.
- **`#![forbid(unsafe_code)]`** — no `unsafe` in the kernel, compiler-enforced.
- **No global state** — many engines per process; ideal for tests.
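The first two guarantees can be checked by hand with nothing but the standard library. The sketch below is a simplified model, not the kernel's code: `Frame`, `Qos`, and `fan_out` are hypothetical names, a `Vec<u8>` stands in for `bytes::Bytes`, and `std::sync::mpsc` channels stand in for the broadcast bus. It shows that each subscriber receives a pointer to the same allocation, and that counters can be updated with atomics alone.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc;
use std::sync::Arc;

/// Hypothetical frame; the payload is allocated exactly once.
struct Frame {
    payload: Vec<u8>,
}

/// Counters updated with atomics only, so publishing never takes a lock here.
#[derive(Default)]
struct Qos {
    frames: AtomicU64,
    bytes: AtomicU64,
}

/// Hand one frame to every subscriber; returns how many accepted it.
fn fan_out(frame: &Arc<Frame>, subscribers: &[mpsc::Sender<Arc<Frame>>], qos: &Qos) -> usize {
    qos.frames.fetch_add(1, Ordering::Relaxed);
    qos.bytes.fetch_add(frame.payload.len() as u64, Ordering::Relaxed);
    subscribers
        .iter()
        // `Arc::clone` bumps a reference count; the payload bytes are not copied.
        .filter(|tx| tx.send(Arc::clone(frame)).is_ok())
        .count()
}

fn main() {
    let qos = Qos::default();
    let (senders, receivers): (Vec<_>, Vec<_>) =
        (0..3).map(|_| mpsc::channel::<Arc<Frame>>()).unzip();

    let frame = Arc::new(Frame { payload: vec![0u8; 1024] });
    let delivered = fan_out(&frame, &senders, &qos);

    let received: Vec<Arc<Frame>> = receivers.iter().map(|rx| rx.recv().unwrap()).collect();

    // Every subscriber holds a pointer to the very same allocation.
    let shared = received.iter().all(|f| Arc::ptr_eq(f, &frame));
    println!("delivered to {delivered}, same allocation: {shared}");
    println!("strong refs: {}", Arc::strong_count(&frame)); // 4: the original plus three subscribers
    println!("bytes counted: {}", qos.bytes.load(Ordering::Relaxed)); // 1024
}
```

With a thousand subscribers the cost per publish is a thousand reference-count increments, independent of frame size.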
## Workspace layout
```text
arcly-stream/ the published library crate (this README documents it)
arcly-stream-macros/ optional proc-macros (the `macros` feature)
examples/
minimal_ingest/ implement MediaSource + Observer, drive the bus
custom_protocol/ implement ProtocolHandler + Engine::serve run loop
custom_protocol_plugin/ implement InboundProtocol, register alongside RTMP
codec_inspect/ per-codec random-access classification + HLS codec strings
rtmp_to_hls/ RTMP ingest → MPEG-TS HLS on disk (operational origin)
```
## Examples
```bash
cargo run -p minimal-ingest # synthetic publisher + resilient subscriber
cargo run -p custom-protocol # ProtocolHandler driven by Engine::serve
cargo run -p custom-protocol-plugin # custom InboundProtocol registered with RTMP
cargo run -p codec-inspect # codec::dispatch over H.264/H.265/AV1/VP9/VVC
cargo run -p rtmp-to-hls # real RTMP origin: ingest → playable HLS segments
cargo bench -p arcly-stream # broadcast fan-out throughput (criterion)
```
## Minimum Supported Rust Version
`arcly-stream` supports **Rust 1.85** and later. An MSRV bump is a minor-version
change.
## Documentation
Full API docs are on [docs.rs/arcly-stream](https://docs.rs/arcly-stream). See
[`BLUEPRINT.md`](BLUEPRINT.md) for the extraction analysis and the mapping from
the original 25-crate `stream-center` to this single crate.
## Contributing
Issues and merge requests are welcome. CI enforces `cargo fmt`, `clippy -D
warnings`, the full test suite across the feature matrix, and `cargo deny`; see
[`RELEASING.md`](RELEASING.md) for the release process and
[`CHANGELOG.md`](CHANGELOG.md) for the history.
## License
Licensed under the [MIT License](LICENSE).