arcly-stream 0.1.1

A high-performance live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, pluggable HLS/recording, and trait-driven protocol/storage/auth/observer extension points — runtime, config, and metrics free.
Documentation

arcly-stream

A high-performance live-media streaming kernel for Rust.

Lock-free, zero-copy frame fan-out · instant-start GOP cache · pluggable HLS & recording · publish/play authorization · live QoS — with zero baked-in config, metrics singletons, or HTTP runtime.

crates.io docs.rs license MSRV


Why arcly-stream?

Building a streaming server means solving the same hard core every time: fanning one publisher's frames out to thousands of late-joining subscribers, without copying payloads, without locks on the hot path, and without dropping the slow ones silently. Everything else — which wire protocol you ingest, how you package egress, where you store segments, how you authorize and observe — is deployment-specific.

arcly-stream is exactly that hard core, extracted and hardened into a reusable kernel. You implement a handful of small traits for the parts unique to your system; the kernel owns the lock-free pub/sub bus, instant-start replay, back-pressure handling, lifecycle, and quality-of-service measurement.

It deliberately ships no opinion about your runtime: no global registries, no TOML schema, no axum server. Telemetry, authorization, and audit are injected traits with safe defaults — never singletons.

Architecture

Three tiers: a small always-on kernel, a feature-gated pure-Rust media plane, and wire protocols deferred to traits (so the kernel never drags in a native codec or transport you didn't ask for).

                          ┌───────────────────────────────────────────────┐
                          │                 CORE KERNEL                     │
                          │                (always compiled)                │
      publisher  ───────▶ │   Engine ─ StreamHandle                         │ ──────▶ subscribers
   (your protocol)        │     • broadcast fan-out (zero-copy Arc<Frame>)  │     (packager / SFU /
                          │     • GOP cache → instant-start replay          │      recorder / edge)
                          │     • live QoS (bitrate / fps)                  │
                          │   PublishRegistry · PlaybackRegistry · EventBus │
                          │   Observer · StreamAuthenticator · AuditSink    │
                          │   HealthRegistry · MediaFrame · StreamError     │
                          └───────────────────────────────────────────────┘
                                   ▲  implements traits          ▲  feature-gated, pure Rust
        ┌──────────────────────────┴───────────┐   ┌─────────────┴───────────────────────────┐
        │       WIRE PROTOCOLS & MUXERS         │   │              MEDIA PLANE                  │
        │   (rtmp/mpegts ready · rest traits)   │   │            (opt-in features)             │
        │  rtmp     RTMP publish/play  ✅       │   │  codec    H.264/H.265/AV1/VP9/VVC · AAC  │
        │  mpegts   MPEG-TS muxer      ✅       │   │  hls      Packager + keyframe segmenter  │
        │  InboundProtocol  SRT · WHIP · RTSP   │   │  record   RecordingSink (VOD/DVR)        │
        │  Transcoder/ClusterRelay  (traits)    │   │  ingest   TCP accept loop · rate limit   │
        └───────────────────────────────────────┘   └───────────────────────────────────────┘

The rtmp handler (publish and play over real RTMP) and the mpegts muxer (playable .ts segments) ship operational. Remaining transports (SRT, WHIP, RTSP) and ABR/cluster backends remain trait seams a host fills in.

Feature matrix

Capability Status How
Zero-copy broadcast fan-out Core StreamHandle::subscribe_resilient
Instant-start GOP replay Core AppSpec::gop_cache(frames) + replay_buffer()
Publish/play authorization Core StreamAuthenticator (permit-all default)
Live QoS (bitrate / fps) Core StreamHandle::qos() / metadata_snapshot()
Slow-subscriber eviction Core Subscription::max_lag + observer hooks
Idle-stream reaper Core EngineBuilder::idle_timeout
Coordinated graceful shutdown Core Engine::serve / serve_until_signal
Health & audit Core HealthRegistry, AuditPipeline/AuditSink
RTMP publish / play rtmp protocol::rtmp::RtmpHandler — real handshake, chunks, AMF0, FLV
MPEG-TS muxing (playable) mpegts packager::MpegTsMuxer — PAT/PMT/PES/PCR .ts
H.264 / H.265 / AV1 / VP9 / VVC / AAC parsing 🧩 codec* codec::{h264,h265,av1,vp9,vvc,aac} (pure Rust)
HLS / LL-HLS packaging 🧩 hls HlsSegmenter + HlsPlaylistStorageBackend
Recording (VOD / DVR) 🧩 record RecordingSink
Filesystem object storage 🧩 storage-fs FsStorage
Prometheus metrics 🧩 metrics PrometheusObserver
Ingest scaffolding 🧩 ingest TCP accept loop, keyframe gate, NAL scan
SRT / WebRTC (WHIP) / RTSP ingest 🧭 Deferred implement InboundProtocol
fMP4 / CMAF muxing 🧭 Deferred implement Muxer
Hardware transcoding / ABR 🧭 Deferred implement Transcoder
Edge federation 🧭 Deferred implement ClusterRelay

rtmp + mpegts are operational, native-Rust, and unsafe-free: you can ingest from OBS/FFmpeg and write playable HLS today. 🧭 Deferred means the seam exists as a documented trait. SRT, WebRTC, RTSP, fMP4/CMAF, and FFmpeg/NVENC transcoding need heavier native stacks and live in downstream crates or your application — the kernel stays lean.

Install

[dependencies]
arcly-stream = "0.1"

# Opt into the pure-Rust media plane as needed:
# arcly-stream = { version = "0.1", features = ["hls", "record", "codec"] }

# A full RTMP-in → HLS-out origin (everything you need to ingest + package):
# arcly-stream = { version = "0.1", features = ["rtmp", "mpegts", "hls", "storage-fs"] }

Quick start

Basic — publish and subscribe

use arcly_stream::prelude::*;

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        .max_publishers(1_000)
        .application(AppSpec::new("live"))
        .build();

    let key = StreamKey::new("live", "demo");

    // A publisher claims the stream and pumps frames (zero-copy fan-out):
    let handle = engine.start_publish(&key).await?;

    // A subscriber (packager / recorder / SFU) drains them. `subscribe_resilient`
    // resynchronizes past `broadcast` lag instead of dying on a slow tick.
    let mut sub = engine.get_stream(&key)?.subscribe_resilient();
    tokio::spawn(async move {
        while let Some(frame) = sub.recv().await {
            // packetize `frame` …
            let _ = frame;
        }
    });

    handle.publish_frame(MediaFrame::new_video(
        0, 0, bytes::Bytes::from_static(b""), CodecId::H264, true,
    ))?;

    engine.end_publish(&key).await?;
    Ok(())
}

