Skip to main content

kithara_stream/dl/
peer.rs

1use std::{
2    sync::Arc,
3    task::{Context, Poll},
4};
5
6use futures::future::join_all;
7use kithara_abr::{Abr, AbrHandle, AbrPeerId};
8use kithara_events::{EventBus, RequestPriority};
9use kithara_net::NetError;
10use kithara_platform::{
11    CancelGroup, RwLock,
12    tokio::sync::{mpsc, oneshot},
13};
14use tokio_util::sync::CancellationToken;
15
16use super::{cmd::FetchCmd, downloader::DownloaderInner, response::FetchResponse};
17
18/// Protocol-agnostic contract for download orchestration.
19///
20/// Protocols (HLS, File) implement this trait. The
21/// [`Downloader`](super::Downloader) queries peers through this
22/// interface without knowing domain specifics.
23///
24/// All methods have defaults so that simple peers (File) only need
25/// to exist — the Downloader drives everything through `execute()`.
26/// Complex peers (HLS) override `poll_next` to let the Downloader
27/// drive media segment downloads via per-command `writer`/`on_complete`
28/// closures in [`FetchCmd`].
29pub trait Peer: Abr {
30    /// Yield the next batch of commands for the Downloader to execute.
31    ///
32    /// Returns `Ready(Some(batch))` with one or more self-contained
33    /// [`FetchCmd`]s. Each command carries its own `writer` and
34    /// `on_complete` closures — the Downloader calls them directly.
35    ///
36    /// Returns `Ready(None)` when the peer has no more work (stream
37    /// ended). Returns `Pending` when waiting (throttle, idle).
38    fn poll_next(&self, _cx: &mut Context<'_>) -> Poll<Option<Vec<FetchCmd>>> {
39        Poll::Ready(None)
40    }
41
42    /// Peer-level priority. `High` = active playback track.
43    fn priority(&self) -> RequestPriority {
44        RequestPriority::Low
45    }
46}
47
48/// How the Downloader delivers the response for a command.
49pub(super) enum ResponseTarget {
50    /// Imperative path: send via oneshot (`execute` / `batch`).
51    Channel(oneshot::Sender<Result<FetchResponse, NetError>>),
52    /// Streaming path: per-command `writer`/`on_complete` in [`FetchCmd`].
53    Streaming,
54}
55
56/// Pair of an [`InternalCmd`] and the **peer-level** cancel token used
57/// to discriminate `CancelReason` later in `deliver`.
58///
59/// The peer-level token lives in [`PeerInner::cancel`] (and the
60/// Registry's `PeerEntry::peer_cancel`). Carrying a clone alongside
61/// the cmd through the slot queue lets the spawned fetch task report
62/// `CancelReason::PeerCancel` without holding a reference back into
63/// the Registry.
64pub(super) struct SlotEntry {
65    pub(super) peer_cancel: CancellationToken,
66    pub(super) cmd: InternalCmd,
67}
68
69/// Per-peer command sent through the channel to the downloader loop.
70pub(super) struct InternalCmd {
71    /// ABR peer identifier for bandwidth accounting after fetch completes.
72    pub(super) peer_id: AbrPeerId,
73    pub(super) cancel: CancelGroup,
74    pub(super) cmd: FetchCmd,
75    /// Wall-clock instant the cmd was placed into a priority slot.
76    /// Used to compute `RequestStarted::wait_in_queue` later in the
77    /// pipeline.
78    pub(super) enqueued_at: kithara_platform::time::Instant,
79    /// Bus of the peer that issued this command. Downloader publishes
80    /// per-fetch `DownloaderEvent`s here.
81    pub(super) bus: Option<EventBus>,
82    /// Arena index of the owning peer. `None` when sent from `PeerHandle`
83    /// (filled in by Registry on receipt).
84    pub(super) peer: Option<thunderdome::Index>,
85    /// Stable id allocated by the Downloader on enqueue. Carried in
86    /// every `DownloaderEvent` for this fetch's lifecycle so subscribers
87    /// can correlate Enqueued → Started → Completed/Failed/Cancelled.
88    pub(super) request_id: kithara_events::RequestId,
89    pub(super) priority: RequestPriority,
90    pub(super) response: ResponseTarget,
91}
92
93/// Shared per-peer state. Cancel fires when the last clone is dropped.
94struct PeerInner {
95    /// ABR side of the double registration. Keeps the peer registered
96    /// with the shared `AbrController` until the last `PeerHandle` drops
97    /// (the handle's `Drop` calls `controller.unregister`).
98    abr: AbrHandle,
99    /// Keeps `DownloaderInner` (`HttpClient`, cancel, runtime) alive
100    /// for this peer's lifetime.
101    _pool: Arc<DownloaderInner>,
102    /// Shared with the Registry's `PeerEntry`. Writing through
103    /// [`PeerHandle::with_bus`] immediately makes the new bus visible
104    /// to both the handle's own imperative path and the Registry's
105    /// proactive `poll_next` path.
106    bus: Arc<RwLock<Option<EventBus>>>,
107    cancel: CancellationToken,
108    cmd_tx: mpsc::Sender<InternalCmd>,
109}
110
111impl Drop for PeerInner {
112    fn drop(&mut self) {
113        self.cancel.cancel();
114    }
115}
116
117/// Per-peer handle for submitting fetch commands and awaiting
118/// responses.
119///
120/// Cheap to [`Clone`] (one Arc bump). When the last clone is dropped,
121/// the peer-level cancel token fires, aborting all in-flight fetches
122/// for this peer.
123#[derive(Clone)]
124pub struct PeerHandle {
125    inner: Arc<PeerInner>,
126}
127
128impl std::fmt::Debug for PeerHandle {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        f.debug_struct("PeerHandle").finish_non_exhaustive()
131    }
132}
133
134impl PeerHandle {
135    pub(super) fn new(
136        pool: Arc<DownloaderInner>,
137        cancel: CancellationToken,
138        cmd_tx: mpsc::Sender<InternalCmd>,
139        bus: Arc<RwLock<Option<EventBus>>>,
140        abr: AbrHandle,
141    ) -> Self {
142        Self {
143            inner: Arc::new(PeerInner {
144                cancel,
145                cmd_tx,
146                bus,
147                abr,
148                _pool: pool,
149            }),
150        }
151    }
152
153    /// ABR-side handle attached at registration.
154    #[must_use]
155    pub fn abr(&self) -> &AbrHandle {
156        &self.inner.abr
157    }
158
159    /// Submit a batch of fetch commands and await all responses.
160    ///
161    /// Commands execute in parallel. Results are returned **in array
162    /// order**, not completion order.
163    ///
164    /// # Errors
165    /// Individual commands may fail independently. Each slot in the
166    /// returned `Vec` contains its own `Result`.
167    pub async fn batch(&self, cmds: Vec<FetchCmd>) -> Vec<Result<FetchResponse, NetError>> {
168        let mut receivers: Vec<Option<oneshot::Receiver<Result<FetchResponse, NetError>>>> =
169            Vec::with_capacity(cmds.len());
170        let bus = self.bus();
171        let peer_id = self.inner.abr.peer_id();
172
173        for cmd in cmds {
174            let (internal, resp_rx) = self.make_imperative(cmd, bus.clone(), peer_id);
175            if self.inner.cmd_tx.send(internal).await.is_err() {
176                receivers.push(None);
177                continue;
178            }
179            receivers.push(Some(resp_rx));
180        }
181
182        join_all(receivers.into_iter().map(|rx| async move {
183            match rx {
184                Some(resp_rx) => resp_rx.await.unwrap_or(Err(NetError::Cancelled)),
185                None => Err(NetError::Cancelled),
186            }
187        }))
188        .await
189    }
190
191    /// Currently attached bus, if any.
192    #[must_use]
193    pub fn bus(&self) -> Option<EventBus> {
194        self.inner.bus.lock_sync_read().clone()
195    }
196
197    /// Peer-level cancellation token.
198    ///
199    /// Cancelling this token aborts all in-flight fetches for this
200    /// peer. The cancel also fires automatically when the last clone
201    /// of this handle is dropped.
202    #[must_use]
203    pub fn cancel(&self) -> CancellationToken {
204        self.inner.cancel.clone()
205    }
206
207    /// Submit a single fetch command and await the response.
208    ///
209    /// Always runs at `High` priority — imperative requests are
210    /// latency-sensitive.
211    ///
212    /// # Errors
213    /// Returns [`NetError::Cancelled`] when the peer cancel fires,
214    /// the downloader shuts down, or the HTTP request itself fails.
215    pub async fn execute(&self, cmd: FetchCmd) -> Result<FetchResponse, NetError> {
216        let (internal, resp_rx) = self.make_imperative(cmd, self.bus(), self.inner.abr.peer_id());
217        self.inner
218            .cmd_tx
219            .send(internal)
220            .await
221            .map_err(|_| NetError::Cancelled)?;
222        resp_rx.await.map_err(|_| NetError::Cancelled)?
223    }
224
225    /// Build a High-priority imperative `InternalCmd` paired with its
226    /// response receiver. Shared by [`Self::execute`] and [`Self::batch`].
227    fn make_imperative(
228        &self,
229        cmd: FetchCmd,
230        bus: Option<EventBus>,
231        peer_id: AbrPeerId,
232    ) -> (
233        InternalCmd,
234        oneshot::Receiver<Result<FetchResponse, NetError>>,
235    ) {
236        let cancel = CancelGroup::new(vec![self.inner.cancel.child_token()]);
237        let (resp_tx, resp_rx) = oneshot::channel();
238        let request_id = self.inner._pool.next_request_id();
239        let enqueued_at = kithara_platform::time::Instant::now();
240        let internal = InternalCmd {
241            cmd,
242            cancel,
243            bus,
244            peer_id,
245            request_id,
246            enqueued_at,
247            priority: RequestPriority::High,
248            response: ResponseTarget::Channel(resp_tx),
249            peer: None,
250        };
251        (internal, resp_rx)
252    }
253
254    /// ABR peer identifier for this handle.
255    #[must_use]
256    pub fn peer_id(&self) -> AbrPeerId {
257        self.inner.abr.peer_id()
258    }
259
260    /// Attach an event bus so the Downloader can publish per-peer
261    /// [`DownloaderEvent`](kithara_events::DownloaderEvent)s and the ABR
262    /// controller can publish [`AbrEvent`](kithara_events::AbrEvent)s to
263    /// it. Returns `self` so the call chains naturally after
264    /// [`Downloader::register`](super::Downloader::register).
265    #[must_use]
266    pub fn with_bus(self, bus: EventBus) -> Self {
267        *self.inner.bus.lock_sync_write() = Some(bus.clone());
268        let _ = self.inner.abr.clone().with_bus(bus);
269        self
270    }
271}