// kithara-stream 0.0.1-alpha2
//
// Streaming source-to-bytes layer with sync `Read` + `Seek` for audio playback.
// (Crate documentation.)
use std::{
    sync::Arc,
    task::{Context, Poll},
};

use futures::future::join_all;
use kithara_abr::{Abr, AbrHandle, AbrPeerId};
use kithara_events::{EventBus, RequestPriority};
use kithara_net::NetError;
use kithara_platform::{
    CancelGroup, RwLock,
    tokio::sync::{mpsc, oneshot},
};
use tokio_util::sync::CancellationToken;

use super::{cmd::FetchCmd, downloader::DownloaderInner, response::FetchResponse};

/// Protocol-agnostic contract for download orchestration.
///
/// Protocols (HLS, File) implement this trait. The
/// [`Downloader`](super::Downloader) queries peers through this
/// interface without knowing domain specifics. Every `Peer` is also an
/// [`Abr`] participant (supertrait bound).
///
/// All methods have defaults so that simple peers (File) only need
/// to exist — the Downloader drives everything through `execute()`.
/// Complex peers (HLS) override `poll_next` to let the Downloader
/// drive media segment downloads via per-command `writer`/`on_complete`
/// closures in [`FetchCmd`].
pub trait Peer: Abr {
    /// Yield the next batch of commands for the Downloader to execute.
    ///
    /// Returns `Ready(Some(batch))` with one or more self-contained
    /// [`FetchCmd`]s. Each command carries its own `writer` and
    /// `on_complete` closures — the Downloader calls them directly.
    ///
    /// Returns `Ready(None)` when the peer has no more work (stream
    /// ended). Returns `Pending` when waiting (throttle, idle).
    ///
    /// The default implementation ignores the context and returns
    /// `Ready(None)` immediately, i.e. a peer that never produces
    /// streaming work.
    fn poll_next(&self, _cx: &mut Context<'_>) -> Poll<Option<Vec<FetchCmd>>> {
        Poll::Ready(None)
    }

    /// Peer-level priority. `High` = active playback track.
    ///
    /// Defaults to [`RequestPriority::Low`]; override for peers that
    /// should be served first.
    fn priority(&self) -> RequestPriority {
        RequestPriority::Low
    }
}

/// How the Downloader delivers the response for a command.
///
/// Stored in [`InternalCmd::response`]; the variant is chosen by whoever
/// builds the command.
pub(super) enum ResponseTarget {
    /// Imperative path: send via oneshot (`execute` / `batch`).
    ///
    /// The sender carries the final `Result<FetchResponse, NetError>`;
    /// the matching receiver is awaited by the `PeerHandle` caller.
    Channel(oneshot::Sender<Result<FetchResponse, NetError>>),
    /// Streaming path: per-command `writer`/`on_complete` in [`FetchCmd`].
    ///
    /// Carries no payload — delivery happens through the command itself.
    Streaming,
}

/// Pair of an [`InternalCmd`] and the **peer-level** cancel token used
/// to discriminate `CancelReason` later in `deliver`.
///
/// The peer-level token lives in [`PeerInner::cancel`] (and the
/// Registry's `PeerEntry::peer_cancel`). Carrying a clone alongside
/// the cmd through the slot queue lets the spawned fetch task report
/// `CancelReason::PeerCancel` without holding a reference back into
/// the Registry.
pub(super) struct SlotEntry {
    /// Clone of the owning peer's cancel token (not the per-command
    /// [`InternalCmd::cancel`] group).
    pub(super) peer_cancel: CancellationToken,
    /// The queued command itself.
    pub(super) cmd: InternalCmd,
}

