kithara_stream/dl/peer.rs
1use std::{
2 sync::Arc,
3 task::{Context, Poll},
4};
5
6use futures::future::join_all;
7use kithara_abr::{Abr, AbrHandle, AbrPeerId};
8use kithara_events::{EventBus, RequestPriority};
9use kithara_net::NetError;
10use kithara_platform::{
11 CancelGroup, RwLock,
12 tokio::sync::{mpsc, oneshot},
13};
14use tokio_util::sync::CancellationToken;
15
16use super::{cmd::FetchCmd, downloader::DownloaderInner, response::FetchResponse};
17
18/// Protocol-agnostic contract for download orchestration.
19///
20/// Protocols (HLS, File) implement this trait. The
21/// [`Downloader`](super::Downloader) queries peers through this
22/// interface without knowing domain specifics.
23///
24/// All methods have defaults so that simple peers (File) only need
25/// to exist — the Downloader drives everything through `execute()`.
26/// Complex peers (HLS) override `poll_next` to let the Downloader
27/// drive media segment downloads via per-command `writer`/`on_complete`
28/// closures in [`FetchCmd`].
29pub trait Peer: Abr {
30 /// Yield the next batch of commands for the Downloader to execute.
31 ///
32 /// Returns `Ready(Some(batch))` with one or more self-contained
33 /// [`FetchCmd`]s. Each command carries its own `writer` and
34 /// `on_complete` closures — the Downloader calls them directly.
35 ///
36 /// Returns `Ready(None)` when the peer has no more work (stream
37 /// ended). Returns `Pending` when waiting (throttle, idle).
38 fn poll_next(&self, _cx: &mut Context<'_>) -> Poll<Option<Vec<FetchCmd>>> {
39 Poll::Ready(None)
40 }
41
42 /// Peer-level priority. `High` = active playback track.
43 fn priority(&self) -> RequestPriority {
44 RequestPriority::Low
45 }
46}
47
48/// How the Downloader delivers the response for a command.
49pub(super) enum ResponseTarget {
50 /// Imperative path: send via oneshot (`execute` / `batch`).
51 Channel(oneshot::Sender<Result<FetchResponse, NetError>>),
52 /// Streaming path: per-command `writer`/`on_complete` in [`FetchCmd`].
53 Streaming,
54}
55
56/// Pair of an [`InternalCmd`] and the **peer-level** cancel token used
57/// to discriminate `CancelReason` later in `deliver`.
58///
59/// The peer-level token lives in [`PeerInner::cancel`] (and the
60/// Registry's `PeerEntry::peer_cancel`). Carrying a clone alongside
61/// the cmd through the slot queue lets the spawned fetch task report
62/// `CancelReason::PeerCancel` without holding a reference back into
63/// the Registry.
64pub(super) struct SlotEntry {
65 pub(super) peer_cancel: CancellationToken,
66 pub(super) cmd: InternalCmd,
67}
68
69/// Per-peer command sent through the channel to the downloader loop.
70pub(super) struct InternalCmd {
71 /// ABR peer identifier for bandwidth accounting after fetch completes.
72 pub(super) peer_id: AbrPeerId,
73 pub(super) cancel: CancelGroup,
74 pub(super) cmd: FetchCmd,
75 /// Wall-clock instant the cmd was placed into a priority slot.
76 /// Used to compute `RequestStarted::wait_in_queue` later in the
77 /// pipeline.
78 pub(super) enqueued_at: kithara_platform::time::Instant,
79 /// Bus of the peer that issued this command. Downloader publishes
80 /// per-fetch `DownloaderEvent`s here.
81 pub(super) bus: Option<EventBus>,
82 /// Arena index of the owning peer. `None` when sent from `PeerHandle`
83 /// (filled in by Registry on receipt).
84 pub(super) peer: Option<thunderdome::Index>,
85 /// Stable id allocated by the Downloader on enqueue. Carried in
86 /// every `DownloaderEvent` for this fetch's lifecycle so subscribers
87 /// can correlate Enqueued → Started → Completed/Failed/Cancelled.
88 pub(super) request_id: kithara_events::RequestId,
89 pub(super) priority: RequestPriority,
90 pub(super) response: ResponseTarget,
91}
92
93/// Shared per-peer state. Cancel fires when the last clone is dropped.
94struct PeerInner {
95 /// ABR side of the double registration. Keeps the peer registered
96 /// with the shared `AbrController` until the last `PeerHandle` drops
97 /// (the handle's `Drop` calls `controller.unregister`).
98 abr: AbrHandle,
99 /// Keeps `DownloaderInner` (`HttpClient`, cancel, runtime) alive
100 /// for this peer's lifetime.
101 _pool: Arc<DownloaderInner>,
102 /// Shared with the Registry's `PeerEntry`. Writing through
103 /// [`PeerHandle::with_bus`] immediately makes the new bus visible
104 /// to both the handle's own imperative path and the Registry's
105 /// proactive `poll_next` path.
106 bus: Arc<RwLock<Option<EventBus>>>,
107 cancel: CancellationToken,
108 cmd_tx: mpsc::Sender<InternalCmd>,
109}
110
111impl Drop for PeerInner {
112 fn drop(&mut self) {
113 self.cancel.cancel();
114 }
115}
116
117/// Per-peer handle for submitting fetch commands and awaiting
118/// responses.
119///
120/// Cheap to [`Clone`] (one Arc bump). When the last clone is dropped,
121/// the peer-level cancel token fires, aborting all in-flight fetches
122/// for this peer.
123#[derive(Clone)]
124pub struct PeerHandle {
125 inner: Arc<PeerInner>,
126}
127
128impl std::fmt::Debug for PeerHandle {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 f.debug_struct("PeerHandle").finish_non_exhaustive()
131 }
132}
133
134impl PeerHandle {
135 pub(super) fn new(
136 pool: Arc<DownloaderInner>,
137 cancel: CancellationToken,
138 cmd_tx: mpsc::Sender<InternalCmd>,
139 bus: Arc<RwLock<Option<EventBus>>>,
140 abr: AbrHandle,
141 ) -> Self {
142 Self {
143 inner: Arc::new(PeerInner {
144 cancel,
145 cmd_tx,
146 bus,
147 abr,
148 _pool: pool,
149 }),
150 }
151 }
152
153 /// ABR-side handle attached at registration.
154 #[must_use]
155 pub fn abr(&self) -> &AbrHandle {
156 &self.inner.abr
157 }
158
159 /// Submit a batch of fetch commands and await all responses.
160 ///
161 /// Commands execute in parallel. Results are returned **in array
162 /// order**, not completion order.
163 ///
164 /// # Errors
165 /// Individual commands may fail independently. Each slot in the
166 /// returned `Vec` contains its own `Result`.
167 pub async fn batch(&self, cmds: Vec<FetchCmd>) -> Vec<Result<FetchResponse, NetError>> {
168 let mut receivers: Vec<Option<oneshot::Receiver<Result<FetchResponse, NetError>>>> =
169 Vec::with_capacity(cmds.len());
170 let bus = self.bus();
171 let peer_id = self.inner.abr.peer_id();
172
173 for cmd in cmds {
174 let (internal, resp_rx) = self.make_imperative(cmd, bus.clone(), peer_id);
175 if self.inner.cmd_tx.send(internal).await.is_err() {
176 receivers.push(None);
177 continue;
178 }
179 receivers.push(Some(resp_rx));
180 }
181
182 join_all(receivers.into_iter().map(|rx| async move {
183 match rx {
184 Some(resp_rx) => resp_rx.await.unwrap_or(Err(NetError::Cancelled)),
185 None => Err(NetError::Cancelled),
186 }
187 }))
188 .await
189 }
190
191 /// Currently attached bus, if any.
192 #[must_use]
193 pub fn bus(&self) -> Option<EventBus> {
194 self.inner.bus.lock_sync_read().clone()
195 }
196
197 /// Peer-level cancellation token.
198 ///
199 /// Cancelling this token aborts all in-flight fetches for this
200 /// peer. The cancel also fires automatically when the last clone
201 /// of this handle is dropped.
202 #[must_use]
203 pub fn cancel(&self) -> CancellationToken {
204 self.inner.cancel.clone()
205 }
206
207 /// Submit a single fetch command and await the response.
208 ///
209 /// Always runs at `High` priority — imperative requests are
210 /// latency-sensitive.
211 ///
212 /// # Errors
213 /// Returns [`NetError::Cancelled`] when the peer cancel fires,
214 /// the downloader shuts down, or the HTTP request itself fails.
215 pub async fn execute(&self, cmd: FetchCmd) -> Result<FetchResponse, NetError> {
216 let (internal, resp_rx) = self.make_imperative(cmd, self.bus(), self.inner.abr.peer_id());
217 self.inner
218 .cmd_tx
219 .send(internal)
220 .await
221 .map_err(|_| NetError::Cancelled)?;
222 resp_rx.await.map_err(|_| NetError::Cancelled)?
223 }
224
225 /// Build a High-priority imperative `InternalCmd` paired with its
226 /// response receiver. Shared by [`Self::execute`] and [`Self::batch`].
227 fn make_imperative(
228 &self,
229 cmd: FetchCmd,
230 bus: Option<EventBus>,
231 peer_id: AbrPeerId,
232 ) -> (
233 InternalCmd,
234 oneshot::Receiver<Result<FetchResponse, NetError>>,
235 ) {
236 let cancel = CancelGroup::new(vec![self.inner.cancel.child_token()]);
237 let (resp_tx, resp_rx) = oneshot::channel();
238 let request_id = self.inner._pool.next_request_id();
239 let enqueued_at = kithara_platform::time::Instant::now();
240 let internal = InternalCmd {
241 cmd,
242 cancel,
243 bus,
244 peer_id,
245 request_id,
246 enqueued_at,
247 priority: RequestPriority::High,
248 response: ResponseTarget::Channel(resp_tx),
249 peer: None,
250 };
251 (internal, resp_rx)
252 }
253
254 /// ABR peer identifier for this handle.
255 #[must_use]
256 pub fn peer_id(&self) -> AbrPeerId {
257 self.inner.abr.peer_id()
258 }
259
260 /// Attach an event bus so the Downloader can publish per-peer
261 /// [`DownloaderEvent`](kithara_events::DownloaderEvent)s and the ABR
262 /// controller can publish [`AbrEvent`](kithara_events::AbrEvent)s to
263 /// it. Returns `self` so the call chains naturally after
264 /// [`Downloader::register`](super::Downloader::register).
265 #[must_use]
266 pub fn with_bus(self, bus: EventBus) -> Self {
267 *self.inner.bus.lock_sync_write() = Some(bus.clone());
268 let _ = self.inner.abr.clone().with_bus(bus);
269 self
270 }
271}