Advanced — instant-start, authorization, and HLS packaging

use arcly_stream::prelude::*;
use arcly_stream::packager::{HlsSegmenter, Packager};
use arcly_stream::storage::FsStorage;

// 1. A custom authorizer: only the right stream key may publish.
struct KeyAuth { secret: String }

#[async_trait]
impl StreamAuthenticator for KeyAuth {
    async fn authorize_publish(&self, _key: &StreamKey, creds: &Credentials) -> arcly_stream::Result<()> {
        match creds.token.as_deref() {
            Some(t) if t == self.secret => Ok(()),
            _ => Err(StreamError::Unauthorized("bad publish key".into())),
        }
    }
}

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        // 120-frame keyframe-anchored GOP cache → sub-second join times:
        .application(AppSpec::new("live").gop_cache(120))
        .authenticator(KeyAuth { secret: "s3cr3t".into() })
        .idle_timeout(std::time::Duration::from_secs(30))
        .build();

    let key = StreamKey::new("live", "cam");
    let handle = engine
        .start_publish_authorized(&key, &Credentials::token("s3cr3t"))
        .await?;

    // 2. Package the live stream to HLS, writing segments + playlist to disk.
    //    `MpegTsMuxer` (feature `mpegts`) produces playable `.ts` segments.
    let mut hls = HlsSegmenter::new(
        arcly_stream::packager::MpegTsMuxer::new(),
        FsStorage::new("/var/hls"),
        "live/cam",
        /* target seconds */ 4,
        /* playlist window */ 6,
    );
    let mut sub = engine.get_stream(&key)?.subscribe_resilient();
    tokio::spawn(async move {
        while let Some(frame) = sub.recv().await {
            let _ = hls.push(&frame).await;
        }
        let _ = hls.finish().await;
    });

    // … feed `handle.publish_frame(..)` from your protocol parser …
    let _ = handle;
    Ok(())
}

