gio/
input_stream.rs

// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{fmt, future::Future, io, mem, pin::Pin, ptr};
4
5use futures_core::task::{Context, Poll};
6use futures_io::{AsyncBufRead, AsyncRead};
7use glib::{prelude::*, translate::*, Priority};
8
9use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, InputStream, Seekable};
10
11pub trait InputStreamExtManual: IsA<InputStream> + Sized {
12    #[doc(alias = "g_input_stream_read")]
13    fn read<B: AsMut<[u8]>, C: IsA<Cancellable>>(
14        &self,
15        mut buffer: B,
16        cancellable: Option<&C>,
17    ) -> Result<usize, glib::Error> {
18        let cancellable = cancellable.map(|c| c.as_ref());
19        let gcancellable = cancellable.to_glib_none();
20        let buffer = buffer.as_mut();
21        let buffer_ptr = buffer.as_mut_ptr();
22        let count = buffer.len();
23        unsafe {
24            let mut error = ptr::null_mut();
25            let ret = ffi::g_input_stream_read(
26                self.as_ref().to_glib_none().0,
27                buffer_ptr,
28                count,
29                gcancellable.0,
30                &mut error,
31            );
32            if error.is_null() {
33                Ok(ret as usize)
34            } else {
35                Err(from_glib_full(error))
36            }
37        }
38    }
39
40    #[doc(alias = "g_input_stream_read_all")]
41    fn read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>(
42        &self,
43        mut buffer: B,
44        cancellable: Option<&C>,
45    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
46        let cancellable = cancellable.map(|c| c.as_ref());
47        let gcancellable = cancellable.to_glib_none();
48        let buffer = buffer.as_mut();
49        let buffer_ptr = buffer.as_mut_ptr();
50        let count = buffer.len();
51        unsafe {
52            let mut bytes_read = mem::MaybeUninit::uninit();
53            let mut error = ptr::null_mut();
54            let _ = ffi::g_input_stream_read_all(
55                self.as_ref().to_glib_none().0,
56                buffer_ptr,
57                count,
58                bytes_read.as_mut_ptr(),
59                gcancellable.0,
60                &mut error,
61            );
62
63            let bytes_read = bytes_read.assume_init();
64            if error.is_null() {
65                Ok((bytes_read, None))
66            } else if bytes_read != 0 {
67                Ok((bytes_read, Some(from_glib_full(error))))
68            } else {
69                Err(from_glib_full(error))
70            }
71        }
72    }
73
74    #[doc(alias = "g_input_stream_read_all_async")]
75    fn read_all_async<
76        B: AsMut<[u8]> + Send + 'static,
77        Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
78        C: IsA<Cancellable>,
79    >(
80        &self,
81        buffer: B,
82        io_priority: Priority,
83        cancellable: Option<&C>,
84        callback: Q,
85    ) {
86        let main_context = glib::MainContext::ref_thread_default();
87        let is_main_context_owner = main_context.is_owner();
88        let has_acquired_main_context = (!is_main_context_owner)
89            .then(|| main_context.acquire().ok())
90            .flatten();
91        assert!(
92            is_main_context_owner || has_acquired_main_context.is_some(),
93            "Async operations only allowed if the thread is owning the MainContext"
94        );
95
96        let cancellable = cancellable.map(|c| c.as_ref());
97        let gcancellable = cancellable.to_glib_none();
98        let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
99            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
100        // Need to do this after boxing as the contents pointer might change by moving into the box
101        let (count, buffer_ptr) = {
102            let buffer = &mut user_data.1;
103            let slice = (*buffer).as_mut();
104            (slice.len(), slice.as_mut_ptr())
105        };
106        unsafe extern "C" fn read_all_async_trampoline<
107            B: AsMut<[u8]> + Send + 'static,
108            Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
109        >(
110            _source_object: *mut glib::gobject_ffi::GObject,
111            res: *mut ffi::GAsyncResult,
112            user_data: glib::ffi::gpointer,
113        ) {
114            let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
115                Box::from_raw(user_data as *mut _);
116            let (callback, buffer) = *user_data;
117            let callback = callback.into_inner();
118
119            let mut error = ptr::null_mut();
120            let mut bytes_read = mem::MaybeUninit::uninit();
121            let _ = ffi::g_input_stream_read_all_finish(
122                _source_object as *mut _,
123                res,
124                bytes_read.as_mut_ptr(),
125                &mut error,
126            );
127
128            let bytes_read = bytes_read.assume_init();
129            let result = if error.is_null() {
130                Ok((buffer, bytes_read, None))
131            } else if bytes_read != 0 {
132                Ok((buffer, bytes_read, Some(from_glib_full(error))))
133            } else {
134                Err((buffer, from_glib_full(error)))
135            };
136
137            callback(result);
138        }
139        let callback = read_all_async_trampoline::<B, Q>;
140        unsafe {
141            ffi::g_input_stream_read_all_async(
142                self.as_ref().to_glib_none().0,
143                buffer_ptr,
144                count,
145                io_priority.into_glib(),
146                gcancellable.0,
147                Some(callback),
148                Box::into_raw(user_data) as *mut _,
149            );
150        }
151    }
152
153    #[doc(alias = "g_input_stream_read_async")]
154    fn read_async<
155        B: AsMut<[u8]> + Send + 'static,
156        Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
157        C: IsA<Cancellable>,
158    >(
159        &self,
160        buffer: B,
161        io_priority: Priority,
162        cancellable: Option<&C>,
163        callback: Q,
164    ) {
165        let main_context = glib::MainContext::ref_thread_default();
166        let is_main_context_owner = main_context.is_owner();
167        let has_acquired_main_context = (!is_main_context_owner)
168            .then(|| main_context.acquire().ok())
169            .flatten();
170        assert!(
171            is_main_context_owner || has_acquired_main_context.is_some(),
172            "Async operations only allowed if the thread is owning the MainContext"
173        );
174
175        let cancellable = cancellable.map(|c| c.as_ref());
176        let gcancellable = cancellable.to_glib_none();
177        let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
178            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
179        // Need to do this after boxing as the contents pointer might change by moving into the box
180        let (count, buffer_ptr) = {
181            let buffer = &mut user_data.1;
182            let slice = (*buffer).as_mut();
183            (slice.len(), slice.as_mut_ptr())
184        };
185        unsafe extern "C" fn read_async_trampoline<
186            B: AsMut<[u8]> + Send + 'static,
187            Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
188        >(
189            _source_object: *mut glib::gobject_ffi::GObject,
190            res: *mut ffi::GAsyncResult,
191            user_data: glib::ffi::gpointer,
192        ) {
193            let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
194                Box::from_raw(user_data as *mut _);
195            let (callback, buffer) = *user_data;
196            let callback = callback.into_inner();
197
198            let mut error = ptr::null_mut();
199            let ret = ffi::g_input_stream_read_finish(_source_object as *mut _, res, &mut error);
200
201            let result = if error.is_null() {
202                Ok((buffer, ret as usize))
203            } else {
204                Err((buffer, from_glib_full(error)))
205            };
206
207            callback(result);
208        }
209        let callback = read_async_trampoline::<B, Q>;
210        unsafe {
211            ffi::g_input_stream_read_async(
212                self.as_ref().to_glib_none().0,
213                buffer_ptr,
214                count,
215                io_priority.into_glib(),
216                gcancellable.0,
217                Some(callback),
218                Box::into_raw(user_data) as *mut _,
219            );
220        }
221    }
222
223    fn read_all_future<B: AsMut<[u8]> + Send + 'static>(
224        &self,
225        buffer: B,
226        io_priority: Priority,
227    ) -> Pin<
228        Box<
229            dyn std::future::Future<
230                    Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
231                > + 'static,
232        >,
233    > {
234        Box::pin(crate::GioFuture::new(
235            self,
236            move |obj, cancellable, send| {
237                obj.read_all_async(buffer, io_priority, Some(cancellable), move |res| {
238                    send.resolve(res);
239                });
240            },
241        ))
242    }
243
244    fn read_future<B: AsMut<[u8]> + Send + 'static>(
245        &self,
246        buffer: B,
247        io_priority: Priority,
248    ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
249    {
250        Box::pin(crate::GioFuture::new(
251            self,
252            move |obj, cancellable, send| {
253                obj.read_async(buffer, io_priority, Some(cancellable), move |res| {
254                    send.resolve(res);
255                });
256            },
257        ))
258    }
259
260    fn into_read(self) -> InputStreamRead<Self>
261    where
262        Self: IsA<InputStream>,
263    {
264        InputStreamRead(self)
265    }
266
267    fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self>
268    where
269        Self: IsA<InputStream>,
270    {
271        InputStreamAsyncBufRead::new(self, buffer_size)
272    }
273}
274
275impl<O: IsA<InputStream>> InputStreamExtManual for O {}
276
277#[derive(Debug)]
278pub struct InputStreamRead<T: IsA<InputStream>>(T);
279
280impl<T: IsA<InputStream>> InputStreamRead<T> {
281    pub fn into_input_stream(self) -> T {
282        self.0
283    }
284
285    pub fn input_stream(&self) -> &T {
286        &self.0
287    }
288}
289
290impl<T: IsA<InputStream>> io::Read for InputStreamRead<T> {
291    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
292        let gio_result = self.0.as_ref().read(buf, crate::Cancellable::NONE);
293        to_std_io_result(gio_result)
294    }
295}
296
297impl<T: IsA<InputStream> + IsA<Seekable>> io::Seek for InputStreamRead<T> {
298    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
299        let (pos, type_) = match pos {
300            io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
301            io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
302            io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
303        };
304        let seekable: &Seekable = self.0.as_ref();
305        let gio_result = seekable
306            .seek(pos, type_, crate::Cancellable::NONE)
307            .map(|_| seekable.tell() as u64);
308        to_std_io_result(gio_result)
309    }
310}
311
312enum State {
313    Waiting {
314        buffer: Vec<u8>,
315    },
316    Transitioning,
317    Reading {
318        pending: Pin<
319            Box<
320                dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
321                    + 'static,
322            >,
323        >,
324    },
325    HasData {
326        buffer: Vec<u8>,
327        valid: (usize, usize), // first index is inclusive, second is exclusive
328    },
329    Failed(crate::IOErrorEnum),
330}
331
332impl State {
333    fn into_buffer(self) -> Vec<u8> {
334        match self {
335            State::Waiting { buffer } => buffer,
336            _ => panic!("Invalid state"),
337        }
338    }
339
340    #[doc(alias = "get_pending")]
341    fn pending(
342        &mut self,
343    ) -> &mut Pin<
344        Box<
345            dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
346                + 'static,
347        >,
348    > {
349        match self {
350            State::Reading { ref mut pending } => pending,
351            _ => panic!("Invalid state"),
352        }
353    }
354}
355pub struct InputStreamAsyncBufRead<T: IsA<InputStream>> {
356    stream: T,
357    state: State,
358}
359
360impl<T: IsA<InputStream>> InputStreamAsyncBufRead<T> {
361    pub fn into_input_stream(self) -> T {
362        self.stream
363    }
364
365    pub fn input_stream(&self) -> &T {
366        &self.stream
367    }
368
369    fn new(stream: T, buffer_size: usize) -> Self {
370        let buffer = vec![0; buffer_size];
371
372        Self {
373            stream,
374            state: State::Waiting { buffer },
375        }
376    }
377    fn set_reading(
378        &mut self,
379    ) -> &mut Pin<
380        Box<
381            dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
382                + 'static,
383        >,
384    > {
385        match self.state {
386            State::Waiting { .. } => {
387                let waiting = mem::replace(&mut self.state, State::Transitioning);
388                let buffer = waiting.into_buffer();
389                let pending = self.input_stream().read_future(buffer, Priority::default());
390                self.state = State::Reading { pending };
391            }
392            State::Reading { .. } => {}
393            _ => panic!("Invalid state"),
394        };
395
396        self.state.pending()
397    }
398
399    #[doc(alias = "get_data")]
400    fn data(&self) -> Poll<io::Result<&[u8]>> {
401        if let State::HasData {
402            ref buffer,
403            valid: (i, j),
404        } = self.state
405        {
406            return Poll::Ready(Ok(&buffer[i..j]));
407        }
408        panic!("Invalid state")
409    }
410
411    fn set_waiting(&mut self, buffer: Vec<u8>) {
412        match self.state {
413            State::Reading { .. } | State::Transitioning => self.state = State::Waiting { buffer },
414            _ => panic!("Invalid state"),
415        }
416    }
417
418    fn set_has_data(&mut self, buffer: Vec<u8>, valid: (usize, usize)) {
419        match self.state {
420            State::Reading { .. } | State::Transitioning => {
421                self.state = State::HasData { buffer, valid }
422            }
423            _ => panic!("Invalid state"),
424        }
425    }
426
427    fn poll_fill_buf(&mut self, cx: &mut Context) -> Poll<Result<&[u8], futures_io::Error>> {
428        match self.state {
429            State::Failed(kind) => Poll::Ready(Err(io::Error::new(
430                io::ErrorKind::from(kind),
431                BufReadError::Failed,
432            ))),
433            State::HasData { .. } => self.data(),
434            State::Transitioning => panic!("Invalid state"),
435            State::Waiting { .. } | State::Reading { .. } => {
436                let pending = self.set_reading();
437                match Pin::new(pending).poll(cx) {
438                    Poll::Ready(Ok((buffer, res))) => {
439                        if res == 0 {
440                            self.set_waiting(buffer);
441                            Poll::Ready(Ok(&[]))
442                        } else {
443                            self.set_has_data(buffer, (0, res));
444                            self.data()
445                        }
446                    }
447                    Poll::Ready(Err((_, err))) => {
448                        let kind = err
449                            .kind::<crate::IOErrorEnum>()
450                            .unwrap_or(crate::IOErrorEnum::Failed);
451                        self.state = State::Failed(kind);
452                        Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
453                    }
454                    Poll::Pending => Poll::Pending,
455                }
456            }
457        }
458    }
459
460    fn consume(&mut self, amt: usize) {
461        if amt == 0 {
462            return;
463        }
464
465        if let State::HasData { .. } = self.state {
466            let has_data = mem::replace(&mut self.state, State::Transitioning);
467            if let State::HasData {
468                buffer,
469                valid: (i, j),
470            } = has_data
471            {
472                let available = j - i;
473                if amt > available {
474                    panic!("Cannot consume {amt} bytes as only {available} are available",)
475                }
476                let remaining = available - amt;
477                if remaining == 0 {
478                    return self.set_waiting(buffer);
479                } else {
480                    return self.set_has_data(buffer, (i + amt, j));
481                }
482            }
483        }
484
485        panic!("Invalid state")
486    }
487}
488
/// Error payload used when a buffered reader is polled again after an earlier
/// read has already failed.
#[derive(Debug)]
enum BufReadError {
    Failed,
}