/// Per-peer command sent through the channel to the downloader loop.
///
/// Wraps the caller-supplied [`FetchCmd`] with the bookkeeping the
/// Downloader needs: identity, cancellation, timing, event routing and
/// the response delivery target.
pub(super) struct InternalCmd {
    /// ABR peer identifier for bandwidth accounting after fetch completes.
    pub(super) peer_id: AbrPeerId,
    /// Cancellation group for this command. On the imperative path it is
    /// built from a child of the peer-level token (see
    /// `PeerHandle::make_imperative`).
    pub(super) cancel: CancelGroup,
    /// The fetch request as supplied by the peer.
    pub(super) cmd: FetchCmd,
    /// Wall-clock instant the cmd was placed into a priority slot.
    /// Used to compute `RequestStarted::wait_in_queue` later in the
    /// pipeline.
    pub(super) enqueued_at: kithara_platform::time::Instant,
    /// Bus of the peer that issued this command. Downloader publishes
    /// per-fetch `DownloaderEvent`s here.
    pub(super) bus: Option<EventBus>,
    /// Arena index of the owning peer. `None` when sent from `PeerHandle`
    /// (filled in by Registry on receipt).
    pub(super) peer: Option<thunderdome::Index>,
    /// Stable id allocated by the Downloader on enqueue. Carried in
    /// every `DownloaderEvent` for this fetch's lifecycle so subscribers
    /// can correlate Enqueued → Started → Completed/Failed/Cancelled.
    pub(super) request_id: kithara_events::RequestId,
    /// Scheduling priority. Imperative commands built by `PeerHandle`
    /// are always `High`.
    pub(super) priority: RequestPriority,
    /// Where the result goes: a oneshot channel (imperative path) or the
    /// command's own closures (streaming path).
    pub(super) response: ResponseTarget,
}

/// Shared per-peer state behind every [`PeerHandle`] clone.
///
/// Cancel fires when the last `PeerHandle` sharing this state is dropped
/// (this type's `Drop` impl cancels [`PeerInner::cancel`]).
struct PeerInner {
    /// ABR side of the double registration. Keeps the peer registered
    /// with the shared `AbrController` until the last `PeerHandle` drops
    /// (the handle's `Drop` calls `controller.unregister`).
    abr: AbrHandle,
    /// Keeps `DownloaderInner` (`HttpClient`, cancel, runtime) alive
    /// for this peer's lifetime.
    ///
    /// NOTE(review): despite the leading underscore this field is read —
    /// `PeerHandle::make_imperative` calls `next_request_id()` on it.
    _pool: Arc<DownloaderInner>,
    /// Shared with the Registry's `PeerEntry`. Writing through
    /// [`PeerHandle::with_bus`] immediately makes the new bus visible
    /// to both the handle's own imperative path and the Registry's
    /// proactive `poll_next` path.
    bus: Arc<RwLock<Option<EventBus>>>,
    /// Peer-level cancel token. Handed out by `PeerHandle::cancel`, used
    /// as the parent of per-command child tokens, and cancelled on drop.
    cancel: CancellationToken,
    /// Sending half of the command channel; every imperative
    /// [`InternalCmd`] is submitted through it.
    cmd_tx: mpsc::Sender<InternalCmd>,
}

impl Drop for PeerInner {
    /// Cancel the peer-level token when the shared state goes away.
    ///
    /// `PeerInner` lives behind an `Arc` in [`PeerHandle`], so this runs
    /// only once the final handle clone has been dropped.
    fn drop(&mut self) {
        // Borrow just the token; the remaining fields are dropped
        // normally after this method returns.
        let Self { cancel, .. } = self;
        cancel.cancel();
    }
}

/// Per-peer handle for submitting fetch commands and awaiting
/// responses.
///
/// Cheap to [`Clone`] (one Arc bump). When the last clone is dropped,
/// the peer-level cancel token fires, aborting all in-flight fetches
/// for this peer.
#[derive(Clone)]
pub struct PeerHandle {
    /// State shared by all clones of this handle; its `Drop` impl is what
    /// cancels the peer-level token.
    inner: Arc<PeerInner>,
}

impl std::fmt::Debug for PeerHandle {
    /// Render the handle opaquely as `PeerHandle { .. }`; none of the
    /// inner state is printed.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = formatter.debug_struct("PeerHandle");
        repr.finish_non_exhaustive()
    }
}

impl PeerHandle {
    pub(super) fn new(
        pool: Arc<DownloaderInner>,
        cancel: CancellationToken,
        cmd_tx: mpsc::Sender<InternalCmd>,
        bus: Arc<RwLock<Option<EventBus>>>,
        abr: AbrHandle,
    ) -> Self {
        Self {
            inner: Arc::new(PeerInner {
                cancel,
                cmd_tx,
                bus,
                abr,
                _pool: pool,
            }),
        }
    }

