arcly-stream 0.1.1

A high-performance live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, pluggable HLS/recording, and trait-driven protocol/storage/auth/observer extension points — runtime, config, and metrics free.
<div align="center">

# arcly-stream

**A high-performance live-media streaming kernel for Rust.**

Lock-free, zero-copy frame fan-out · instant-start GOP cache · pluggable HLS &
recording · publish/play authorization · live QoS — with **zero** baked-in
config, metrics singletons, or HTTP runtime.

[![crates.io](https://img.shields.io/crates/v/arcly-stream.svg)](https://crates.io/crates/arcly-stream)
[![docs.rs](https://img.shields.io/docsrs/arcly-stream)](https://docs.rs/arcly-stream)
[![license](https://img.shields.io/crates/l/arcly-stream.svg)](#license)
[![MSRV](https://img.shields.io/badge/MSRV-1.85-blue.svg)](#minimum-supported-rust-version)

</div>

---

## Why arcly-stream?

Building a streaming server means solving the same hard core every time:
fanning one publisher's frames out to thousands of late-joining subscribers,
**without copying payloads, without locks on the hot path, and without dropping
the slow ones silently.** Everything else — which wire protocol you ingest, how
you package egress, where you store segments, how you authorize and observe — is
deployment-specific.

`arcly-stream` is exactly that hard core, extracted and hardened into a reusable
**kernel**. You implement a handful of small traits for the parts unique to your
system; the kernel owns the lock-free pub/sub bus, instant-start replay,
back-pressure handling, lifecycle, and quality-of-service measurement.

It deliberately ships **no opinion** about your runtime: no global registries, no
TOML schema, no axum server. Telemetry, authorization, and audit are *injected*
traits with safe defaults — never singletons.

## Architecture

Three tiers: a small always-on **kernel**, a feature-gated **pure-Rust media
plane**, and **wire protocols deferred to traits** (so the kernel never drags in
a native codec or transport you didn't ask for).

```text
                          ┌───────────────────────────────────────────────┐
                          │                 CORE KERNEL                     │
                          │                (always compiled)                │
      publisher  ───────▶ │   Engine ─ StreamHandle                         │ ──────▶ subscribers
   (your protocol)        │     • broadcast fan-out (zero-copy Arc<Frame>)  │     (packager / SFU /
                          │     • GOP cache → instant-start replay          │      recorder / edge)
                          │     • live QoS (bitrate / fps)                  │
                          │   PublishRegistry · PlaybackRegistry · EventBus │
                          │   Observer · StreamAuthenticator · AuditSink    │
                          │   HealthRegistry · MediaFrame · StreamError     │
                          └───────────────────────────────────────────────┘
                                   ▲  implements traits          ▲  feature-gated, pure Rust
        ┌──────────────────────────┴───────────┐   ┌─────────────┴───────────────────────────┐
        │       WIRE PROTOCOLS & MUXERS         │   │              MEDIA PLANE                  │
        │   (rtmp/mpegts ready · rest traits)   │   │            (opt-in features)             │
        │  rtmp     RTMP publish/play  ✅       │   │  codec    H.264/H.265/AV1/VP9/VVC · AAC  │
        │  mpegts   MPEG-TS muxer      ✅       │   │  hls      Packager + keyframe segmenter  │
        │  InboundProtocol  SRT · WHIP · RTSP   │   │  record   RecordingSink (VOD/DVR)        │
        │  Transcoder/ClusterRelay  (traits)    │   │  ingest   TCP accept loop · rate limit   │
        └───────────────────────────────────────┘   └───────────────────────────────────────┘
```

The `rtmp` handler (publish **and** play over real RTMP) and the `mpegts` muxer
(playable `.ts` segments) ship operational. The other transports (SRT, WHIP,
RTSP) and the ABR/cluster backends are trait seams that a host fills in.

## Feature matrix

| Capability | Status | How |
|---|---|---|
| Zero-copy broadcast fan-out | **Core** | `StreamHandle::subscribe_resilient` |
| Instant-start GOP replay | **Core** | `AppSpec::gop_cache(frames)` + `replay_buffer()` |
| Publish/play authorization | **Core** | `StreamAuthenticator` (permit-all default) |
| Live QoS (bitrate / fps) | **Core** | `StreamHandle::qos()` / `metadata_snapshot()` |
| Slow-subscriber eviction | **Core** | `Subscription::max_lag` + observer hooks |
| Idle-stream reaper | **Core** | `EngineBuilder::idle_timeout` |
| Coordinated graceful shutdown | **Core** | `Engine::serve` / `serve_until_signal` |
| Health & audit | **Core** | `HealthRegistry`, `AuditPipeline`/`AuditSink` |
| **RTMP publish / play** | `rtmp` | `protocol::rtmp::RtmpHandler`: real handshake, chunks, AMF0, FLV |
| **MPEG-TS muxing (playable)** | `mpegts` | `packager::MpegTsMuxer`: PAT/PMT/PES/PCR `.ts` |
| H.264 / H.265 / AV1 / VP9 / VVC / AAC parsing | 🧩 `codec*` | `codec::{h264,h265,av1,vp9,vvc,aac}` (pure Rust) |
| HLS / LL-HLS packaging | 🧩 `hls` | `HlsSegmenter` + `HlsPlaylist`, writing to a `StorageBackend` |
| Recording (VOD / DVR) | 🧩 `record` | `RecordingSink` |
| Filesystem object storage | 🧩 `storage-fs` | `FsStorage` |
| Prometheus metrics | 🧩 `metrics` | `PrometheusObserver` |
| Ingest scaffolding | 🧩 `ingest` | TCP accept loop, keyframe gate, NAL scan |
| SRT / WebRTC (WHIP) / RTSP ingest | 🧭 **Deferred** | implement `InboundProtocol` |
| fMP4 / CMAF muxing | 🧭 **Deferred** | implement `Muxer` |
| Hardware transcoding / ABR | 🧭 **Deferred** | implement `Transcoder` |
| Edge federation | 🧭 **Deferred** | implement `ClusterRelay` |

> `rtmp` + `mpegts` are operational, native-Rust, and `unsafe`-free: you can
> ingest from OBS/FFmpeg and write playable HLS today.
> 🧭 **Deferred** means the *seam* exists as a documented trait. SRT, WebRTC,
> RTSP, fMP4/CMAF, and FFmpeg/NVENC transcoding need heavier native stacks and
> live in downstream crates or your application — the kernel stays lean.
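
To make the "Live QoS (bitrate / fps)" row concrete, here is a minimal, std-only sketch of a sliding-window meter. It is an illustration under stated assumptions, not the kernel's implementation: `QosWindow`, `record`, `bitrate_bps`, and `fps` are hypothetical names, and the real kernel is described as using atomics rather than a queue.

```rust
use std::collections::VecDeque;

/// Hypothetical sliding-window QoS meter (not part of arcly-stream's API).
pub struct QosWindow {
    window_ms: u64,
    /// (timestamp in milliseconds, payload size in bytes) per frame.
    samples: VecDeque<(u64, usize)>,
    bytes: usize,
}

impl QosWindow {
    pub fn new(window_ms: u64) -> Self {
        Self { window_ms, samples: VecDeque::new(), bytes: 0 }
    }

    /// Record one frame, then evict samples that fell out of the window.
    pub fn record(&mut self, ts_ms: u64, payload_len: usize) {
        self.samples.push_back((ts_ms, payload_len));
        self.bytes += payload_len;
        while let Some(&(old_ts, old_len)) = self.samples.front() {
            if ts_ms.saturating_sub(old_ts) < self.window_ms {
                break;
            }
            self.samples.pop_front();
            self.bytes -= old_len;
        }
    }

    /// Bits per second, averaged over the window length.
    pub fn bitrate_bps(&self) -> f64 {
        (self.bytes as f64 * 8.0) / (self.window_ms as f64 / 1000.0)
    }

    /// Frames per second, averaged over the window length.
    pub fn fps(&self) -> f64 {
        self.samples.len() as f64 / (self.window_ms as f64 / 1000.0)
    }
}
```

Feeding it 25 frames of 1000 bytes spaced 40 ms apart with a 1000 ms window yields 25 fps and 200 000 bit/s.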

## Install

```toml
[dependencies]
arcly-stream = "0.1"

# Opt into the pure-Rust media plane as needed:
# arcly-stream = { version = "0.1", features = ["hls", "record", "codec"] }

# A full RTMP-in → HLS-out origin (everything you need to ingest + package):
# arcly-stream = { version = "0.1", features = ["rtmp", "mpegts", "hls", "storage-fs"] }
```

## Quick start

### Basic — publish and subscribe

```rust
use arcly_stream::prelude::*;

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        .max_publishers(1_000)
        .application(AppSpec::new("live"))
        .build();

    let key = StreamKey::new("live", "demo");

    // A publisher claims the stream and pumps frames (zero-copy fan-out):
    let handle = engine.start_publish(&key).await?;

    // A subscriber (packager / recorder / SFU) drains them. `subscribe_resilient`
    // resynchronizes past `broadcast` lag instead of dying on a slow tick.
    let mut sub = engine.get_stream(&key)?.subscribe_resilient();
    tokio::spawn(async move {
        while let Some(frame) = sub.recv().await {
            // packetize `frame` …
            let _ = frame;
        }
    });

    handle.publish_frame(MediaFrame::new_video(
        0, 0, bytes::Bytes::from_static(b"…"), CodecId::H264, true,
    ))?;

    engine.end_publish(&key).await?;
    Ok(())
}
```
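
The comment about `subscribe_resilient` resynchronizing past lag can be pictured with a small std-only model. This is a sketch of the general idea (a bounded ring where a slow reader jumps to the oldest retained item and learns how many it missed), not the crate's code; `Ring`, `Recv`, and the cursor scheme are hypothetical.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded log standing in for a broadcast channel's ring buffer.
pub struct Ring<T> {
    capacity: usize,
    /// Sequence number of the oldest retained item.
    first_seq: u64,
    items: VecDeque<T>,
}

#[derive(Debug, PartialEq)]
pub enum Recv<T> {
    Item(T),
    /// The reader fell behind: `skipped` items were overwritten unread.
    Resynced { skipped: u64, item: T },
    Empty,
}

impl<T: Clone> Ring<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { capacity, first_seq: 0, items: VecDeque::new() }
    }

    /// Append an item, overwriting the oldest one when the ring is full.
    pub fn send(&mut self, item: T) {
        if self.items.len() == self.capacity {
            self.items.pop_front();
            self.first_seq += 1;
        }
        self.items.push_back(item);
    }

    /// `cursor` is the sequence number the reader wants next.
    pub fn recv(&self, cursor: &mut u64) -> Recv<T> {
        let end = self.first_seq + self.items.len() as u64;
        if *cursor >= end {
            return Recv::Empty;
        }
        // A lagging reader resumes at the oldest retained item instead of failing.
        let skipped = self.first_seq.saturating_sub(*cursor);
        if skipped > 0 {
            *cursor = self.first_seq;
        }
        let item = self.items[(*cursor - self.first_seq) as usize].clone();
        *cursor += 1;
        if skipped > 0 {
            Recv::Resynced { skipped, item }
        } else {
            Recv::Item(item)
        }
    }
}
```

With a capacity of 3 and five items sent before the first read, the reader's first `recv` reports two skipped items and then continues normally.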

### Advanced — instant-start, authorization, and HLS packaging

```rust,ignore
use arcly_stream::prelude::*;
use arcly_stream::packager::{HlsSegmenter, Packager};
use arcly_stream::storage::FsStorage;

// 1. A custom authorizer: only the right stream key may publish.
struct KeyAuth { secret: String }

#[async_trait]
impl StreamAuthenticator for KeyAuth {
    async fn authorize_publish(&self, _key: &StreamKey, creds: &Credentials) -> arcly_stream::Result<()> {
        match creds.token.as_deref() {
            Some(t) if t == self.secret => Ok(()),
            _ => Err(StreamError::Unauthorized("bad publish key".into())),
        }
    }
}

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        // 120-frame keyframe-anchored GOP cache → sub-second join times:
        .application(AppSpec::new("live").gop_cache(120))
        .authenticator(KeyAuth { secret: "s3cr3t".into() })
        .idle_timeout(std::time::Duration::from_secs(30))
        .build();

    let key = StreamKey::new("live", "cam");
    let handle = engine
        .start_publish_authorized(&key, &Credentials::token("s3cr3t"))
        .await?;

    // 2. Package the live stream to HLS, writing segments + playlist to disk.
    //    `MpegTsMuxer` (feature `mpegts`) produces playable `.ts` segments.
    let mut hls = HlsSegmenter::new(
        arcly_stream::packager::MpegTsMuxer::new(),
        FsStorage::new("/var/hls"),
        "live/cam",
        /* target seconds */ 4,
        /* playlist window */ 6,
    );
    let mut sub = engine.get_stream(&key)?.subscribe_resilient();
    tokio::spawn(async move {
        while let Some(frame) = sub.recv().await {
            let _ = hls.push(&frame).await;
        }
        let _ = hls.finish().await;
    });

    // … feed `handle.publish_frame(..)` from your protocol parser …
    let _ = handle;
    Ok(())
}
```
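
The "keyframe-anchored GOP cache" configured above can be sketched in a few lines of std-only Rust. Everything here is an assumption for illustration: `Frame`, `GopCache`, and the overflow policy (drop the whole cache and wait for the next keyframe) are hypothetical and may differ from what the kernel does.

```rust
/// Hypothetical frame stand-in; the real `MediaFrame` carries a payload and codec.
#[derive(Clone, Debug, PartialEq)]
pub struct Frame {
    pub pts: u64,
    pub keyframe: bool,
}

/// Hypothetical cache holding the current group of pictures, keyframe first.
pub struct GopCache {
    max_frames: usize,
    frames: Vec<Frame>,
}

impl GopCache {
    pub fn new(max_frames: usize) -> Self {
        Self { max_frames, frames: Vec::new() }
    }

    pub fn push(&mut self, frame: Frame) {
        if frame.keyframe {
            // A keyframe starts a new GOP; older frames are no longer needed.
            self.frames.clear();
        } else if self.frames.is_empty() {
            // No keyframe cached: a delta frame alone is not decodable.
            return;
        }
        if self.frames.len() == self.max_frames {
            // Assumed policy: never replay a GOP with a hole in it.
            self.frames.clear();
            return;
        }
        self.frames.push(frame);
    }

    /// Frames a late joiner would receive before switching to live frames.
    pub fn replay(&self) -> Vec<Frame> {
        self.frames.clone()
    }
}
```

Because the replay always begins with a keyframe, a late subscriber can start decoding immediately rather than waiting for the next keyframe on the live feed.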

### Operational — a real RTMP-in → HLS-out origin

No hand-rolled protocol parser needed: the `rtmp` handler ingests from OBS or
FFmpeg, and the `mpegts` muxer writes playable HLS. This is a complete live
origin in ~20 lines.

```rust,ignore
use arcly_stream::prelude::*;
use arcly_stream::protocol::rtmp::RtmpHandler;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        .application(AppSpec::new("live").gop_cache(120)) // instant-start replay
        .build();

    // `.with_playback(engine.clone())` enables RTMP play (egress) too.
    let rtmp = RtmpHandler::new("0.0.0.0:1935".parse().unwrap())
        .with_playback(engine.clone());

    // Publish:  ffmpeg -re -i in.mp4 -c copy -f flv rtmp://localhost/live/cam
    // Play:     ffplay rtmp://localhost/live/cam
    // (point an HlsSegmenter + MpegTsMuxer at the same stream for HLS egress)
    engine.serve(vec![Box::new(rtmp)], CancellationToken::new()).await
}
```

Run the bundled, fully-wired version (RTMP ingest **and** disk HLS) with
[`examples/rtmp_to_hls`](examples/rtmp_to_hls):

```bash
cargo run -p rtmp-to-hls
# then: ffmpeg -re -i input.mp4 -c:v libx264 -c:a aac -f flv rtmp://localhost:1935/live/cam
# and play the HLS it writes under ./hls/live/cam/index.m3u8
```
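
For readers unfamiliar with what a live `index.m3u8` contains, the following std-only sketch renders a sliding-window HLS media playlist. It illustrates the playlist format only and is not the crate's `HlsPlaylist`; `Segment`, `LivePlaylist`, and the segment URIs are hypothetical, and the target duration is assumed to be fixed up front.

```rust
use std::collections::VecDeque;

/// Hypothetical description of one finished segment.
pub struct Segment {
    pub uri: String,
    pub duration_secs: f64,
}

/// Hypothetical live playlist that keeps only the newest `window` segments.
pub struct LivePlaylist {
    target_secs: u64,
    window: usize,
    /// Sequence number of the first segment still listed.
    media_sequence: u64,
    segments: VecDeque<Segment>,
}

impl LivePlaylist {
    pub fn new(target_secs: u64, window: usize) -> Self {
        Self { target_secs, window, media_sequence: 0, segments: VecDeque::new() }
    }

    /// Append a segment and slide the window forward if it is full.
    pub fn push(&mut self, segment: Segment) {
        self.segments.push_back(segment);
        while self.segments.len() > self.window {
            self.segments.pop_front();
            self.media_sequence += 1;
        }
    }

    /// Render the playlist text a player would fetch.
    pub fn render(&self) -> String {
        let mut out = String::from("#EXTM3U\n#EXT-X-VERSION:3\n");
        out.push_str(&format!("#EXT-X-TARGETDURATION:{}\n", self.target_secs));
        out.push_str(&format!("#EXT-X-MEDIA-SEQUENCE:{}\n", self.media_sequence));
        for seg in &self.segments {
            out.push_str(&format!("#EXTINF:{:.3},\n{}\n", seg.duration_secs, seg.uri));
        }
        out
    }
}
```

The absence of `#EXT-X-ENDLIST` is what tells a player the stream is still live, so it keeps re-fetching the playlist as the window slides.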

## Extending the Kernel: Implementing Custom Protocols

`arcly-stream` is a **multi-protocol ingestion architecture**: teaching the engine
a new inbound wire protocol (RTSP, SRT, WebRTC WHIP/WHEP, your own) is one trait
impl in *your* crate — the engine kernel never changes. The bundled `RtmpHandler`
is just the first reference implementation of the same public trait you'll use.

Three public types in the [`inbound`](https://docs.rs/arcly-stream/latest/arcly_stream/inbound/index.html) module form the seam:

| Type | Role |
|------|------|
| `InboundProtocol` | The worker contract you implement: own a listener, accept connections, bridge frames onto the bus. `Send + Sync + 'static`. |
| `IngestContext` | The cloneable handle your worker uses to reach the engine bus; hands out publish sessions. |
| `PublishSession` | An RAII token for one live stream — every frame lands in the GOP cache + live QoS; dropping it frees the publish slot. |

```text
  your crate                          arcly-stream kernel
 ┌────────────────────────┐          ┌───────────────────────────────┐
 │ struct MyRtspHandler    │  serve   │ Engine (lock-free bus)        │
 │ impl InboundProtocol {  │◀─────────│  • broadcast fan-out          │
 │   async fn serve(ctx){  │          │  • GOP cache (instant start)  │
 │     ctx.open_publish()──┼────────▶ │  • live QoS counters          │
 │       .publish_frame()  │ frames   │  PublishRegistry              │
 │ } }                     │          │                               │
 └────────────────────────┘          └───────────────────────────────┘
```
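
The RAII behaviour described for `PublishSession` (dropping it frees the publish slot) follows a common Rust pattern. The std-only sketch below shows that pattern in isolation; `SlotPool` and `Slot` are hypothetical names, not types from this crate.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical pool of publish slots with a fixed upper bound.
pub struct SlotPool {
    max: usize,
    in_use: Arc<AtomicUsize>,
}

/// Hypothetical RAII token: holding it means one slot is taken.
pub struct Slot {
    in_use: Arc<AtomicUsize>,
}

impl SlotPool {
    pub fn new(max: usize) -> Self {
        Self { max, in_use: Arc::new(AtomicUsize::new(0)) }
    }

    /// Try to claim a slot without locking; `None` means the pool is full.
    pub fn acquire(&self) -> Option<Slot> {
        let mut current = self.in_use.load(Ordering::Acquire);
        loop {
            if current >= self.max {
                return None;
            }
            match self.in_use.compare_exchange_weak(
                current,
                current + 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Some(Slot { in_use: Arc::clone(&self.in_use) }),
                Err(actual) => current = actual,
            }
        }
    }

    pub fn in_use(&self) -> usize {
        self.in_use.load(Ordering::Acquire)
    }
}

impl Drop for Slot {
    /// Releasing happens on every exit path, including early returns and panics.
    fn drop(&mut self) {
        self.in_use.fetch_sub(1, Ordering::AcqRel);
    }
}
```

Tying the release to `Drop` means a protocol worker that bails out with `?` cannot leak its slot.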

Implement the trait, then register it on the builder **next to** the native RTMP
handler — both run concurrently under one coordinated shutdown:

```rust,ignore
use arcly_stream::prelude::*;
use arcly_stream::inbound::{InboundProtocol, IngestContext};
use arcly_stream::protocol::rtmp::RtmpHandler;
use tokio_util::sync::CancellationToken;

struct MyProtocol { /* listener config, handshake state, … */ }

#[async_trait]
impl InboundProtocol for MyProtocol {
    fn name(&self) -> &'static str { "my-proto" }

    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken)
        -> arcly_stream::Result<()>
    {
        // 1. bind your listener, 2. accept + handshake, 3. resolve a StreamKey…
        let session = ctx.open_publish(StreamKey::new("live", "cam")).await?;
        // 4. bridge decoded access units → MediaFrame and publish:
        //    session.publish_frame(frame)?;   // → fan-out · GOP cache · QoS
        shutdown.cancelled().await;            // 5. wind down on shutdown
        session.finish().await                 //    (Drop releases it otherwise)
    }
}

#[tokio::main]
async fn main() -> arcly_stream::Result<()> {
    let engine = Engine::builder()
        .application(AppSpec::new("live").gop_cache(120))
        .protocol(RtmpHandler::new("0.0.0.0:1935".parse().unwrap())) // native
        .protocol(MyProtocol { /* … */ })                            // yours
        .build();
    engine.serve_registered(CancellationToken::new()).await
}
```

**Runtime contract:** workers are shared across tasks (`Send + Sync + 'static`),
must observe `shutdown` and return promptly, and run on a Tokio runtime. Returning
`Err` trips the engine's coordinated teardown, winding sibling workers down too.
Existing `ProtocolHandler` impls keep working unchanged — a blanket bridge makes
every one an `InboundProtocol` automatically.
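
The teardown rule in the runtime contract (one worker's `Err` winds its siblings down) can be modelled without any async runtime. The sketch below uses scoped threads and an `AtomicBool` in place of Tokio tasks and a `CancellationToken`; `Worker` and `serve_all` are hypothetical names and this is not how `Engine::serve` is implemented.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

/// Hypothetical worker: runs until done or until `shutdown` becomes true.
pub type Worker = Box<dyn FnOnce(&AtomicBool) -> Result<(), String> + Send>;

/// Run all workers concurrently; the first failure signals the rest to stop.
pub fn serve_all(workers: Vec<Worker>) -> Result<(), String> {
    let shutdown = AtomicBool::new(false);
    thread::scope(|scope| {
        let handles: Vec<_> = workers
            .into_iter()
            .map(|worker| {
                let shutdown = &shutdown;
                scope.spawn(move || {
                    let result = worker(shutdown);
                    if result.is_err() {
                        // Trip the shared flag so sibling workers wind down.
                        shutdown.store(true, Ordering::SeqCst);
                    }
                    result
                })
            })
            .collect();

        // Join every worker; report the first error in registration order.
        let mut outcome = Ok(());
        for handle in handles {
            let result = handle.join().expect("worker panicked");
            if outcome.is_ok() {
                outcome = result;
            }
        }
        outcome
    })
}
```

A worker that ignores the flag would keep `serve_all` from returning, which is why the contract asks every worker to observe `shutdown` and return promptly.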

A complete, runnable version (a custom protocol registered alongside RTMP) lives
in [`examples/custom_protocol_plugin`](examples/custom_protocol_plugin):

```bash
cargo run -p custom-protocol-plugin
```

## Extension points (the traits you implement)

| Trait | Purpose |
|-------|---------|
| `InboundProtocol` | **Teach the engine a new wire protocol** — own a listener, bridge frames (RTMP **shipped**; RTSP, SRT, WHIP… one trait impl away) |
| `ProtocolHandler` | Legacy registry-only handler; auto-bridged to `InboundProtocol` |
| `MediaSource` / `MediaSink` | Produce / consume frames (driven by `Engine::pump_source`) |
| `StorageBackend` | Object storage for segments & recordings |
| `Muxer` | Container byte-format for HLS segments (MPEG-TS **shipped**; fMP4/CMAF DIY) |
| `Transcoder` | Decode/scale/encode into an ABR ladder |
| `StreamAuthenticator` | Authorize publish / play (permit-all by default) |
| `Observer` / `AuditSink` / `HealthCheck` | Telemetry, audit trail, readiness |
| `ClusterRelay` | Origin/edge discovery & stream federation |
| `PublishRegistry` / `PlaybackRegistry` / `EventBus` | Swap the engine for your own bus |

## Cargo features

| Feature | Effect |
|---|---|
| *(none)* | Pure kernel: frame model, bus (GOP cache, live QoS), auth/audit/health/cluster contracts |
| `codec` | H.264 NAL/SPS + AAC ADTS bitstream parsers |
| `storage-fs` | Filesystem `StorageBackend` adapter |
| `ingest` | Shared TCP accept loop, keyframe gate, rate limiter, NAL scan |
| `hls` | HLS/LL-HLS packager: `Muxer`/`Packager`, segmenter, playlists |
| `mpegts` | Native MPEG-TS `Muxer` (`MpegTsMuxer`) — playable `.ts` segments |
| `rtmp` | Working RTMP publish/play `ProtocolHandler` (`RtmpHandler`) |
| `record` | Segment/recording sink over a `StorageBackend` |
| `metrics` | Prometheus `Observer` implementation |
| `macros` | `#[derive(MediaSink)]`, `#[protocol("…")]` ergonomics |
| `full` | Everything above |

## Guarantees

- **Zero-copy** — one publish clones an `Arc<MediaFrame>` pointer per subscriber,
  never the payload (`bytes::Bytes`).
- **Lock-free hot path** via `tokio::sync::broadcast` fan-out, with `ArcSwap` +
  atomics for cached config, GOP head, and QoS counters.
- **`#![forbid(unsafe_code)]`** — no `unsafe` in the kernel, compiler-enforced.
- **No global state** — many engines per process; ideal for tests.
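
The zero-copy guarantee rests on plain `Arc` semantics, which the following std-only sketch demonstrates. It is an illustration, not the kernel's bus: `Frame` and `FanOut` are hypothetical, `std::sync::mpsc` channels stand in for the broadcast channel, and `Vec<u8>` stands in for `bytes::Bytes`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

/// Hypothetical frame stand-in with an owned payload.
pub struct Frame {
    pub payload: Vec<u8>,
}

/// Hypothetical fan-out: one sender per subscriber, all sharing one allocation.
pub struct FanOut {
    subscribers: Vec<Sender<Arc<Frame>>>,
}

impl FanOut {
    pub fn new() -> Self {
        Self { subscribers: Vec::new() }
    }

    pub fn subscribe(&mut self) -> Receiver<Arc<Frame>> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Deliver one frame to every live subscriber; returns how many remain.
    pub fn publish(&mut self, frame: Frame) -> usize {
        let shared = Arc::new(frame);
        // Each send clones the pointer only; subscribers that hung up are dropped.
        self.subscribers
            .retain(|tx| tx.send(Arc::clone(&shared)).is_ok());
        self.subscribers.len()
    }
}
```

Two subscribers receiving the same publish end up holding pointers to the same allocation, which `Arc::ptr_eq` confirms.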

## Workspace layout

```text
arcly-stream/          the published library crate (this README documents it)
arcly-stream-macros/   optional proc-macros (the `macros` feature)
examples/
  minimal_ingest/         implement MediaSource + Observer, drive the bus
  custom_protocol/        implement ProtocolHandler + Engine::serve run loop
  custom_protocol_plugin/ implement InboundProtocol, register alongside RTMP
  codec_inspect/          per-codec random-access classification + HLS codec strings
  rtmp_to_hls/            RTMP ingest → MPEG-TS HLS on disk (operational origin)
```

## Examples

```bash
cargo run -p minimal-ingest         # synthetic publisher + resilient subscriber
cargo run -p custom-protocol        # ProtocolHandler driven by Engine::serve
cargo run -p custom-protocol-plugin # custom InboundProtocol registered with RTMP
cargo run -p codec-inspect          # codec::dispatch over H.264/H.265/AV1/VP9/VVC
cargo run -p rtmp-to-hls            # real RTMP origin: ingest → playable HLS segments
cargo bench -p arcly-stream         # broadcast fan-out throughput (criterion)
```

## Minimum Supported Rust Version

`arcly-stream` supports **Rust 1.85** and later. An MSRV bump is a minor-version
change.

## Documentation

Full API docs are on [docs.rs/arcly-stream](https://docs.rs/arcly-stream). See
[`BLUEPRINT.md`](BLUEPRINT.md) for the extraction analysis and the mapping from
the original 25-crate `stream-center` to this single crate.

## Contributing

Issues and merge requests are welcome. CI enforces `cargo fmt`, `clippy -D
warnings`, the full test suite across the feature matrix, and `cargo deny`; see
[`RELEASING.md`](RELEASING.md) for the release process and
[`CHANGELOG.md`](CHANGELOG.md) for the history.

## License

Licensed under the [MIT License](LICENSE).