omni_llm_kit/http_client/
async_body.rs

1use std::{
2    io::{Cursor, Read},
3    pin::Pin,
4    task::Poll,
5};
6
7use bytes::Bytes;
8use futures::AsyncRead;
9
10/// Based on the implementation of AsyncBody in
11/// <https://github.com/sagebind/isahc/blob/5c533f1ef4d6bdf1fd291b5103c22110f41d0bf0/src/body/mod.rs>.
12pub struct AsyncBody(pub Inner);
13
14pub enum Inner {
15    /// An empty body.
16    Empty,
17
18    /// A body stored in memory.
19    Bytes(std::io::Cursor<Bytes>),
20
21    /// An asynchronous reader.
22    AsyncReader(Pin<Box<dyn futures::AsyncRead + Send + Sync>>),
23}
24
25impl AsyncBody {
26    /// Create a new empty body.
27    ///
28    /// An empty body represents the *absence* of a body, which is semantically
29    /// different than the presence of a body of zero length.
30    pub fn empty() -> Self {
31        Self(Inner::Empty)
32    }
33    /// Create a streaming body that reads from the given reader.
34    pub fn from_reader<R>(read: R) -> Self
35    where
36        R: AsyncRead + Send + Sync + 'static,
37    {
38        Self(Inner::AsyncReader(Box::pin(read)))
39    }
40
41    pub fn from_bytes(bytes: Bytes) -> Self {
42        Self(Inner::Bytes(Cursor::new(bytes.clone())))
43    }
44}
45
46impl Default for AsyncBody {
47    fn default() -> Self {
48        Self(Inner::Empty)
49    }
50}
51
52impl From<()> for AsyncBody {
53    fn from(_: ()) -> Self {
54        Self(Inner::Empty)
55    }
56}
57
58impl From<Bytes> for AsyncBody {
59    fn from(bytes: Bytes) -> Self {
60        Self::from_bytes(bytes)
61    }
62}
63
64impl From<Vec<u8>> for AsyncBody {
65    fn from(body: Vec<u8>) -> Self {
66        Self::from_bytes(body.into())
67    }
68}
69
70impl From<String> for AsyncBody {
71    fn from(body: String) -> Self {
72        Self::from_bytes(body.into())
73    }
74}
75
76impl From<&'static [u8]> for AsyncBody {
77    #[inline]
78    fn from(s: &'static [u8]) -> Self {
79        Self::from_bytes(Bytes::from_static(s))
80    }
81}
82
83impl From<&'static str> for AsyncBody {
84    #[inline]
85    fn from(s: &'static str) -> Self {
86        Self::from_bytes(Bytes::from_static(s.as_bytes()))
87    }
88}
89
90impl<T: Into<Self>> From<Option<T>> for AsyncBody {
91    fn from(body: Option<T>) -> Self {
92        match body {
93            Some(body) => body.into(),
94            None => Self::empty(),
95        }
96    }
97}
98
99impl futures::AsyncRead for AsyncBody {
100    fn poll_read(
101        self: Pin<&mut Self>,
102        cx: &mut std::task::Context<'_>,
103        buf: &mut [u8],
104    ) -> std::task::Poll<std::io::Result<usize>> {
105        // SAFETY: Standard Enum pin projection
106        let inner = unsafe { &mut self.get_unchecked_mut().0 };
107        match inner {
108            Inner::Empty => Poll::Ready(Ok(0)),
109            // Blocking call is over an in-memory buffer
110            Inner::Bytes(cursor) => Poll::Ready(cursor.read(buf)),
111            Inner::AsyncReader(async_reader) => {
112                AsyncRead::poll_read(async_reader.as_mut(), cx, buf)
113            }
114        }
115    }
116}