Skip to main content

kithara_net/
traits.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4    time::Duration,
5};
6
7use async_trait::async_trait;
8use bytes::Bytes;
9use futures::Stream;
10use kithara_platform::{MaybeSend, MaybeSync};
11use tokio_util::sync::CancellationToken;
12use url::Url;
13
14mod kithara {
15    pub(crate) use kithara_test_macros::mock;
16}
17
18use crate::{
19    error::NetError,
20    retry::{DefaultRetryPolicy, RetryNet},
21    timeout::TimeoutNet,
22    types::{Headers, RangeSpec, RetryPolicy},
23};
24
25/// Inner stream type used inside [`ByteStream`].
26///
27/// On native: requires `Send` (multi-threaded tokio runtime).
28/// On wasm32: no `Send` bound (JsValue-based streams are `!Send`).
29#[cfg(not(target_arch = "wasm32"))]
30type RawByteStream = Pin<Box<dyn Stream<Item = Result<Bytes, NetError>> + Send>>;
31#[cfg(target_arch = "wasm32")]
32type RawByteStream = Pin<Box<dyn Stream<Item = Result<Bytes, NetError>>>>;
33
34/// HTTP byte stream with response headers.
35///
36/// Wraps a streaming body together with the HTTP response headers so that
37/// callers can inspect metadata (e.g. `Content-Length`) without an extra
38/// HEAD request.
39///
40/// Implements [`Stream`] by delegating to the inner body, so it can be
41/// passed directly to any consumer that expects a byte stream.
42pub struct ByteStream {
43    /// Response headers from the HTTP request that produced this stream.
44    pub headers: Headers,
45    inner: RawByteStream,
46}
47
48impl ByteStream {
49    /// Create a new `ByteStream` from response headers and a raw body stream.
50    #[must_use]
51    pub fn new(headers: Headers, inner: RawByteStream) -> Self {
52        Self { headers, inner }
53    }
54
55    /// Consume the wrapper, returning just the raw byte stream.
56    #[must_use]
57    pub fn into_inner(self) -> RawByteStream {
58        self.inner
59    }
60}
61
62impl Stream for ByteStream {
63    type Item = Result<Bytes, NetError>;
64
65    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66        self.get_mut().inner.as_mut().poll_next(cx)
67    }
68}
69
70/// HTTP networking trait.
71///
72/// Single definition for both native and wasm32 targets.
73/// On native: `MaybeSend` = `Send`, `MaybeSync` = `Sync`, futures are `Send`.
74/// On wasm32: `MaybeSend`/`MaybeSync` are blanket-implemented (no-op), futures are `!Send`.
75#[cfg_attr(not(target_arch = "wasm32"), kithara::mock(api = NetMock))]
76#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
77#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
78pub trait Net: MaybeSend + MaybeSync {
79    /// Get all bytes from a URL
80    async fn get_bytes(&self, url: Url, headers: Option<Headers>) -> Result<Bytes, NetError>;
81
82    /// Get a range of bytes from a URL
83    async fn get_range(
84        &self,
85        url: Url,
86        range: RangeSpec,
87        headers: Option<Headers>,
88    ) -> Result<ByteStream, NetError>;
89
90    /// Perform a HEAD request.
91    ///
92    /// This is intended for lightweight metadata probes (e.g. `Content-Length`,
93    /// `Accept-Ranges`, `Content-Type`). Implementations should return response headers.
94    async fn head(&self, url: Url, headers: Option<Headers>) -> Result<Headers, NetError>;
95
96    /// Stream bytes from a URL
97    async fn stream(&self, url: Url, headers: Option<Headers>) -> Result<ByteStream, NetError>;
98}
99
100pub trait NetExt: Net + Sized {
101    /// Add retry layer
102    fn with_retry(
103        self,
104        policy: RetryPolicy,
105        cancel: CancellationToken,
106    ) -> RetryNet<Self, DefaultRetryPolicy> {
107        RetryNet::new(self, DefaultRetryPolicy::new(policy), cancel)
108    }
109
110    /// Add timeout layer
111    fn with_timeout(self, timeout: Duration) -> TimeoutNet<Self> {
112        TimeoutNet::new(self, timeout)
113    }
114}
115
116impl<T: Net> NetExt for T {}