impl std::error::Error for BufReadError {}

impl fmt::Display for BufReadError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = match *self {
            Self::Failed => "Previous read operation failed",
        };
        f.write_str(message)
    }
}
503
504impl<T: IsA<InputStream>> AsyncRead for InputStreamAsyncBufRead<T> {
505    fn poll_read(
506        self: Pin<&mut Self>,
507        cx: &mut Context,
508        out_buf: &mut [u8],
509    ) -> Poll<io::Result<usize>> {
510        let reader = self.get_mut();
511        let poll = reader.poll_fill_buf(cx);
512
513        let poll = poll.map_ok(|buffer| {
514            let copied = buffer.len().min(out_buf.len());
515            out_buf[..copied].copy_from_slice(&buffer[..copied]);
516            copied
517        });
518
519        if let Poll::Ready(Ok(consumed)) = poll {
520            reader.consume(consumed);
521        }
522        poll
523    }
524}
525
526impl<T: IsA<InputStream>> AsyncBufRead for InputStreamAsyncBufRead<T> {
527    fn poll_fill_buf(
528        self: Pin<&mut Self>,
529        cx: &mut Context,
530    ) -> Poll<Result<&[u8], futures_io::Error>> {
531        self.get_mut().poll_fill_buf(cx)
532    }
533
534    fn consume(self: Pin<&mut Self>, amt: usize) {
535        self.get_mut().consume(amt);
536    }
537}
538
539impl<T: IsA<InputStream>> Unpin for InputStreamAsyncBufRead<T> {}
540
#[cfg(test)]
mod tests {
    use std::io::Read;

