Skip to main content

kithara_events/
downloader.rs

1#![forbid(unsafe_code)]
2
3use std::num::NonZeroU64;
4
5use kithara_net::NetError;
6use kithara_platform::time::Duration;
7use url::Url;
8
9/// Stable id for a single Downloader request.
10///
11/// Allocated internally by the Downloader's `Registry` when wrapping a
12/// `FetchCmd` into an `InternalCmd`. Echoed in every
13/// [`DownloaderEvent`] for the same logical fetch.
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
15pub struct RequestId(NonZeroU64);
16
17impl RequestId {
18    /// Construct from a non-zero `u64`. Use a monotonic source (e.g.
19    /// an `AtomicU64` started at 1).
20    #[must_use]
21    pub const fn new(id: NonZeroU64) -> Self {
22        Self(id)
23    }
24
25    /// Get the inner `u64` for logging.
26    #[must_use]
27    pub const fn get(self) -> u64 {
28        self.0.get()
29    }
30}
31
32/// HTTP method of a Downloader request.
33///
34/// Lives in `kithara-events` (not `kithara-stream`) because both the
35/// command type and the lifecycle events refer to it; keeping it next
36/// to the events avoids the dependency cycle.
37#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
38pub enum RequestMethod {
39    /// HTTP GET, streaming body. Default — used for large downloads
40    /// (segments, files) that write directly to storage.
41    #[default]
42    Get,
43    /// HTTP HEAD, headers only. Used for metadata queries
44    /// (`Content-Length`).
45    Head,
46}
47
48/// Effective scheduling priority of a request.
49///
50/// Used in the Downloader's 2×2 slot map (peer priority × cmd
51/// priority): `High` commands and peers are processed before `Low`.
52#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
53pub enum RequestPriority {
54    /// Latency-sensitive: demand segments, `execute`/`batch` calls,
55    /// seek.
56    High = 0,
57    /// Background: prefetch, idle downloads. Default.
58    #[default]
59    Low = 1,
60}
61
62/// Why a fetch was cancelled.
63///
64/// Distinguishes the cancel paths so subscribers can tell e.g. a
65/// seek-driven epoch flush from a peer drop or a downloader-wide
66/// shutdown.
67#[derive(Debug, Clone, Copy, PartialEq, Eq)]
68pub enum CancelReason {
69    /// The protocol's epoch cancel token fired (e.g. HLS bumped
70    /// `seek_epoch`, invalidating in-flight fetches of the prior
71    /// epoch).
72    EpochCancel,
73    /// The peer's own cancel token fired — the last `PeerHandle` clone
74    /// was dropped, the protocol is shutting down its track.
75    PeerCancel,
76    /// Downloader-wide shutdown (the `Downloader` cancel token fired).
77    DownloaderShutdown,
78    /// The request's `CancelGroup` was already cancelled when the
79    /// Downloader tried to spawn the fetch — the fetch never started.
80    BeforeStart,
81}
82
83/// Events emitted by the unified downloader layer.
84///
85/// Published on the **peer's bus scope**, set via
86/// `PeerHandle::with_bus`. A per-track subscriber sees only its own
87/// fetches; a root-bus subscriber sees fetches from every peer.
88///
89/// Every variant for a single fetch carries the same [`RequestId`].
90#[derive(Debug, Clone)]
91#[non_exhaustive]
92pub enum DownloaderEvent {
93    /// Request was accepted by the Downloader and placed into a
94    /// priority slot. Published exactly once when `Registry::poll_peers`
95    /// pushes the wrapped command into `slots[idx]`. Carries everything
96    /// a subscriber needs to build a `request_id → meaning` table.
97    RequestEnqueued {
98        request_id: RequestId,
99        url: Url,
100        method: RequestMethod,
101        priority: RequestPriority,
102    },
103    /// HTTP fetch started — slot acquired, task spawned. Between
104    /// [`RequestEnqueued`](Self::RequestEnqueued) and this event there
105    /// can be an arbitrary delay bounded by `max_concurrent` (slot
106    /// pressure indicator: `wait_in_queue`).
107    RequestStarted {
108        request_id: RequestId,
109        /// Time from `RequestEnqueued` to here.
110        wait_in_queue: Duration,
111    },
112    /// `DownloaderConfig::soft_timeout` elapsed without the fetch
113    /// completing. Informational; the request keeps running.
114    LoadSlow {
115        request_id: RequestId,
116        elapsed: Duration,
117    },
118    /// HTTP body finished successfully.
119    RequestCompleted {
120        request_id: RequestId,
121        bytes_transferred: u64,
122        /// Total wall time from `RequestStarted` to here.
123        duration: Duration,
124        /// Pre-computed (`bytes / duration` → bps) so subscribers
125        /// don't repeat the math.
126        bandwidth_bps: u64,
127    },
128    /// HTTP fetch ended with a network-level error.
129    RequestFailed {
130        request_id: RequestId,
131        error: NetError,
132        /// `error.is_retryable()` — pre-evaluated.
133        retryable: bool,
134    },
135    /// HTTP fetch was cancelled before completion.
136    RequestCancelled {
137        request_id: RequestId,
138        reason: CancelReason,
139        /// Bytes received before the cancel fired (if any).
140        bytes_transferred: u64,
141    },
142    /// Effective priority of an in-queue (not-yet-started) request
143    /// changed. Reserved shape — the Downloader does not emit this
144    /// today (priority is immutable post-enqueue). Will be emitted
145    /// when the scheduler learns to demote prefetch on demand arrival.
146    PriorityChanged {
147        request_id: RequestId,
148        from: RequestPriority,
149        to: RequestPriority,
150    },
151}