kithara-audio
Audio pipeline with decoding, effects chain, and sample rate conversion. Runs a shared OS thread (AudioWorker) for blocking decode/process work and bridges it to the caller via ringbuf lock-free ring buffers. Audio<S> is the main entry point; multiple tracks share one worker thread via AudioWorkerHandle.
Usage
```rust
use ;
use GaplessMode;
use ;
use Stream;

let audio_config = builder
    .stream_config
    .host_sample_rate
    .resampler_quality
    .gapless_mode
    .build;
let mut audio = new.await?;
```
AudioConfig is built with a bon builder. The setters shown above are the most common knobs, and each builder method is named after the matching field on AudioConfig.
Threading model
```mermaid
flowchart TB
    subgraph "Consumer Thread"
        App["Application code"]
        AS["Audio<S><br/>(impl PcmReader)"]
        App -- "read(buf)" --> AS
    end
    subgraph "AudioWorker (shared OS thread)"
        Sched["runtime::Scheduler"]
        DN["DecoderNode<br/>(per-track impl Node)"]
        Resampler
        Effects["effects/<br/>(per-track AudioEffect chain)"]
        Sched --> DN --> Effects --> Resampler
    end
    subgraph "Downloader (tokio)"
        DL["kithara-stream::dl::Downloader"]
    end
    subgraph "Shared state"
        SR["StorageResource<br/>(kithara-storage)"]
        Bus["EventBus<br/>(kithara-events)"]
    end
    DL -- "fetch + writer()" --> SR
    DN -- "wait_range / read_at" --> SR
    Resampler -- "PcmChunk" --> Ring["ringbuf::HeapRb<PcmChunk>"]
    AS -- "try_pop()" --> Ring
    DN -- "AudioEvent" --> Bus
    Bus --> App
```
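The AudioWorker box in the diagram can be pictured with a small std-only sketch: one scheduler owns every track as a boxed node, ticks each in turn, and applies a track's effects as direct calls inside that track's node. Node, TickResult and AudioEffect are names from this README, but the definitions below (and Gain, ToyTrack, Scheduler) are hypothetical stand-ins rather than the runtime/ API, and the plain round-robin pass ignores the real scheduler's priorities.

```rust
// Hypothetical sketch: these definitions are illustrative, not kithara-audio's
// runtime/ API. The scheduler here is round-robin, without priorities.
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TickResult {
    Progress, // did work this tick
    Waiting,  // idle or backpressured; tick again later
    Done,     // terminal; the scheduler drops the node
}

/// One track = one node ticked by the shared worker.
pub trait Node {
    fn tick(&mut self) -> TickResult;
    fn on_cancel(&mut self) {}
}

/// Effects are operators called inline by the node, not nodes themselves.
pub trait AudioEffect {
    fn process(&mut self, samples: &mut [f32]);
}

pub struct Gain(pub f32);

impl AudioEffect for Gain {
    fn process(&mut self, samples: &mut [f32]) {
        for s in samples.iter_mut() {
            *s *= self.0;
        }
    }
}

pub struct ToyTrack {
    pub effects: Vec<Box<dyn AudioEffect>>,
    pub log: Arc<Mutex<Vec<f32>>>, // first sample of each processed chunk
    pub cancelled: Arc<AtomicBool>,
}

impl Node for ToyTrack {
    fn tick(&mut self) -> TickResult {
        let mut chunk = vec![1.0_f32; 4]; // stand-in for one decoded chunk
        for fx in self.effects.iter_mut() {
            fx.process(&mut chunk); // direct call, no ring buffer in between
        }
        self.log.lock().unwrap().push(chunk[0]);
        TickResult::Progress
    }

    fn on_cancel(&mut self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }
}

#[derive(Default)]
pub struct Scheduler {
    next_id: u64,
    tracks: BTreeMap<u64, Box<dyn Node>>,
}

impl Scheduler {
    pub fn register_track(&mut self, node: Box<dyn Node>) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.tracks.insert(id, node);
        id
    }

    /// Central cancellation: remove the track, then call its on_cancel hook.
    pub fn unregister_track(&mut self, id: u64) {
        if let Some(mut node) = self.tracks.remove(&id) {
            node.on_cancel();
        }
    }

    /// One pass over all tracks; nodes that report Done are dropped.
    pub fn tick_all(&mut self) {
        self.tracks.retain(|_, node| node.tick() != TickResult::Done);
    }

    pub fn track_count(&self) -> usize {
        self.tracks.len()
    }
}
```

Because several tracks live in one map on one thread, adding a track costs a map entry rather than a new OS thread, which is the point of sharing the worker.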
- AudioWorker (shared OS thread): an internal priority scheduler in runtime/ ticks each registered track. Each track is a single Node (DecoderNode); effects run as direct operator calls inside the node, not as separate Nodes with ring buffers between them.
- Downloader (tokio): lives in kithara-stream::dl. It owns the HTTP pool and writes bytes directly into the StorageResource that the DecoderNode reads from. The downloader is not spawned by kithara-audio.
- Ring: a lock-free ringbuf::HeapRb<PcmChunk> carries processed PCM from the worker to the consumer; backpressure is enforced by the ring's capacity plus an Outlet overflow slot.
- Trash ring (spent-chunk return): the consumer (Audio) runs on the caller's real-time audio thread, so it must never free memory. Returning a PcmChunk's pooled buffer to kithara-bufpool can deallocate (when the shard is full, or on trim), so the consumer never drops a consumed chunk. Instead, it pushes every spent chunk back through a second lock-free ring, and DecoderNode::drain_trash drops them on the worker thread on its next tick. The trash ring is sized pcm_buffer_chunks + 2, enough to absorb a seek that drains the whole forward ring at once, so the real-time push is infallible and no buffer is ever freed on the audio thread.
- Events: every layer publishes into a unified EventBus (AudioEvent, HlsEvent, FileEvent, and ABR events).
- Epoch-based seek invalidation: each seek bumps an AtomicU64 epoch; stale chunks tagged with an older epoch are dropped before reaching the ring.
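The spent-chunk return path can be sketched with std only. Here std::sync::mpsc::sync_channel stands in for both ringbuf rings: it is bounded like a ring but uses locks internally, so the sketch shows the data flow, not real-time safety. PcmChunk's fields, Worker, Consumer and pipeline() are hypothetical, and the trash capacity simply follows the pcm_buffer_chunks + 2 rule above.

```rust
// Sketch only: sync_channel stands in for the two lock-free rings. It is
// bounded but NOT lock-free, so it is not real-time safe.
// PcmChunk, Worker, Consumer and pipeline() are hypothetical names.
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

pub struct PcmChunk {
    pub samples: Vec<f32>, // stands in for a pooled buffer
}

pub struct Worker {
    forward: SyncSender<PcmChunk>,
    trash: Receiver<PcmChunk>,
    pub freed: usize,
}

pub struct Consumer {
    forward: Receiver<PcmChunk>,
    trash: SyncSender<PcmChunk>,
}

/// Forward ring holds `pcm_buffer_chunks`; the trash ring gets two extra slots.
pub fn pipeline(pcm_buffer_chunks: usize) -> (Worker, Consumer) {
    let (fwd_tx, fwd_rx) = sync_channel(pcm_buffer_chunks);
    let (trash_tx, trash_rx) = sync_channel(pcm_buffer_chunks + 2);
    (
        Worker { forward: fwd_tx, trash: trash_rx, freed: 0 },
        Consumer { forward: fwd_rx, trash: trash_tx },
    )
}

impl Worker {
    /// Returns false when the forward ring is full (backpressure).
    pub fn produce(&mut self, len: usize) -> bool {
        self.forward.try_send(PcmChunk { samples: vec![0.0; len] }).is_ok()
    }

    /// Counterpart of DecoderNode::drain_trash: spent chunks are dropped
    /// (and their buffers freed) here, on the worker thread.
    pub fn drain_trash(&mut self) {
        while let Ok(chunk) = self.trash.try_recv() {
            drop(chunk);
            self.freed += 1;
        }
    }
}

impl Consumer {
    /// Hand a spent chunk back instead of dropping it on the audio thread.
    fn recycle(&self, chunk: PcmChunk) {
        let pushed = self.trash.try_send(chunk);
        assert!(pushed.is_ok(), "trash ring is sized so this push cannot fail");
    }

    /// Use one chunk, then recycle it. Returns the number of samples consumed.
    pub fn consume_one(&mut self) -> Option<usize> {
        let chunk = self.forward.try_recv().ok()?;
        let n = chunk.samples.len();
        self.recycle(chunk);
        Some(n)
    }

    /// On seek, drain the whole forward ring at once, recycling every chunk.
    pub fn drain_for_seek(&mut self) -> usize {
        let mut drained = 0;
        while let Ok(chunk) = self.forward.try_recv() {
            self.recycle(chunk);
            drained += 1;
        }
        drained
    }
}
```

The consumer only ever moves chunks between rings; every actual deallocation happens inside drain_trash on the worker side.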
Pipeline Architecture
```mermaid
flowchart LR
    ST["Stream<T><br/>(Read + Seek)"]
    DF["DecoderFactory<br/>Box<dyn Decoder>"]
    Node["DecoderNode<br/>(impl Node, runtime/)"]
    AW["AudioWorker<br/>(shared OS thread)"]
    Ring["ringbuf<br/>(lock-free)"]
    A["Audio<S><br/>(impl PcmReader)"]
    ST --> DF --> Node --> AW --> Ring --> A
```
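The last hop in the pipeline, Audio<S> pulling from the ring, is essentially a copy loop that drains queued chunks into the caller's buffer and remembers where it stopped inside a partially read chunk. This sketch assumes a VecDeque in place of the lock-free ring and a hypothetical read(&mut [f32]) -> usize signature; Reader and PcmChunk are illustrative and this is not PcmReader's actual definition.

```rust
// Hypothetical sketch: Reader and PcmChunk are illustrative. A VecDeque
// stands in for the lock-free ring; read() is not PcmReader's real signature.
use std::collections::VecDeque;

pub struct PcmChunk {
    pub samples: Vec<f32>,
}

#[derive(Default)]
pub struct Reader {
    ring: VecDeque<PcmChunk>,           // filled by the worker
    current: Option<(PcmChunk, usize)>, // chunk being drained + read offset
}

impl Reader {
    /// Worker side: enqueue a processed chunk.
    pub fn push(&mut self, chunk: PcmChunk) {
        self.ring.push_back(chunk);
    }

    /// Consumer side: copy as many queued samples into `buf` as are ready.
    /// Returns the number written; fewer than buf.len() means an underrun.
    pub fn read(&mut self, buf: &mut [f32]) -> usize {
        let mut written = 0;
        while written < buf.len() {
            if self.current.is_none() {
                match self.ring.pop_front() {
                    Some(chunk) => self.current = Some((chunk, 0)),
                    None => break, // nothing decoded yet
                }
            }
            let (chunk, pos) = self.current.as_mut().unwrap();
            let n = (chunk.samples.len() - *pos).min(buf.len() - written);
            buf[written..written + n].copy_from_slice(&chunk.samples[*pos..*pos + n]);
            *pos += n;
            written += n;
            if *pos == chunk.samples.len() {
                // Spent. kithara-audio returns it through the trash ring
                // instead of dropping it here; this sketch simply drops it.
                self.current = None;
            }
        }
        written
    }
}
```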
Resampler Quality Levels
Format Change Handling
On an ABR variant switch, the DecoderNode detects the format change via Source::media_info() polling and then:
- Uses the variant fence on the source to prevent cross-variant reads.
- Seeks to the first segment of the new variant (where init data lives).
- Recreates the decoder via DecoderFactory.
- Resets the effects chain to avoid audio artifacts.
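The reaction to a variant switch can be modeled as a watcher that compares each polled media-info snapshot with the previous one and, on a change, emits the steps above in order. FormatInfo, Action and FormatWatcher are hypothetical names; the real node works with MediaInfo and DecoderFactory, whose definitions are not reproduced here.

```rust
// Hypothetical sketch: FormatInfo, Action and FormatWatcher are illustrative
// names; kithara-audio's real types (MediaInfo, DecoderFactory) differ.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FormatInfo {
    pub codec: String,
    pub variant: u32,
}

/// The reaction steps, in order.
#[derive(Debug, PartialEq, Eq)]
pub enum Action {
    FenceVariant(u32),       // block reads that would cross variants
    SeekToFirstSegment(u32), // init data lives in the first segment
    RecreateDecoder(String), // via the decoder factory
    ResetEffects,            // avoid artifacts from stale effect state
}

#[derive(Default)]
pub struct FormatWatcher {
    current: Option<FormatInfo>,
}

impl FormatWatcher {
    /// Call with the latest polled media info. Returns the steps to run if
    /// the format changed since the previous poll, otherwise an empty list.
    pub fn poll(&mut self, info: FormatInfo) -> Vec<Action> {
        let changed = matches!(&self.current, Some(old) if *old != info);
        let actions = if changed {
            vec![
                Action::FenceVariant(info.variant),
                Action::SeekToFirstSegment(info.variant),
                Action::RecreateDecoder(info.codec.clone()),
                Action::ResetEffects,
            ]
        } else {
            Vec::new()
        };
        self.current = Some(info);
        actions
    }
}
```

The first poll only records a baseline, so starting playback does not count as a format change.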
Decoder recreate policy
- Decoder is not recreated on every seek.
- Decoder is recreated when a stream format changes (codec/container boundary) or when post-seek decode reports a recoverable format mismatch.
- The recreate path is metadata-first (MediaInfo), with a native Symphonia probe fallback from a fresh source.
- Decoder recreation always uses the seek target's anchor/base offset from the timeline/source, so the new decoder starts from the stream's timeline truth.
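The recreate policy reduces to a small decision function. In this hypothetical sketch (Trigger, Probe, RecreatePlan and plan_recreate are illustrative names), a plain seek keeps the decoder, the two format-related triggers recreate it, the probe is metadata-first with a native-probe fallback, and the start offset always comes from the timeline anchor.

```rust
// Hypothetical sketch of the recreate rules; Trigger, Probe, RecreatePlan
// and plan_recreate are illustrative names only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Trigger {
    Seek,                   // ordinary seek, same format
    FormatBoundary,         // codec/container boundary
    PostSeekFormatMismatch, // recoverable mismatch reported after a seek
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Probe {
    Metadata,    // build the decoder from known media info
    NativeProbe, // fall back to probing a fresh source
}

#[derive(Debug, PartialEq, Eq)]
pub struct RecreatePlan {
    pub probe: Probe,
    pub start_offset: u64, // always the timeline's seek anchor
}

pub fn plan_recreate(
    trigger: Trigger,
    has_media_info: bool,
    timeline_anchor: u64,
) -> Option<RecreatePlan> {
    if trigger == Trigger::Seek {
        return None; // keep the existing decoder
    }
    let probe = if has_media_info { Probe::Metadata } else { Probe::NativeProbe };
    Some(RecreatePlan { probe, start_offset: timeline_anchor })
}
```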
Epoch-Based Seek
On seek, the epoch is incremented atomically. The worker tags each decoded chunk with the current epoch, and the consumer discards stale chunks (those tagged with an older epoch), so leftover data from before a seek never reaches the output.
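A minimal sketch of the epoch handshake, assuming a shared Arc<AtomicU64>: a seek bumps the counter, the worker stamps each chunk with the value it read, and the consumer compares that stamp with the current value. SeekEpoch, Tagged, tag and accept are hypothetical names; the same comparison also covers dropping stale chunks on the worker side before they reach the ring.

```rust
// Hypothetical sketch: SeekEpoch, Tagged, tag and accept are illustrative.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Shared between the seeking caller, the worker and the consumer.
#[derive(Clone, Default)]
pub struct SeekEpoch(Arc<AtomicU64>);

impl SeekEpoch {
    /// Called on every seek; returns the new epoch.
    pub fn bump(&self) -> u64 {
        self.0.fetch_add(1, Ordering::AcqRel) + 1
    }

    pub fn current(&self) -> u64 {
        self.0.load(Ordering::Acquire)
    }
}

pub struct Tagged {
    pub epoch: u64,
    pub samples: Vec<f32>,
}

/// Worker side: stamp each decoded chunk with the epoch it was decoded under.
pub fn tag(epoch: &SeekEpoch, samples: Vec<f32>) -> Tagged {
    Tagged { epoch: epoch.current(), samples }
}

/// Consumer side: a chunk from an older epoch is stale and gets discarded.
pub fn accept(epoch: &SeekEpoch, chunk: &Tagged) -> bool {
    chunk.epoch == epoch.current()
}
```

Comparing a single integer keeps the check cheap enough for the real-time side and avoids having to flush the ring synchronously at seek time.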
Agent guardrails
- Node architecture: a track is represented by a single Node implementation (DecoderNode), stored in the shared scheduler as Box<dyn Node> through runtime/.
- Operators vs Nodes: audio effects are implemented as operators (AudioEffect) that are called directly within the track's Node. We do not use separate Nodes or ring buffers between effects.
- Buffers: if a backpressure boundary or rate matching is needed (e.g. between the worker and the audio callback), a separate buffer Node should be introduced explicitly.
- Push with backpressure: producer nodes call Outlet::try_push directly. The outlet has a built-in single-slot overflow that absorbs one backpressure miss per tick, so a producer that emits at most one chunk per tick can treat try_push as infallible. Each tick begins with Outlet::flush(), which forwards the parked item to the ring once the consumer has drained it; if the ring is still full, the node returns TickResult::Waiting. Under normal operation the source's FSM is ticked every pass. However, if the outlet is completely saturated (both the ring buffer and the overflow slot are full), the node returns Waiting immediately without ticking the FSM. This provides strict backpressure: all internal state transitions (including seeks) pause until the consumer drains the ring.
- Cancellation: do not use CancellationToken inside Node implementations. Cancellation is handled centrally by calling worker.unregister_track(...), which makes the scheduler call Node::on_cancel().
- Track lifecycle: a Node returns TickResult::Done only when truly terminal (e.g. TrackStep::Failed). EOF is not terminal: the track stays alive so a subsequent seek can re-arm it, and idle ticks just return TickResult::Waiting.
- kithara-audio owns decoder lifecycle, seek/session state, effects reset timing, and stale-chunk invalidation.
- Prefer explicit FSM or session objects for multi-step control flow. Avoid scattering new pending_* or shadow flags across the worker, source, and consumer layers.
- Audio should consume source contracts, not reconstruct HLS or file policy from protocol-specific heuristics.
Features
Integration
Sits between kithara-decode and the consumer (cpal via Firewheel inside kithara-play, or custom PCM readers). Depends on kithara-stream for Stream<T> and Source, kithara-bufpool for zero-allocation PCM buffers, kithara-decode for the decoder factory, kithara-events for the EventBus, and kithara-platform for cross-platform sync types.