Operational — a real RTMP-in → HLS-out origin

No hand-rolled protocol parser needed: the rtmp handler ingests from OBS or FFmpeg, and the mpegts muxer writes playable HLS. This is a complete live origin in ~20 lines.

use arcly_stream::prelude::*;
use arcly_stream::protocol::rtmp::RtmpHandler;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        .application(AppSpec::new("live").gop_cache(120)) // instant-start replay
        .build();

    // `.with_playback(engine.clone())` enables RTMP play (egress) too.
    let rtmp = RtmpHandler::new("0.0.0.0:1935".parse().unwrap())
        .with_playback(engine.clone());

    // Publish:  ffmpeg -re -i in.mp4 -c copy -f flv rtmp://localhost/live/cam
    // Play:     ffplay rtmp://localhost/live/cam
    // (point an HlsSegmenter + MpegTsMuxer at the same stream for HLS egress)
    engine.serve(vec![Box::new(rtmp)], CancellationToken::new()).await
}

Run the bundled, fully-wired version (RTMP ingest and disk HLS) with examples/rtmp_to_hls:

cargo run -p rtmp-to-hls
# then: ffmpeg -re -i input.mp4 -c:v libx264 -c:a aac -f flv rtmp://localhost:1935/live/cam
# and play the HLS it writes under ./hls/live/cam/index.m3u8

Extending the Kernel: Implementing Custom Protocols

arcly-stream is a multi-protocol ingestion architecture: teaching the engine a new inbound wire protocol (RTSP, SRT, WebRTC WHIP/WHEP, your own) is one trait impl in your crate — the engine kernel never changes. The bundled RtmpHandler is just the first reference implementation of the same public trait you'll use.

Three public types (inbound) form the seam:

Type Role
InboundProtocol The worker contract you implement: own a listener, accept connections, bridge frames onto the bus. Send + Sync + 'static.
IngestContext The cloneable handle your worker uses to reach the engine bus; hands out publish sessions.
PublishSession An RAII token for one live stream — every frame lands in the GOP cache + live QoS; dropping it frees the publish slot.
  your crate                          arcly-stream kernel
 ┌────────────────────────┐          ┌───────────────────────────────┐
 │ struct MyRtspHandler    │  serve   │ Engine (lock-free bus)        │
 │ impl InboundProtocol {  │◀─────────│  • broadcast fan-out          │
 │   async fn serve(ctx){  │          │  • GOP cache (instant start)  │
 │     ctx.open_publish()──┼────────▶ │  • live QoS counters          │
 │       .publish_frame()  │ frames   │  PublishRegistry              │
 │ } }                     │          │                               │
 └────────────────────────┘          └───────────────────────────────┘

Implement the trait, then register it on the builder next to the native RTMP handler — both run concurrently under one coordinated shutdown:

use arcly_stream::prelude::*;
use arcly_stream::inbound::{InboundProtocol, IngestContext};
use arcly_stream::protocol::rtmp::RtmpHandler;

struct MyProtocol { /* listener config, handshake state, … */ }

#[async_trait]
impl InboundProtocol for MyProtocol {
    fn name(&self) -> &'static str { "my-proto" }

    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken)
        -> arcly_stream::Result<()>
    {
        // 1. bind your listener, 2. accept + handshake, 3. resolve a StreamKey…
        let session = ctx.open_publish(StreamKey::new("live", "cam")).await?;
        // 4. bridge decoded access units → MediaFrame and publish:
        //    session.publish_frame(frame)?;   // → fan-out · GOP cache · QoS
        shutdown.cancelled().await;            // 5. wind down on shutdown
        session.finish().await                 //    (Drop releases it otherwise)
    }
}

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        .application(AppSpec::new("live").gop_cache(120))
        .protocol(RtmpHandler::new("0.0.0.0:1935".parse().unwrap())) // native
        .protocol(MyProtocol { /**/ })                            // yours
        .build();
    engine.serve_registered(CancellationToken::new()).await
}

