kithara_events/downloader.rs
1#![forbid(unsafe_code)]
2
3use std::num::NonZeroU64;
4
5use kithara_net::NetError;
6use kithara_platform::time::Duration;
7use url::Url;
8
/// Identifier that stays the same for the whole life of one Downloader
/// request.
///
/// The Downloader's `Registry` allocates it internally at the moment a
/// `FetchCmd` is wrapped into an `InternalCmd`; every
/// [`DownloaderEvent`] belonging to that logical fetch repeats it.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct RequestId(NonZeroU64);

impl RequestId {
    /// Wraps a non-zero `u64` as a request id.
    ///
    /// The value should come from a monotonic source, for example an
    /// `AtomicU64` that starts counting at 1.
    #[must_use]
    pub const fn new(id: NonZeroU64) -> Self {
        Self(id)
    }

    /// Returns the wrapped value as a plain `u64`, intended for logging.
    #[must_use]
    pub const fn get(self) -> u64 {
        // Destructure the newtype rather than reaching into `.0`.
        let Self(raw) = self;
        raw.get()
    }
}
31
/// HTTP verb used by a Downloader request.
///
/// Defined in `kithara-events` rather than `kithara-stream`: the command
/// type and the lifecycle events both refer to it, and placing it beside
/// the events avoids the dependency cycle.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub enum RequestMethod {
    /// HTTP GET with a streaming body. This is the default; it is used
    /// for large downloads (segments, files) that write directly to
    /// storage.
    #[default]
    Get,
    /// HTTP HEAD — headers only. Used for metadata queries
    /// (`Content-Length`).
    Head,
}
47
/// Scheduling priority that is effectively applied to a request.
///
/// The Downloader keeps a 2×2 slot map (peer priority × cmd priority) in
/// which `High` commands and peers are processed before `Low` ones.
///
/// The derived ordering follows the discriminants, so
/// `RequestPriority::High < RequestPriority::Low`.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum RequestPriority {
    /// Latency-sensitive work: demand segments, `execute`/`batch` calls,
    /// seek.
    High = 0,
    /// Background work: prefetch, idle downloads. This is the default.
    #[default]
    Low = 1,
}
61
/// Why a fetch was cancelled.
///
/// Distinguishes the cancel paths so subscribers can tell e.g. a
/// seek-driven epoch flush from a peer drop or a downloader-wide
/// shutdown.
///
/// Derives `Hash` (like the other fieldless enums in this module) so a
/// reason can key hash-based collections, e.g. per-reason counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CancelReason {
    /// The protocol's epoch cancel token fired (e.g. HLS bumped
    /// `seek_epoch`, invalidating in-flight fetches of the prior
    /// epoch).
    EpochCancel,
    /// The peer's own cancel token fired — the last `PeerHandle` clone
    /// was dropped, the protocol is shutting down its track.
    PeerCancel,
    /// Downloader-wide shutdown (the `Downloader` cancel token fired).
    DownloaderShutdown,
    /// The request's `CancelGroup` was already cancelled when the
    /// Downloader tried to spawn the fetch — the fetch never started.
    BeforeStart,
}
82
83/// Events emitted by the unified downloader layer.
84///
85/// Published on the **peer's bus scope**, set via
86/// `PeerHandle::with_bus`. A per-track subscriber sees only its own
87/// fetches; a root-bus subscriber sees fetches from every peer.
88///
89/// Every variant for a single fetch carries the same [`RequestId`].
90#[derive(Debug, Clone)]
91#[non_exhaustive]
92pub enum DownloaderEvent {
93 /// Request was accepted by the Downloader and placed into a
94 /// priority slot. Published exactly once when `Registry::poll_peers`
95 /// pushes the wrapped command into `slots[idx]`. Carries everything
96 /// a subscriber needs to build a `request_id → meaning` table.
97 RequestEnqueued {
98 request_id: RequestId,
99 url: Url,
100 method: RequestMethod,
101 priority: RequestPriority,
102 },
103 /// HTTP fetch started — slot acquired, task spawned. Between
104 /// [`RequestEnqueued`](Self::RequestEnqueued) and this event there
105 /// can be an arbitrary delay bounded by `max_concurrent` (slot
106 /// pressure indicator: `wait_in_queue`).
107 RequestStarted {
108 request_id: RequestId,
109 /// Time from `RequestEnqueued` to here.
110 wait_in_queue: Duration,
111 },
112 /// `DownloaderConfig::soft_timeout` elapsed without the fetch
113 /// completing. Informational; the request keeps running.
114 LoadSlow {
115 request_id: RequestId,
116 elapsed: Duration,
117 },
118 /// HTTP body finished successfully.
119 RequestCompleted {
120 request_id: RequestId,
121 bytes_transferred: u64,
122 /// Total wall time from `RequestStarted` to here.
123 duration: Duration,
124 /// Pre-computed (`bytes / duration` → bps) so subscribers
125 /// don't repeat the math.
126 bandwidth_bps: u64,
127 },
128 /// HTTP fetch ended with a network-level error.
129 RequestFailed {
130 request_id: RequestId,
131 error: NetError,
132 /// `error.is_retryable()` — pre-evaluated.
133 retryable: bool,
134 },
135 /// HTTP fetch was cancelled before completion.
136 RequestCancelled {
137 request_id: RequestId,
138 reason: CancelReason,
139 /// Bytes received before the cancel fired (if any).
140 bytes_transferred: u64,
141 },
142 /// Effective priority of an in-queue (not-yet-started) request
143 /// changed. Reserved shape — the Downloader does not emit this
144 /// today (priority is immutable post-enqueue). Will be emitted
145 /// when the scheduler learns to demote prefetch on demand arrival.
146 PriorityChanged {
147 request_id: RequestId,
148 from: RequestPriority,
149 to: RequestPriority,
150 },
151}