    use glib::Bytes;

    use crate::{prelude::*, test_util::run_async, MemoryInputStream};

    // Bytes every test stream is created from.
    const EXPECTED: &[u8] = &[1, 2, 3];

    // Builds an in-memory stream that yields the three bytes of `EXPECTED`.
    fn sample_stream() -> MemoryInputStream {
        let bytes = Bytes::from_owned(vec![1, 2, 3]);
        MemoryInputStream::from_bytes(&bytes)
    }

    #[test]
    fn read_all_async() {
        let outcome = run_async(|tx, l| {
            let stream = sample_stream();
            let buf = vec![0; 10];

            stream.read_all_async(
                buf,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let (buf, count, err) = outcome.unwrap();
        assert_eq!(count, 3);
        assert!(err.is_none());
        assert_eq!(&buf[..3], EXPECTED);
    }

    #[test]
    fn read_all() {
        let stream = sample_stream();
        let mut buf = vec![0; 10];

        let (count, err) = stream
            .read_all(&mut buf, crate::Cancellable::NONE)
            .unwrap();

        assert_eq!(count, 3);
        assert!(err.is_none());
        assert_eq!(&buf[..3], EXPECTED);
    }

    #[test]
    fn read() {
        let stream = sample_stream();
        let mut buf = vec![0; 10];

        let count = stream.read(&mut buf, crate::Cancellable::NONE).unwrap();

        assert_eq!(count, 3);
        assert_eq!(&buf[..3], EXPECTED);
    }

    #[test]
    fn read_async() {
        let outcome = run_async(|tx, l| {
            let stream = sample_stream();
            let buf = vec![0; 10];

            stream.read_async(
                buf,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let (buf, count) = outcome.unwrap();
        assert_eq!(count, 3);
        assert_eq!(&buf[..3], EXPECTED);
    }

    #[test]
    fn read_bytes_async() {
        let outcome = run_async(|tx, l| {
            let stream = sample_stream();

            stream.read_bytes_async(
                10,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let bytes = outcome.unwrap();
        assert_eq!(bytes, vec![1, 2, 3]);
    }

    #[test]
    fn skip_async() {
        let outcome = run_async(|tx, l| {
            let stream = sample_stream();

            stream.skip_async(
                10,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let skipped = outcome.unwrap();
        assert_eq!(skipped, 3);
    }

    #[test]
    fn std_io_read() {
        let mut reader = sample_stream().into_read();
        let mut buf = [0u8; 10];

        let count = reader.read(&mut buf).unwrap();

        assert_eq!(count, 3);
        assert_eq!(&buf[..3], EXPECTED);
    }

    #[test]
    fn into_input_stream() {
        let original = sample_stream();
        let kept = original.clone();

        let round_tripped = original.into_read().into_input_stream();

        assert_eq!(round_tripped, kept);
    }
}
694}