1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4 time::Duration,
5};
6
7use async_trait::async_trait;
8use bytes::Bytes;
9use futures::Stream;
10use kithara_platform::{MaybeSend, MaybeSync};
11use tokio_util::sync::CancellationToken;
12use url::Url;
13
14mod kithara {
15 pub(crate) use kithara_test_macros::mock;
16}
17
18use crate::{
19 error::NetError,
20 retry::{DefaultRetryPolicy, RetryNet},
21 timeout::TimeoutNet,
22 types::{Headers, RangeSpec, RetryPolicy},
23};
24
25#[cfg(not(target_arch = "wasm32"))]
30type RawByteStream = Pin<Box<dyn Stream<Item = Result<Bytes, NetError>> + Send>>;
31#[cfg(target_arch = "wasm32")]
32type RawByteStream = Pin<Box<dyn Stream<Item = Result<Bytes, NetError>>>>;
33
34pub struct ByteStream {
43 pub headers: Headers,
45 inner: RawByteStream,
46}
47
48impl ByteStream {
49 #[must_use]
51 pub fn new(headers: Headers, inner: RawByteStream) -> Self {
52 Self { headers, inner }
53 }
54
55 #[must_use]
57 pub fn into_inner(self) -> RawByteStream {
58 self.inner
59 }
60}
61
62impl Stream for ByteStream {
63 type Item = Result<Bytes, NetError>;
64
65 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66 self.get_mut().inner.as_mut().poll_next(cx)
67 }
68}
69
70#[cfg_attr(not(target_arch = "wasm32"), kithara::mock(api = NetMock))]
76#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
77#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
78pub trait Net: MaybeSend + MaybeSync {
79 async fn get_bytes(&self, url: Url, headers: Option<Headers>) -> Result<Bytes, NetError>;
81
82 async fn get_range(
84 &self,
85 url: Url,
86 range: RangeSpec,
87 headers: Option<Headers>,
88 ) -> Result<ByteStream, NetError>;
89
90 async fn head(&self, url: Url, headers: Option<Headers>) -> Result<Headers, NetError>;
95
96 async fn stream(&self, url: Url, headers: Option<Headers>) -> Result<ByteStream, NetError>;
98}
99
100pub trait NetExt: Net + Sized {
101 fn with_retry(
103 self,
104 policy: RetryPolicy,
105 cancel: CancellationToken,
106 ) -> RetryNet<Self, DefaultRetryPolicy> {
107 RetryNet::new(self, DefaultRetryPolicy::new(policy), cancel)
108 }
109
110 fn with_timeout(self, timeout: Duration) -> TimeoutNet<Self> {
112 TimeoutNet::new(self, timeout)
113 }
114}
115
116impl<T: Net> NetExt for T {}