    /// ABR-side handle attached at registration.
    #[must_use]
    pub fn abr(&self) -> &AbrHandle {
        &self.inner.abr
    }

    /// Submit a batch of fetch commands and await all responses.
    ///
    /// Commands execute in parallel. Results are returned **in array
    /// order**, not completion order.
    ///
    /// # Errors
    /// Individual commands may fail independently. Each slot in the
    /// returned `Vec` contains its own `Result`.
    pub async fn batch(&self, cmds: Vec<FetchCmd>) -> Vec<Result<FetchResponse, NetError>> {
        let mut receivers: Vec<Option<oneshot::Receiver<Result<FetchResponse, NetError>>>> =
            Vec::with_capacity(cmds.len());
        let bus = self.bus();
        let peer_id = self.inner.abr.peer_id();

        for cmd in cmds {
            let (internal, resp_rx) = self.make_imperative(cmd, bus.clone(), peer_id);
            if self.inner.cmd_tx.send(internal).await.is_err() {
                receivers.push(None);
                continue;
            }
            receivers.push(Some(resp_rx));
        }

        join_all(receivers.into_iter().map(|rx| async move {
            match rx {
                Some(resp_rx) => resp_rx.await.unwrap_or(Err(NetError::Cancelled)),
                None => Err(NetError::Cancelled),
            }
        }))
        .await
    }

    /// Currently attached bus, if any.
    #[must_use]
    pub fn bus(&self) -> Option<EventBus> {
        self.inner.bus.lock_sync_read().clone()
    }

    /// Peer-level cancellation token.
    ///
    /// Cancelling this token aborts all in-flight fetches for this
    /// peer. The cancel also fires automatically when the last clone
    /// of this handle is dropped.
    #[must_use]
    pub fn cancel(&self) -> CancellationToken {
        self.inner.cancel.clone()
    }

    /// Submit a single fetch command and await the response.
    ///
    /// Always runs at `High` priority — imperative requests are
    /// latency-sensitive.
    ///
    /// # Errors
    /// Returns [`NetError::Cancelled`] when the peer cancel fires,
    /// the downloader shuts down, or the HTTP request itself fails.
    pub async fn execute(&self, cmd: FetchCmd) -> Result<FetchResponse, NetError> {
        let (internal, resp_rx) = self.make_imperative(cmd, self.bus(), self.inner.abr.peer_id());
        self.inner
            .cmd_tx
            .send(internal)
            .await
            .map_err(|_| NetError::Cancelled)?;
        resp_rx.await.map_err(|_| NetError::Cancelled)?
    }

    /// Build a High-priority imperative `InternalCmd` paired with its
    /// response receiver. Shared by [`Self::execute`] and [`Self::batch`].
    fn make_imperative(
        &self,
        cmd: FetchCmd,
        bus: Option<EventBus>,
        peer_id: AbrPeerId,
    ) -> (
        InternalCmd,
        oneshot::Receiver<Result<FetchResponse, NetError>>,
    ) {
        let cancel = CancelGroup::new(vec![self.inner.cancel.child_token()]);
        let (resp_tx, resp_rx) = oneshot::channel();
        let request_id = self.inner._pool.next_request_id();
        let enqueued_at = kithara_platform::time::Instant::now();
        let internal = InternalCmd {
            cmd,
            cancel,
            bus,
            peer_id,
            request_id,
            enqueued_at,
            priority: RequestPriority::High,
            response: ResponseTarget::Channel(resp_tx),
            peer: None,
        };
        (internal, resp_rx)
    }

    /// ABR peer identifier for this handle.
    #[must_use]
    pub fn peer_id(&self) -> AbrPeerId {
        self.inner.abr.peer_id()
    }

    /// Attach an event bus so the Downloader can publish per-peer
    /// [`DownloaderEvent`](kithara_events::DownloaderEvent)s and the ABR
    /// controller can publish [`AbrEvent`](kithara_events::AbrEvent)s to
    /// it. Returns `self` so the call chains naturally after
    /// [`Downloader::register`](super::Downloader::register).
    #[must_use]
    pub fn with_bus(self, bus: EventBus) -> Self {
        *self.inner.bus.lock_sync_write() = Some(bus.clone());
        let _ = self.inner.abr.clone().with_bus(bus);
        self
    }
}