Skip to main content

zng_task/channel/
ipc_read.rs

1use futures_lite::AsyncSeekExt as _;
2use std::{
3    fs,
4    io::{self, Seek as _},
5    mem,
6    pin::{Pin, pin},
7};
8use zng_unit::ByteUnits as _;
9
10use crate::channel::{IpcBytes, IpcFileHandle};
11
12/// File handle or allocated bytes that can be read after sending to another process.
13///
14/// # Position
15///
16/// Read always starts from the beginning, the `read` methods seek the file start before returning. Note
17/// that the read position is associated with the system handle, if you create multiple duplicates of the
18/// same handle reading in one instance advances the position in all.
19#[derive(Debug)]
20#[cfg_attr(ipc, derive(serde::Serialize, serde::Deserialize))]
21#[non_exhaustive]
22pub enum IpcReadHandle {
23    /// Read directly from disk.
24    File(IpcFileHandle),
25    /// Read from allocated bytes.
26    Bytes(IpcBytes),
27}
28impl From<IpcFileHandle> for IpcReadHandle {
29    fn from(f: IpcFileHandle) -> Self {
30        IpcReadHandle::File(f)
31    }
32}
33impl From<IpcBytes> for IpcReadHandle {
34    fn from(b: IpcBytes) -> Self {
35        IpcReadHandle::Bytes(b)
36    }
37}
38impl From<fs::File> for IpcReadHandle {
39    fn from(f: fs::File) -> Self {
40        IpcReadHandle::File(f.into())
41    }
42}
43impl IpcReadHandle {
44    /// Either keep the handle or read to bytes, whichever is likely to be faster for
45    /// the common usage pattern of sending to another process and a single full read with some seeking.
46    pub fn best_read_blocking(mut file: std::fs::File) -> io::Result<Self> {
47        if file.metadata()?.len() > 5.megabytes().0 {
48            Ok(file.into())
49        } else {
50            file.seek(io::SeekFrom::Start(0))?;
51            IpcBytes::from_file_blocking(file).map(Into::into)
52        }
53    }
54
55    /// Either keep the handle or read to bytes, whichever is likely to be faster for
56    /// the common usage pattern of sending to another process and a single full read with some seeking.
57    pub async fn best_read(mut file: crate::fs::File) -> io::Result<Self> {
58        if file.metadata().await?.len() > 5.megabytes().0 {
59            let file = file.try_unwrap().await.unwrap();
60            Ok(file.into())
61        } else {
62            file.seek(io::SeekFrom::Start(0)).await?;
63            IpcBytes::from_file(file).await.map(Into::into)
64        }
65    }
66
67    /// Duplicate file handle or clone reference to bytes.
68    pub fn duplicate(&self) -> io::Result<Self> {
69        match self {
70            IpcReadHandle::File(h) => h.duplicate().map(Self::File),
71            IpcReadHandle::Bytes(b) => Ok(IpcReadHandle::Bytes(b.clone())),
72        }
73    }
74
75    /// Begin reading using the std blocking API.
76    pub fn read_blocking(self) -> io::Result<IpcReadBlocking> {
77        match self {
78            IpcReadHandle::File(h) => {
79                let mut file = std::fs::File::from(h);
80                file.seek(io::SeekFrom::Start(0))?;
81                Ok(IpcReadBlocking::File(io::BufReader::new(file)))
82            }
83            IpcReadHandle::Bytes(b) => Ok(IpcReadBlocking::Bytes(io::Cursor::new(b))),
84        }
85    }
86
87    /// Begin reading using the async API.
88    pub async fn read(self) -> io::Result<IpcRead> {
89        match self {
90            IpcReadHandle::File(h) => {
91                let mut file = crate::fs::File::from(h);
92                file.seek(io::SeekFrom::Start(0)).await?;
93                Ok(IpcRead::File(crate::io::BufReader::new(file)))
94            }
95            IpcReadHandle::Bytes(b) => Ok(IpcRead::Bytes(crate::io::Cursor::new(b))),
96        }
97    }
98
99    /// Read file to new [`IpcBytes`] or unwrap bytes.
100    pub fn read_to_bytes_blocking(self) -> io::Result<IpcBytes> {
101        match self {
102            IpcReadHandle::File(h) => {
103                let mut file = std::fs::File::from(h);
104                file.seek(io::SeekFrom::Start(0))?;
105                IpcBytes::from_file_blocking(file)
106            }
107            IpcReadHandle::Bytes(b) => Ok(b),
108        }
109    }
110
111    /// Read file to new [`IpcBytes`] or unwrap bytes.
112    pub async fn read_to_bytes(self) -> io::Result<IpcBytes> {
113        match self {
114            IpcReadHandle::File(h) => {
115                let mut file = crate::fs::File::from(h);
116                file.seek(io::SeekFrom::Start(0)).await?;
117                IpcBytes::from_file(file).await
118            }
119            IpcReadHandle::Bytes(b) => Ok(b),
120        }
121    }
122
123    /// Attempts [`duplicate`] with read fallback.
124    ///
125    /// If duplicate fails attempts [`read_to_bytes`], if it succeeds replaces `self` with read bytes and returns a clone
126    /// if it fails replaces `self` with empty and returns the read error.
127    ///
128    /// [`duplicate`]: Self::duplicate
129    /// [`read_to_bytes`]: Self::read_to_bytes
130    pub async fn duplicate_or_read(&mut self) -> io::Result<Self> {
131        match self.duplicate() {
132            Ok(d) => Ok(d),
133            Err(e) => {
134                tracing::debug!("duplicate_or_read duplicate error, {e}");
135                let f = mem::replace(self, IpcReadHandle::Bytes(IpcBytes::empty()));
136                let b = f.read_to_bytes().await?;
137                *self = IpcReadHandle::Bytes(b);
138                self.duplicate()
139            }
140        }
141    }
142
143    /// Attempts [`duplicate`] with read fallback.
144    ///
145    /// If duplicate fails attempts [`read_to_bytes_blocking`], if it succeeds replaces `self` with read bytes and returns a clone
146    /// if it fails replaces `self` with empty and returns the read error.
147    ///
148    /// [`duplicate`]: Self::duplicate
149    /// [`read_to_bytes_blocking`]: Self::read_to_bytes_blocking
150    pub fn duplicate_or_read_blocking(&mut self) -> io::Result<Self> {
151        match self.duplicate() {
152            Ok(d) => Ok(d),
153            Err(e) => {
154                tracing::debug!("duplicate_or_read_blocking duplicate error, {e}");
155                let f = mem::replace(self, IpcReadHandle::Bytes(IpcBytes::empty()));
156                let b = f.read_to_bytes_blocking()?;
157                *self = IpcReadHandle::Bytes(b);
158                self.duplicate()
159            }
160        }
161    }
162}
163
164/// Blocking read implementer for [`IpcReadHandle::read_blocking`].
165#[derive(Debug)]
166#[non_exhaustive]
167pub enum IpcReadBlocking {
168    /// Buffered reader from file.
169    File(io::BufReader<fs::File>),
170    /// Bytes.
171    Bytes(io::Cursor<IpcBytes>),
172}
173impl IpcReadBlocking {
174    /// Read all bytes until EOF to a new [`IpcBytes`].
175    ///
176    /// If the position is at 0 and is already `Bytes` returns it.
177    pub fn read_to_bytes(&mut self) -> io::Result<IpcBytes> {
178        match self {
179            IpcReadBlocking::File(f) => IpcBytes::from_read_blocking(f),
180            IpcReadBlocking::Bytes(c) => {
181                let start = c.position();
182                let len = c.get_ref().len();
183                c.set_position(len as u64);
184                if start == 0 {
185                    Ok(c.get_ref().clone())
186                } else {
187                    IpcBytes::from_slice_blocking(&c.get_ref()[start as usize..])
188                }
189            }
190        }
191    }
192
193    /// Remaining bytes length.
194    pub fn remaining_len(&mut self) -> io::Result<u64> {
195        match self {
196            IpcReadBlocking::File(b) => {
197                let total_len = b.get_ref().metadata()?.len();
198                let position = b.stream_position()?;
199                Ok(total_len - position.min(total_len))
200            }
201            IpcReadBlocking::Bytes(b) => {
202                let total_len = b.get_ref().len() as u64;
203                Ok(total_len - b.position().min(total_len))
204            }
205        }
206    }
207}
208impl io::Read for IpcReadBlocking {
209    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
210        match self {
211            IpcReadBlocking::File(f) => f.read(buf),
212            IpcReadBlocking::Bytes(b) => b.read(buf),
213        }
214    }
215
216    fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
217        match self {
218            IpcReadBlocking::File(f) => f.read_vectored(bufs),
219            IpcReadBlocking::Bytes(b) => b.read_vectored(bufs),
220        }
221    }
222
223    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
224        match self {
225            IpcReadBlocking::File(f) => f.read_to_end(buf),
226            IpcReadBlocking::Bytes(b) => b.read_to_end(buf),
227        }
228    }
229
230    fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
231        match self {
232            IpcReadBlocking::File(f) => f.read_to_string(buf),
233            IpcReadBlocking::Bytes(b) => b.read_to_string(buf),
234        }
235    }
236
237    fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
238        match self {
239            IpcReadBlocking::File(f) => f.read_exact(buf),
240            IpcReadBlocking::Bytes(b) => b.read_exact(buf),
241        }
242    }
243}
244impl io::Seek for IpcReadBlocking {
245    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
246        match self {
247            IpcReadBlocking::File(f) => f.seek(pos),
248            IpcReadBlocking::Bytes(b) => b.seek(pos),
249        }
250    }
251
252    fn stream_position(&mut self) -> io::Result<u64> {
253        match self {
254            IpcReadBlocking::File(f) => f.stream_position(),
255            IpcReadBlocking::Bytes(b) => b.stream_position(),
256        }
257    }
258}
259impl io::BufRead for IpcReadBlocking {
260    fn fill_buf(&mut self) -> io::Result<&[u8]> {
261        match self {
262            IpcReadBlocking::File(f) => f.fill_buf(),
263            IpcReadBlocking::Bytes(b) => b.fill_buf(),
264        }
265    }
266
267    fn consume(&mut self, amount: usize) {
268        match self {
269            IpcReadBlocking::File(f) => f.consume(amount),
270            IpcReadBlocking::Bytes(b) => b.consume(amount),
271        }
272    }
273}
274
275/// Async read implementer for [`IpcReadHandle::read`]
276#[derive(Debug)]
277#[non_exhaustive]
278pub enum IpcRead {
279    /// Buffered reader from file.
280    File(crate::io::BufReader<crate::fs::File>),
281    /// Bytes.
282    Bytes(crate::io::Cursor<IpcBytes>),
283}
284impl IpcRead {
285    /// Read all bytes until EOF to a new [`IpcBytes`].
286    ///
287    /// If the position is at 0 and is already `Bytes` returns it.
288    pub async fn read_to_bytes(&mut self) -> io::Result<IpcBytes> {
289        match self {
290            IpcRead::File(f) => IpcBytes::from_read(pin!(f)).await,
291            IpcRead::Bytes(c) => {
292                let start = c.position();
293                let len = c.get_ref().len();
294                c.set_position(len as u64);
295                let b = c.get_ref().clone();
296                if start == 0 {
297                    Ok(b)
298                } else {
299                    blocking::unblock(move || IpcBytes::from_slice_blocking(&b[start as usize..])).await
300                }
301            }
302        }
303    }
304
305    /// Remaining bytes length.
306    pub async fn remaining_len(&mut self) -> io::Result<u64> {
307        match self {
308            IpcRead::File(b) => {
309                let total_len = b.get_ref().metadata().await?.len();
310                let pos = b.seek(io::SeekFrom::Current(0)).await?;
311                Ok(total_len - pos.min(total_len))
312            }
313            IpcRead::Bytes(b) => {
314                let total_len = b.get_ref().len() as u64;
315                Ok(total_len - b.position().min(total_len))
316            }
317        }
318    }
319}
320impl crate::io::AsyncRead for IpcRead {
321    fn poll_read(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8]) -> std::task::Poll<io::Result<usize>> {
322        match self.get_mut() {
323            IpcRead::File(f) => Pin::new(f).poll_read(cx, buf),
324            IpcRead::Bytes(b) => Pin::new(b).poll_read(cx, buf),
325        }
326    }
327
328    fn poll_read_vectored(
329        self: Pin<&mut Self>,
330        cx: &mut std::task::Context<'_>,
331        bufs: &mut [io::IoSliceMut<'_>],
332    ) -> std::task::Poll<io::Result<usize>> {
333        match self.get_mut() {
334            IpcRead::File(f) => Pin::new(f).poll_read_vectored(cx, bufs),
335            IpcRead::Bytes(b) => Pin::new(b).poll_read_vectored(cx, bufs),
336        }
337    }
338}
339impl crate::io::AsyncBufRead for IpcRead {
340    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<&[u8]>> {
341        match self.get_mut() {
342            IpcRead::File(f) => Pin::new(f).poll_fill_buf(cx),
343            IpcRead::Bytes(b) => Pin::new(b).poll_fill_buf(cx),
344        }
345    }
346
347    fn consume(self: Pin<&mut Self>, amt: usize) {
348        match self.get_mut() {
349            IpcRead::File(f) => Pin::new(f).consume(amt),
350            IpcRead::Bytes(b) => Pin::new(b).consume(amt),
351        }
352    }
353}
354impl crate::io::AsyncSeek for IpcRead {
355    fn poll_seek(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, pos: io::SeekFrom) -> std::task::Poll<io::Result<u64>> {
356        match self.get_mut() {
357            IpcRead::File(f) => Pin::new(f).poll_seek(cx, pos),
358            IpcRead::Bytes(b) => Pin::new(b).poll_seek(cx, pos),
359        }
360    }
361}