arcly-stream 0.1.0

A high-performance live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, pluggable HLS/recording, and trait-driven protocol/storage/auth/observer extension points — runtime, config, and metrics free.
Documentation
<div align="center">

# arcly-stream

**A high-performance live-media streaming kernel for Rust.**

Lock-free, zero-copy frame fan-out · instant-start GOP cache · pluggable HLS &
recording · publish/play authorization · live QoS — with **zero** baked-in
config, metrics singletons, or HTTP runtime.

[![crates.io](https://img.shields.io/crates/v/arcly-stream.svg)](https://crates.io/crates/arcly-stream)
[![docs.rs](https://img.shields.io/docsrs/arcly-stream)](https://docs.rs/arcly-stream)
[![license](https://img.shields.io/crates/l/arcly-stream.svg)](#license)
[![MSRV](https://img.shields.io/badge/MSRV-1.85-blue.svg)](#minimum-supported-rust-version)

</div>

---

## Why arcly-stream?

Building a streaming server means solving the same hard core every time:
fanning one publisher's frames out to thousands of late-joining subscribers,
**without copying payloads, without locks on the hot path, and without dropping
the slow ones silently.** Everything else — which wire protocol you ingest, how
you package egress, where you store segments, how you authorize and observe — is
deployment-specific.

`arcly-stream` is exactly that hard core, extracted and hardened into a reusable
**kernel**. You implement a handful of small traits for the parts unique to your
system; the kernel owns the lock-free pub/sub bus, instant-start replay,
back-pressure handling, lifecycle, and quality-of-service measurement.

It deliberately ships **no opinion** about your runtime: no global registries, no
TOML schema, no axum server. Telemetry, authorization, and audit are *injected*
traits with safe defaults — never singletons.

## Architecture

Three tiers: a small always-on **kernel**, a feature-gated **pure-Rust media
plane**, and **wire protocols deferred to traits** (so the kernel never drags in
a native codec or transport you didn't ask for).

```text
                          ┌───────────────────────────────────────────────┐
                          │                 CORE KERNEL                     │
                          │                (always compiled)                │
      publisher  ───────▶ │   Engine ─ StreamHandle                         │ ──────▶ subscribers
   (your protocol)        │     • broadcast fan-out (zero-copy Arc<Frame>)  │     (packager / SFU /
                          │     • GOP cache → instant-start replay          │      recorder / edge)
                          │     • live QoS (bitrate / fps)                  │
                          │   PublishRegistry · PlaybackRegistry · EventBus │
                          │   Observer · StreamAuthenticator · AuditSink    │
                          │   HealthRegistry · MediaFrame · StreamError     │
                          └───────────────────────────────────────────────┘
                                   ▲  implements traits          ▲  feature-gated, pure Rust
        ┌──────────────────────────┴───────────┐   ┌─────────────┴───────────────────────────┐
        │       WIRE PROTOCOLS & MUXERS         │   │              MEDIA PLANE                  │
        │   (rtmp/mpegts ready · rest traits)   │   │            (opt-in features)             │
        │  rtmp     RTMP publish/play  ✅       │   │  codec    H.264/H.265/AV1/VP9/VVC · AAC  │
        │  mpegts   MPEG-TS muxer      ✅       │   │  hls      Packager + keyframe segmenter  │
        │  ProtocolHandler  SRT · WHIP · RTSP   │   │  record   RecordingSink (VOD/DVR)        │
        │  Transcoder/ClusterRelay  (traits)    │   │  ingest   TCP accept loop · rate limit   │
        └───────────────────────────────────────┘   └───────────────────────────────────────┘
```

The `rtmp` handler (publish **and** play over real RTMP) and the `mpegts` muxer
(playable `.ts` segments) ship operational. Remaining transports (SRT, WHIP,
RTSP) and ABR/cluster backends remain trait seams a host fills in.

## Feature matrix

| Capability | Status | How |
|---|---|---|
| Zero-copy broadcast fan-out |**Core** | `StreamHandle::subscribe_resilient` |
| Instant-start GOP replay |**Core** | `AppSpec::gop_cache(frames)` + `replay_buffer()` |
| Publish/play authorization |**Core** | `StreamAuthenticator` (permit-all default) |
| Live QoS (bitrate / fps) |**Core** | `StreamHandle::qos()` / `metadata_snapshot()` |
| Slow-subscriber eviction |**Core** | `Subscription::max_lag` + observer hooks |
| Idle-stream reaper |**Core** | `EngineBuilder::idle_timeout` |
| Coordinated graceful shutdown |**Core** | `Engine::serve` / `serve_until_signal` |
| Health & audit |**Core** | `HealthRegistry`, `AuditPipeline`/`AuditSink` |
| **RTMP publish / play** |`rtmp` | `protocol::rtmp::RtmpHandler` — real handshake, chunks, AMF0, FLV |
| **MPEG-TS muxing (playable)** |`mpegts` | `packager::MpegTsMuxer` — PAT/PMT/PES/PCR `.ts` |
| H.264 / H.265 / AV1 / VP9 / VVC / AAC parsing | 🧩 `codec*` | `codec::{h264,h265,av1,vp9,vvc,aac}` (pure Rust) |
| HLS / LL-HLS packaging | 🧩 `hls` | `HlsSegmenter` + `HlsPlaylist``StorageBackend` |
| Recording (VOD / DVR) | 🧩 `record` | `RecordingSink` |
| Filesystem object storage | 🧩 `storage-fs` | `FsStorage` |
| Prometheus metrics | 🧩 `metrics` | `PrometheusObserver` |
| Ingest scaffolding | 🧩 `ingest` | TCP accept loop, keyframe gate, NAL scan |
| SRT / WebRTC (WHIP) / RTSP ingest | 🧭 **Deferred** | implement `ProtocolHandler` |
| fMP4 / CMAF muxing | 🧭 **Deferred** | implement `Muxer` |
| Hardware transcoding / ABR | 🧭 **Deferred** | implement `Transcoder` |
| Edge federation | 🧭 **Deferred** | implement `ClusterRelay` |

>`rtmp` + `mpegts` are operational, native-Rust, and `unsafe`-free: you can
> ingest from OBS/FFmpeg and write playable HLS today.
> 🧭 **Deferred** means the *seam* exists as a documented trait. SRT, WebRTC,
> RTSP, fMP4/CMAF, and FFmpeg/NVENC transcoding need heavier native stacks and
> live in downstream crates or your application — the kernel stays lean.

## Install

```toml
[dependencies]
arcly-stream = "0.1"

# Opt into the pure-Rust media plane as needed:
# arcly-stream = { version = "0.1", features = ["hls", "record", "codec"] }

# A full RTMP-in → HLS-out origin (everything you need to ingest + package):
# arcly-stream = { version = "0.1", features = ["rtmp", "mpegts", "hls", "storage-fs"] }
```

## Quick start

### Basic — publish and subscribe

```rust
use arcly_stream::prelude::*;

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        .max_publishers(1_000)
        .application(AppSpec::new("live"))
        .build();

    let key = StreamKey::new("live", "demo");

    // A publisher claims the stream and pumps frames (zero-copy fan-out):
    let handle = engine.start_publish(&key).await?;

    // A subscriber (packager / recorder / SFU) drains them. `subscribe_resilient`
    // resynchronizes past `broadcast` lag instead of dying on a slow tick.
    let mut sub = engine.get_stream(&key)?.subscribe_resilient();
    tokio::spawn(async move {
        while let Some(frame) = sub.recv().await {
            // packetize `frame` …
            let _ = frame;
        }
    });

    handle.publish_frame(MediaFrame::new_video(
        0, 0, bytes::Bytes::from_static(b"…"), CodecId::H264, true,
    ))?;

    engine.end_publish(&key).await?;
    Ok(())
}
```

### Advanced — instant-start, authorization, and HLS packaging

```rust,ignore
use arcly_stream::prelude::*;
use arcly_stream::packager::{HlsSegmenter, Packager};
use arcly_stream::storage::FsStorage;

// 1. A custom authorizer: only the right stream key may publish.
struct KeyAuth { secret: String }

#[async_trait]
impl StreamAuthenticator for KeyAuth {
    async fn authorize_publish(&self, _key: &StreamKey, creds: &Credentials) -> arcly_stream::Result<()> {
        match creds.token.as_deref() {
            Some(t) if t == self.secret => Ok(()),
            _ => Err(StreamError::Unauthorized("bad publish key".into())),
        }
    }
}

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        // 120-frame keyframe-anchored GOP cache → sub-second join times:
        .application(AppSpec::new("live").gop_cache(120))
        .authenticator(KeyAuth { secret: "s3cr3t".into() })
        .idle_timeout(std::time::Duration::from_secs(30))
        .build();

    let key = StreamKey::new("live", "cam");
    let handle = engine
        .start_publish_authorized(&key, &Credentials::token("s3cr3t"))
        .await?;

    // 2. Package the live stream to HLS, writing segments + playlist to disk.
    //    `MpegTsMuxer` (feature `mpegts`) produces playable `.ts` segments.
    let mut hls = HlsSegmenter::new(
        arcly_stream::packager::MpegTsMuxer::new(),
        FsStorage::new("/var/hls"),
        "live/cam",
        /* target seconds */ 4,
        /* playlist window */ 6,
    );
    let mut sub = engine.get_stream(&key)?.subscribe_resilient();
    tokio::spawn(async move {
        while let Some(frame) = sub.recv().await {
            let _ = hls.push(&frame).await;
        }
        let _ = hls.finish().await;
    });

    // … feed `handle.publish_frame(..)` from your protocol parser …
    let _ = handle;
    Ok(())
}
```

### Operational — a real RTMP-in → HLS-out origin

No hand-rolled protocol parser needed: the `rtmp` handler ingests from OBS or
FFmpeg, and the `mpegts` muxer writes playable HLS. This is a complete live
origin in ~20 lines.

```rust,ignore
use arcly_stream::prelude::*;
use arcly_stream::protocol::rtmp::RtmpHandler;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        .application(AppSpec::new("live").gop_cache(120)) // instant-start replay
        .build();

    // `.with_playback(engine.clone())` enables RTMP play (egress) too.
    let rtmp = RtmpHandler::new("0.0.0.0:1935".parse().unwrap())
        .with_playback(engine.clone());

    // Publish:  ffmpeg -re -i in.mp4 -c copy -f flv rtmp://localhost/live/cam
    // Play:     ffplay rtmp://localhost/live/cam
    // (point an HlsSegmenter + MpegTsMuxer at the same stream for HLS egress)
    engine.serve(vec![Box::new(rtmp)], CancellationToken::new()).await
}
```

Run the bundled, fully-wired version (RTMP ingest **and** disk HLS) with
[`examples/rtmp_to_hls`](examples/rtmp_to_hls):

```bash
cargo run -p rtmp-to-hls
# then: ffmpeg -re -i input.mp4 -c:v libx264 -c:a aac -f flv rtmp://localhost:1935/live/cam
# and play the HLS it writes under ./hls/live/cam/index.m3u8
```

## Extension points (the traits you implement)

| Trait | Purpose |
|-------|---------|
| `ProtocolHandler` | Accept connections, drive the publish lifecycle (RTMP **shipped**; SRT, WHIP… DIY) |
| `MediaSource` / `MediaSink` | Produce / consume frames (driven by `Engine::pump_source`) |
| `StorageBackend` | Object storage for segments & recordings |
| `Muxer` | Container byte-format for HLS segments (MPEG-TS **shipped**; fMP4/CMAF DIY) |
| `Transcoder` | Decode/scale/encode into an ABR ladder |
| `StreamAuthenticator` | Authorize publish / play (permit-all by default) |
| `Observer` / `AuditSink` / `HealthCheck` | Telemetry, audit trail, readiness |
| `ClusterRelay` | Origin/edge discovery & stream federation |
| `PublishRegistry` / `PlaybackRegistry` / `EventBus` | Swap the engine for your own bus |

## Cargo features

| Feature | Effect |
|---|---|
| *(none)* | Pure kernel: frame model, bus (GOP cache, live QoS), auth/audit/health/cluster contracts |
| `codec` | H.264 NAL/SPS + AAC ADTS bitstream parsers |
| `storage-fs` | Filesystem `StorageBackend` adapter |
| `ingest` | Shared TCP accept loop, keyframe gate, rate limiter, NAL scan |
| `hls` | HLS/LL-HLS packager: `Muxer`/`Packager`, segmenter, playlists |
| `mpegts` | Native MPEG-TS `Muxer` (`MpegTsMuxer`) — playable `.ts` segments |
| `rtmp` | Working RTMP publish/play `ProtocolHandler` (`RtmpHandler`) |
| `record` | Segment/recording sink over a `StorageBackend` |
| `metrics` | Prometheus `Observer` implementation |
| `macros` | `#[derive(MediaSink)]`, `#[protocol("…")]` ergonomics |
| `full` | Everything above |

## Guarantees

- **Zero-copy** — one publish clones an `Arc<MediaFrame>` pointer per subscriber,
  never the payload (`bytes::Bytes`).
- **Lock-free hot path**`tokio::broadcast` fan-out; `ArcSwap` + atomics for
  cached config, GOP head, and QoS counters.
- **`#![forbid(unsafe_code)]`** — no `unsafe` in the kernel, compiler-enforced.
- **No global state** — many engines per process; ideal for tests.

## Workspace layout

```text
arcly-stream/          the published library crate (this README documents it)
arcly-stream-macros/   optional proc-macros (the `macros` feature)
examples/
  minimal_ingest/      implement MediaSource + Observer, drive the bus
  custom_protocol/     implement ProtocolHandler + Engine::serve run loop
  codec_inspect/       per-codec random-access classification + HLS codec strings
  rtmp_to_hls/         RTMP ingest → MPEG-TS HLS on disk (operational origin)
```

## Examples

```bash
cargo run -p minimal-ingest     # synthetic publisher + resilient subscriber
cargo run -p custom-protocol    # ProtocolHandler driven by Engine::serve
cargo run -p codec-inspect      # codec::dispatch over H.264/H.265/AV1/VP9/VVC
cargo run -p rtmp-to-hls        # real RTMP origin: ingest → playable HLS segments
cargo bench  -p arcly-stream    # broadcast fan-out throughput (criterion)
```

## Minimum Supported Rust Version

`arcly-stream` supports **Rust 1.85** and later. An MSRV bump is a minor-version
change.

## Documentation

Full API docs are on [docs.rs/arcly-stream](https://docs.rs/arcly-stream). See
[`BLUEPRINT.md`](BLUEPRINT.md) for the extraction analysis and the mapping from
the original 25-crate `stream-center` to this single crate.

## Contributing

Issues and merge requests are welcome. CI enforces `cargo fmt`, `clippy -D
warnings`, the full test suite across the feature matrix, and `cargo deny`; see
[`RELEASING.md`](RELEASING.md) for the release process and
[`CHANGELOG.md`](CHANGELOG.md) for the history.

## License

Licensed under the [MIT License](LICENSE).