Runtime contract: workers are shared across tasks (Send + Sync + 'static), must observe shutdown and return promptly, and run on a Tokio runtime. Returning Err trips the engine's coordinated teardown, winding sibling workers down too. Existing ProtocolHandler impls keep working unchanged — a blanket bridge makes every one an InboundProtocol automatically.

A complete, runnable version (a custom protocol registered alongside RTMP) lives in examples/custom_protocol_plugin:

cargo run -p custom-protocol-plugin

Extension points (the traits you implement)

Trait Purpose
InboundProtocol Teach the engine a new wire protocol — own a listener, bridge frames (RTMP shipped; RTSP, SRT, WHIP… one trait impl away)
ProtocolHandler Legacy registry-only handler; auto-bridged to InboundProtocol
MediaSource / MediaSink Produce / consume frames (driven by Engine::pump_source)
StorageBackend Object storage for segments & recordings
Muxer Container byte-format for HLS segments (MPEG-TS shipped; fMP4/CMAF DIY)
Transcoder Decode/scale/encode into an ABR ladder
StreamAuthenticator Authorize publish / play (permit-all by default)
Observer / AuditSink / HealthCheck Telemetry, audit trail, readiness
ClusterRelay Origin/edge discovery & stream federation
PublishRegistry / PlaybackRegistry / EventBus Swap the engine for your own bus

Cargo features

Feature Effect
(none) Pure kernel: frame model, bus (GOP cache, live QoS), auth/audit/health/cluster contracts
codec H.264 NAL/SPS + AAC ADTS bitstream parsers
storage-fs Filesystem StorageBackend adapter
ingest Shared TCP accept loop, keyframe gate, rate limiter, NAL scan
hls HLS/LL-HLS packager: Muxer/Packager, segmenter, playlists
mpegts Native MPEG-TS Muxer (MpegTsMuxer) — playable .ts segments
rtmp Working RTMP publish/play ProtocolHandler (RtmpHandler)
record Segment/recording sink over a StorageBackend
metrics Prometheus Observer implementation
macros #[derive(MediaSink)], #[protocol("…")] ergonomics
full Everything above

Guarantees

  • Zero-copy — one publish clones an Arc<MediaFrame> pointer per subscriber, never the payload (bytes::Bytes).
  • Lock-free hot pathtokio::broadcast fan-out; ArcSwap + atomics for cached config, GOP head, and QoS counters.
  • #![forbid(unsafe_code)] — no unsafe in the kernel, compiler-enforced.
  • No global state — many engines per process; ideal for tests.

Workspace layout

arcly-stream/          the published library crate (this README documents it)
arcly-stream-macros/   optional proc-macros (the `macros` feature)
examples/
  minimal_ingest/         implement MediaSource + Observer, drive the bus
  custom_protocol/        implement ProtocolHandler + Engine::serve run loop
  custom_protocol_plugin/ implement InboundProtocol, register alongside RTMP
  codec_inspect/          per-codec random-access classification + HLS codec strings
  rtmp_to_hls/            RTMP ingest → MPEG-TS HLS on disk (operational origin)

Examples

cargo run -p minimal-ingest        # synthetic publisher + resilient subscriber
cargo run -p custom-protocol       # ProtocolHandler driven by Engine::serve
cargo run -p custom-protocol-plugin # custom InboundProtocol registered with RTMP
cargo run -p codec-inspect         # codec::dispatch over H.264/H.265/AV1/VP9/VVC
cargo run -p rtmp-to-hls        # real RTMP origin: ingest → playable HLS segments
cargo bench  -p arcly-stream    # broadcast fan-out throughput (criterion)

Minimum Supported Rust Version

arcly-stream supports Rust 1.85 and later. An MSRV bump is a minor-version change.

Documentation

Full API docs are on docs.rs/arcly-stream. See BLUEPRINT.md for the extraction analysis and the mapping from the original 25-crate stream-center to this single crate.

Contributing

Issues and merge requests are welcome. CI enforces cargo fmt, clippy -D warnings, the full test suite across the feature matrix, and cargo deny; see RELEASING.md for the release process and CHANGELOG.md for the history.

License

Licensed under the MIT License.