// zng_task/channel/ipc_read.rs
use futures_lite::AsyncSeekExt as _;
use std::{
    fs,
    io::{self, Seek as _},
    mem,
    pin::{Pin, pin},
};
use zng_unit::ByteUnits as _;

use crate::channel::{IpcBytes, IpcFileHandle};

12#[derive(Debug)]
20#[cfg_attr(ipc, derive(serde::Serialize, serde::Deserialize))]
21#[non_exhaustive]
22pub enum IpcReadHandle {
23 File(IpcFileHandle),
25 Bytes(IpcBytes),
27}
28impl From<IpcFileHandle> for IpcReadHandle {
29 fn from(f: IpcFileHandle) -> Self {
30 IpcReadHandle::File(f)
31 }
32}
33impl From<IpcBytes> for IpcReadHandle {
34 fn from(b: IpcBytes) -> Self {
35 IpcReadHandle::Bytes(b)
36 }
37}
38impl From<fs::File> for IpcReadHandle {
39 fn from(f: fs::File) -> Self {
40 IpcReadHandle::File(f.into())
41 }
42}
43impl IpcReadHandle {
44 pub fn best_read_blocking(mut file: std::fs::File) -> io::Result<Self> {
47 if file.metadata()?.len() > 5.megabytes().0 {
48 Ok(file.into())
49 } else {
50 file.seek(io::SeekFrom::Start(0))?;
51 IpcBytes::from_file_blocking(file).map(Into::into)
52 }
53 }
54
55 pub async fn best_read(mut file: crate::fs::File) -> io::Result<Self> {
58 if file.metadata().await?.len() > 5.megabytes().0 {
59 let file = file.try_unwrap().await.unwrap();
60 Ok(file.into())
61 } else {
62 file.seek(io::SeekFrom::Start(0)).await?;
63 IpcBytes::from_file(file).await.map(Into::into)
64 }
65 }
66
67 pub fn duplicate(&self) -> io::Result<Self> {
69 match self {
70 IpcReadHandle::File(h) => h.duplicate().map(Self::File),
71 IpcReadHandle::Bytes(b) => Ok(IpcReadHandle::Bytes(b.clone())),
72 }
73 }
74
75 pub fn read_blocking(self) -> io::Result<IpcReadBlocking> {
77 match self {
78 IpcReadHandle::File(h) => {
79 let mut file = std::fs::File::from(h);
80 file.seek(io::SeekFrom::Start(0))?;
81 Ok(IpcReadBlocking::File(io::BufReader::new(file)))
82 }
83 IpcReadHandle::Bytes(b) => Ok(IpcReadBlocking::Bytes(io::Cursor::new(b))),
84 }
85 }
86
87 pub async fn read(self) -> io::Result<IpcRead> {
89 match self {
90 IpcReadHandle::File(h) => {
91 let mut file = crate::fs::File::from(h);
92 file.seek(io::SeekFrom::Start(0)).await?;
93 Ok(IpcRead::File(crate::io::BufReader::new(file)))
94 }
95 IpcReadHandle::Bytes(b) => Ok(IpcRead::Bytes(crate::io::Cursor::new(b))),
96 }
97 }
98
99 pub fn read_to_bytes_blocking(self) -> io::Result<IpcBytes> {
101 match self {
102 IpcReadHandle::File(h) => {
103 let mut file = std::fs::File::from(h);
104 file.seek(io::SeekFrom::Start(0))?;
105 IpcBytes::from_file_blocking(file)
106 }
107 IpcReadHandle::Bytes(b) => Ok(b),
108 }
109 }
110
111 pub async fn read_to_bytes(self) -> io::Result<IpcBytes> {
113 match self {
114 IpcReadHandle::File(h) => {
115 let mut file = crate::fs::File::from(h);
116 file.seek(io::SeekFrom::Start(0)).await?;
117 IpcBytes::from_file(file).await
118 }
119 IpcReadHandle::Bytes(b) => Ok(b),
120 }
121 }
122
123 pub async fn duplicate_or_read(&mut self) -> io::Result<Self> {
131 match self.duplicate() {
132 Ok(d) => Ok(d),
133 Err(e) => {
134 tracing::debug!("duplicate_or_read duplicate error, {e}");
135 let f = mem::replace(self, IpcReadHandle::Bytes(IpcBytes::empty()));
136 let b = f.read_to_bytes().await?;
137 *self = IpcReadHandle::Bytes(b);
138 self.duplicate()
139 }
140 }
141 }
142
143 pub fn duplicate_or_read_blocking(&mut self) -> io::Result<Self> {
151 match self.duplicate() {
152 Ok(d) => Ok(d),
153 Err(e) => {
154 tracing::debug!("duplicate_or_read_blocking duplicate error, {e}");
155 let f = mem::replace(self, IpcReadHandle::Bytes(IpcBytes::empty()));
156 let b = f.read_to_bytes_blocking()?;
157 *self = IpcReadHandle::Bytes(b);
158 self.duplicate()
159 }
160 }
161 }
162}
163
164#[derive(Debug)]
166#[non_exhaustive]
167pub enum IpcReadBlocking {
168 File(io::BufReader<fs::File>),
170 Bytes(io::Cursor<IpcBytes>),
172}
173impl IpcReadBlocking {
174 pub fn read_to_bytes(&mut self) -> io::Result<IpcBytes> {
178 match self {
179 IpcReadBlocking::File(f) => IpcBytes::from_read_blocking(f),
180 IpcReadBlocking::Bytes(c) => {
181 let start = c.position();
182 let len = c.get_ref().len();
183 c.set_position(len as u64);
184 if start == 0 {
185 Ok(c.get_ref().clone())
186 } else {
187 IpcBytes::from_slice_blocking(&c.get_ref()[start as usize..])
188 }
189 }
190 }
191 }
192
193 pub fn remaining_len(&mut self) -> io::Result<u64> {
195 match self {
196 IpcReadBlocking::File(b) => {
197 let total_len = b.get_ref().metadata()?.len();
198 let position = b.stream_position()?;
199 Ok(total_len - position.min(total_len))
200 }
201 IpcReadBlocking::Bytes(b) => {
202 let total_len = b.get_ref().len() as u64;
203 Ok(total_len - b.position().min(total_len))
204 }
205 }
206 }
207}
208impl io::Read for IpcReadBlocking {
209 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
210 match self {
211 IpcReadBlocking::File(f) => f.read(buf),
212 IpcReadBlocking::Bytes(b) => b.read(buf),
213 }
214 }
215
216 fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
217 match self {
218 IpcReadBlocking::File(f) => f.read_vectored(bufs),
219 IpcReadBlocking::Bytes(b) => b.read_vectored(bufs),
220 }
221 }
222
223 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
224 match self {
225 IpcReadBlocking::File(f) => f.read_to_end(buf),
226 IpcReadBlocking::Bytes(b) => b.read_to_end(buf),
227 }
228 }
229
230 fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
231 match self {
232 IpcReadBlocking::File(f) => f.read_to_string(buf),
233 IpcReadBlocking::Bytes(b) => b.read_to_string(buf),
234 }
235 }
236
237 fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
238 match self {
239 IpcReadBlocking::File(f) => f.read_exact(buf),
240 IpcReadBlocking::Bytes(b) => b.read_exact(buf),
241 }
242 }
243}
244impl io::Seek for IpcReadBlocking {
245 fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
246 match self {
247 IpcReadBlocking::File(f) => f.seek(pos),
248 IpcReadBlocking::Bytes(b) => b.seek(pos),
249 }
250 }
251
252 fn stream_position(&mut self) -> io::Result<u64> {
253 match self {
254 IpcReadBlocking::File(f) => f.stream_position(),
255 IpcReadBlocking::Bytes(b) => b.stream_position(),
256 }
257 }
258}
259impl io::BufRead for IpcReadBlocking {
260 fn fill_buf(&mut self) -> io::Result<&[u8]> {
261 match self {
262 IpcReadBlocking::File(f) => f.fill_buf(),
263 IpcReadBlocking::Bytes(b) => b.fill_buf(),
264 }
265 }
266
267 fn consume(&mut self, amount: usize) {
268 match self {
269 IpcReadBlocking::File(f) => f.consume(amount),
270 IpcReadBlocking::Bytes(b) => b.consume(amount),
271 }
272 }
273}
274
275#[derive(Debug)]
277#[non_exhaustive]
278pub enum IpcRead {
279 File(crate::io::BufReader<crate::fs::File>),
281 Bytes(crate::io::Cursor<IpcBytes>),
283}
284impl IpcRead {
285 pub async fn read_to_bytes(&mut self) -> io::Result<IpcBytes> {
289 match self {
290 IpcRead::File(f) => IpcBytes::from_read(pin!(f)).await,
291 IpcRead::Bytes(c) => {
292 let start = c.position();
293 let len = c.get_ref().len();
294 c.set_position(len as u64);
295 let b = c.get_ref().clone();
296 if start == 0 {
297 Ok(b)
298 } else {
299 blocking::unblock(move || IpcBytes::from_slice_blocking(&b[start as usize..])).await
300 }
301 }
302 }
303 }
304
305 pub async fn remaining_len(&mut self) -> io::Result<u64> {
307 match self {
308 IpcRead::File(b) => {
309 let total_len = b.get_ref().metadata().await?.len();
310 let pos = b.seek(io::SeekFrom::Current(0)).await?;
311 Ok(total_len - pos.min(total_len))
312 }
313 IpcRead::Bytes(b) => {
314 let total_len = b.get_ref().len() as u64;
315 Ok(total_len - b.position().min(total_len))
316 }
317 }
318 }
319}
320impl crate::io::AsyncRead for IpcRead {
321 fn poll_read(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8]) -> std::task::Poll<io::Result<usize>> {
322 match self.get_mut() {
323 IpcRead::File(f) => Pin::new(f).poll_read(cx, buf),
324 IpcRead::Bytes(b) => Pin::new(b).poll_read(cx, buf),
325 }
326 }
327
328 fn poll_read_vectored(
329 self: Pin<&mut Self>,
330 cx: &mut std::task::Context<'_>,
331 bufs: &mut [io::IoSliceMut<'_>],
332 ) -> std::task::Poll<io::Result<usize>> {
333 match self.get_mut() {
334 IpcRead::File(f) => Pin::new(f).poll_read_vectored(cx, bufs),
335 IpcRead::Bytes(b) => Pin::new(b).poll_read_vectored(cx, bufs),
336 }
337 }
338}
339impl crate::io::AsyncBufRead for IpcRead {
340 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<&[u8]>> {
341 match self.get_mut() {
342 IpcRead::File(f) => Pin::new(f).poll_fill_buf(cx),
343 IpcRead::Bytes(b) => Pin::new(b).poll_fill_buf(cx),
344 }
345 }
346
347 fn consume(self: Pin<&mut Self>, amt: usize) {
348 match self.get_mut() {
349 IpcRead::File(f) => Pin::new(f).consume(amt),
350 IpcRead::Bytes(b) => Pin::new(b).consume(amt),
351 }
352 }
353}
354impl crate::io::AsyncSeek for IpcRead {
355 fn poll_seek(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, pos: io::SeekFrom) -> std::task::Poll<io::Result<u64>> {
356 match self.get_mut() {
357 IpcRead::File(f) => Pin::new(f).poll_seek(cx, pos),
358 IpcRead::Bytes(b) => Pin::new(b).poll_seek(cx, pos),
359 }
360 }
361}