kithara-stream
Bridges async producers (network) to sync consumers (decoders). Exposes:
- the sync Source trait that decoders read through;
- the Stream<T> wrapper that gives Source a Read + Seek shape;
- the pull-driven Downloader (struct) and Peer (trait), the workspace's unified HTTP transport;
- the canonical media vocabulary: AudioCodec, ContainerFormat, MediaInfo, used by every crate that talks about codecs or containers.
Usage
use ;
use File;
// `File` and `Hls` implement `StreamType`.
let stream = new.await?;
// `stream` implements `Read + Seek` via the underlying `Source`.
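To illustrate how a wrapper can give a positional source a Read + Seek shape, here is a minimal sketch using only the standard library. `ByteSource`, `MemSource`, and `SeekableReader` are hypothetical stand-ins invented for this example; they are not the crate's actual Source or Stream<T>, whose signatures may differ.

```rust
use std::io::{self, Read, Seek, SeekFrom};

/// Hypothetical stand-in for a positional, synchronous source.
trait ByteSource {
    /// Copies bytes starting at `offset` into `buf`; returns the count copied.
    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize>;
    /// Total length in bytes.
    fn len(&self) -> u64;
}

/// In-memory source used only for the demonstration.
struct MemSource(Vec<u8>);

impl ByteSource for MemSource {
    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
        let start = (offset as usize).min(self.0.len());
        let n = buf.len().min(self.0.len() - start);
        buf[..n].copy_from_slice(&self.0[start..start + n]);
        Ok(n)
    }

    fn len(&self) -> u64 {
        self.0.len() as u64
    }
}

/// Hypothetical wrapper: keeps a cursor and forwards reads to `read_at`.
struct SeekableReader<S: ByteSource> {
    source: S,
    pos: u64,
}

impl<S: ByteSource> Read for SeekableReader<S> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.source.read_at(self.pos, buf)?;
        self.pos += n as u64;
        Ok(n)
    }
}

impl<S: ByteSource> Seek for SeekableReader<S> {
    fn seek(&mut self, target: SeekFrom) -> io::Result<u64> {
        // Resolve the target to an absolute position; None means it
        // would land before the start (or overflow).
        let new_pos = match target {
            SeekFrom::Start(p) => Some(p),
            SeekFrom::End(d) => self.source.len().checked_add_signed(d),
            SeekFrom::Current(d) => self.pos.checked_add_signed(d),
        };
        match new_pos {
            Some(p) => {
                self.pos = p;
                Ok(p)
            }
            None => Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "seek before start",
            )),
        }
    }
}
```

The wrapper owns only the cursor; all byte access goes through `read_at`, so the same source could in principle be read at arbitrary offsets without shared cursor state.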
Architecture
flowchart LR
Peer["Peer impl<br/>(HlsPeer / FilePeer)"]
DL["Downloader<br/>(shared HTTP pool)"]
FC["FetchCmd<br/>writer + on_complete"]
SR["StorageResource<br/>(kithara-storage)"]
Stream["Stream<T><br/>(Read + Seek)"]
Source["Source impl<br/>wait_range / read_at"]
Peer -- "poll_next()" --> FC
FC --> DL
DL -- "writer(chunk)" --> SR
DL -- "on_complete()" --> Peer
Stream --> Source
Source -- "wait_range / read_at" --> SR
- A protocol peer (HlsPeer, FilePeer) registers with the shared Downloader via Downloader::register(peer) and emits batches of FetchCmd from Peer::poll_next().
- Each FetchCmd carries closures: a per-chunk writer that lands bytes into StorageResource, and an on_complete that lets the peer advance its state.
- The sync side reads through Stream<T>, which delegates to a Source implementation. Source::wait_range blocks (with a bounded retry budget) until the requested byte range is present in the underlying StorageResource.
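The pull-driven flow above can be sketched in a few lines of synchronous, standard-library Rust. Everything here is a simplified assumption made for illustration: the `FetchCmd` fields, the `Peer` trait signature, `SegmentPeer`, and the `drive` function are hypothetical, and the real Downloader is async and talks HTTP rather than calling a `fetch` closure.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical fetch command: a URL plus the two callbacks described above.
struct FetchCmd {
    url: String,
    /// Called once per received chunk.
    writer: Box<dyn FnMut(&[u8])>,
    /// Called once after the last chunk.
    on_complete: Box<dyn FnOnce()>,
}

/// Hypothetical pull-driven peer: the downloader asks it for work.
trait Peer {
    /// Returns the next batch of commands; an empty batch means no more work.
    fn poll_next(&mut self) -> Vec<FetchCmd>;
}

/// Toy peer that requests a fixed list of segments into shared storage.
struct SegmentPeer {
    pending: Vec<String>,
    storage: Arc<Mutex<Vec<u8>>>,
    completed: Arc<Mutex<usize>>,
}

impl Peer for SegmentPeer {
    fn poll_next(&mut self) -> Vec<FetchCmd> {
        let storage = &self.storage;
        let completed = &self.completed;
        self.pending
            .drain(..)
            .map(|url| {
                let storage = Arc::clone(storage);
                let completed = Arc::clone(completed);
                FetchCmd {
                    url,
                    // Land each chunk in the storage shared with the reader.
                    writer: Box::new(move |chunk: &[u8]| {
                        storage.lock().unwrap().extend_from_slice(chunk);
                    }),
                    // Let the peer record that this fetch finished.
                    on_complete: Box::new(move || {
                        *completed.lock().unwrap() += 1;
                    }),
                }
            })
            .collect()
    }
}

/// Toy downloader loop: `fetch` stands in for the HTTP layer and
/// returns the chunks received for a URL.
fn drive(peer: &mut dyn Peer, fetch: impl Fn(&str) -> Vec<Vec<u8>>) {
    loop {
        let batch = peer.poll_next();
        if batch.is_empty() {
            break;
        }
        for mut cmd in batch {
            for chunk in fetch(&cmd.url) {
                (cmd.writer)(&chunk);
            }
            (cmd.on_complete)();
        }
    }
}
```

The point of the shape is that the peer never pushes work: the downloader pulls batches, so scheduling and the HTTP pool stay in one place while protocol state stays in the peer.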
Key Public Items
Canonical Media Types
Defined here as the single source of truth and re-exported by other crates:
- AudioCodec: codec identifier (AacLc, Mp3, Flac, …)
- ContainerFormat: container identifier (Fmp4, MpegTs, Adts, Flac, Wav, Ogg, …)
- MediaInfo: format metadata (channels, codec, container, sample rate, variant index)
Async-to-Sync Bridge
- The Downloader is async; peers and FetchCmd callbacks run on the tokio runtime.
- FetchCmd.writer(chunk) writes bytes directly into the StorageResource shared with the sync reader.
- The sync reader inside Stream<T> calls Source::wait_range(range), which polls the underlying storage with a bounded spin budget (MAX_WAIT_SPINS × WAIT_RANGE_TIMEOUT) before returning Pending(NotReady).
- Source::read_at(offset, buf) performs the actual sync copy once the range is present.
- Cancellation flows top-down through the cancel-token hierarchy described in crates/kithara-play/README.md.
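A bounded wait of this kind can be sketched with a mutex and a condition variable. This is an illustration under stated assumptions, not the crate's implementation: `SharedBuf` and `WaitOutcome` are hypothetical, the constant values are made up (the README does not state the real ones), and the storage is simplified to an append-only buffer, so a range counts as present once the buffer length reaches the range end.

```rust
use std::ops::Range;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

// Assumed values for the sketch; the crate's real constants are not shown here.
const MAX_WAIT_SPINS: u32 = 20;
const WAIT_RANGE_TIMEOUT: Duration = Duration::from_millis(10);

#[derive(Debug, PartialEq)]
enum WaitOutcome {
    Ready,
    NotReady,
}

/// Hypothetical append-only storage shared by the writer and the sync reader.
#[derive(Default)]
struct SharedBuf {
    bytes: Mutex<Vec<u8>>,
    grew: Condvar,
}

impl SharedBuf {
    /// Writer side: append a chunk and wake any waiting reader.
    fn write(&self, chunk: &[u8]) {
        self.bytes.lock().unwrap().extend_from_slice(chunk);
        self.grew.notify_all();
    }

    /// Reader side: wait until `range` is fully present, giving up after
    /// MAX_WAIT_SPINS timed waits of WAIT_RANGE_TIMEOUT each.
    fn wait_range(&self, range: Range<usize>) -> WaitOutcome {
        let mut guard = self.bytes.lock().unwrap();
        for _ in 0..MAX_WAIT_SPINS {
            if guard.len() >= range.end {
                return WaitOutcome::Ready;
            }
            // Releases the lock while waiting, then reacquires it.
            guard = self
                .grew
                .wait_timeout(guard, WAIT_RANGE_TIMEOUT)
                .unwrap()
                .0;
        }
        // One last check after the final timed wait.
        if guard.len() >= range.end {
            WaitOutcome::Ready
        } else {
            WaitOutcome::NotReady
        }
    }

    /// Copies bytes at `offset` into `buf`; intended to be called after
    /// `wait_range` has returned `Ready`. Returns the count copied.
    fn read_at(&self, offset: usize, buf: &mut [u8]) -> usize {
        let guard = self.bytes.lock().unwrap();
        let start = offset.min(guard.len());
        let n = buf.len().min(guard.len() - start);
        buf[..n].copy_from_slice(&guard[start..start + n]);
        n
    }
}
```

Bounding the wait means a stalled download surfaces to the caller as a not-ready result instead of blocking the decoder thread indefinitely; the caller can then retry or observe cancellation.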
Features
Agent Guardrails
- Keep kithara-stream generic. Do not move HLS-, file-, or surface-specific policy into shared contracts.
- Treat wait_range, read_at, and the pull-driven Peer contract as the surface of this crate. Fix the owned invariant instead of papering over it with surface-specific hacks.
- Shared media vocabulary stays here. Reuse AudioCodec, ContainerFormat, and MediaInfo instead of creating parallel cross-crate types.
Integration
Central orchestration layer. Protocol crates (kithara-file, kithara-hls) implement StreamType and dl::Peer. kithara-decode consumes Stream<T>. The Downloader is owned at the consumer-crate top (kithara-play::PlayerImpl, kithara-queue::Queue, etc.) so all peers share one HTTP pool. Other crates re-export AudioCodec, ContainerFormat, MediaInfo from here.