Skip to main content

gio/
input_stream.rs

// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{fmt, future::Future, io, mem, pin::Pin, ptr};
4
5use futures_core::task::{Context, Poll};
6use futures_io::{AsyncBufRead, AsyncRead};
7use glib::{Priority, prelude::*, translate::*};
8
9use crate::{Cancellable, InputStream, Seekable, error::to_std_io_result, ffi, prelude::*};
10
11pub trait InputStreamExtManual: IsA<InputStream> + Sized {
12    #[doc(alias = "g_input_stream_read")]
13    fn read<B: AsMut<[u8]>, C: IsA<Cancellable>>(
14        &self,
15        mut buffer: B,
16        cancellable: Option<&C>,
17    ) -> Result<usize, glib::Error> {
18        let cancellable = cancellable.map(|c| c.as_ref());
19        let gcancellable = cancellable.to_glib_none();
20        let buffer = buffer.as_mut();
21        let buffer_ptr = buffer.as_mut_ptr();
22        let count = buffer.len();
23        unsafe {
24            let mut error = ptr::null_mut();
25            let ret = ffi::g_input_stream_read(
26                self.as_ref().to_glib_none().0,
27                buffer_ptr,
28                count,
29                gcancellable.0,
30                &mut error,
31            );
32            if error.is_null() {
33                Ok(ret as usize)
34            } else {
35                Err(from_glib_full(error))
36            }
37        }
38    }
39
40    #[doc(alias = "g_input_stream_read_all")]
41    fn read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>(
42        &self,
43        mut buffer: B,
44        cancellable: Option<&C>,
45    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
46        let cancellable = cancellable.map(|c| c.as_ref());
47        let gcancellable = cancellable.to_glib_none();
48        let buffer = buffer.as_mut();
49        let buffer_ptr = buffer.as_mut_ptr();
50        let count = buffer.len();
51        unsafe {
52            let mut bytes_read = mem::MaybeUninit::uninit();
53            let mut error = ptr::null_mut();
54            let _ = ffi::g_input_stream_read_all(
55                self.as_ref().to_glib_none().0,
56                buffer_ptr,
57                count,
58                bytes_read.as_mut_ptr(),
59                gcancellable.0,
60                &mut error,
61            );
62
63            let bytes_read = bytes_read.assume_init();
64            if error.is_null() {
65                Ok((bytes_read, None))
66            } else if bytes_read != 0 {
67                Ok((bytes_read, Some(from_glib_full(error))))
68            } else {
69                Err(from_glib_full(error))
70            }
71        }
72    }
73
74    #[doc(alias = "g_input_stream_read_all_async")]
75    fn read_all_async<
76        B: AsMut<[u8]> + Send + 'static,
77        Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
78        C: IsA<Cancellable>,
79    >(
80        &self,
81        buffer: B,
82        io_priority: Priority,
83        cancellable: Option<&C>,
84        callback: Q,
85    ) {
86        let main_context = glib::MainContext::ref_thread_default();
87        let is_main_context_owner = main_context.is_owner();
88        let has_acquired_main_context = (!is_main_context_owner)
89            .then(|| main_context.acquire().ok())
90            .flatten();
91        assert!(
92            is_main_context_owner || has_acquired_main_context.is_some(),
93            "Async operations only allowed if the thread is owning the MainContext"
94        );
95
96        let cancellable = cancellable.map(|c| c.as_ref());
97        let gcancellable = cancellable.to_glib_none();
98        let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
99            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
100        // Need to do this after boxing as the contents pointer might change by moving into the box
101        let (count, buffer_ptr) = {
102            let buffer = &mut user_data.1;
103            let slice = (*buffer).as_mut();
104            (slice.len(), slice.as_mut_ptr())
105        };
106        unsafe extern "C" fn read_all_async_trampoline<
107            B: AsMut<[u8]> + Send + 'static,
108            Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
109        >(
110            _source_object: *mut glib::gobject_ffi::GObject,
111            res: *mut ffi::GAsyncResult,
112            user_data: glib::ffi::gpointer,
113        ) {
114            unsafe {
115                let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
116                    Box::from_raw(user_data as *mut _);
117                let (callback, buffer) = *user_data;
118                let callback = callback.into_inner();
119
120                let mut error = ptr::null_mut();
121                let mut bytes_read = mem::MaybeUninit::uninit();
122                let _ = ffi::g_input_stream_read_all_finish(
123                    _source_object as *mut _,
124                    res,
125                    bytes_read.as_mut_ptr(),
126                    &mut error,
127                );
128
129                let bytes_read = bytes_read.assume_init();
130                let result = if error.is_null() {
131                    Ok((buffer, bytes_read, None))
132                } else if bytes_read != 0 {
133                    Ok((buffer, bytes_read, Some(from_glib_full(error))))
134                } else {
135                    Err((buffer, from_glib_full(error)))
136                };
137
138                callback(result);
139            }
140        }
141        let callback = read_all_async_trampoline::<B, Q>;
142        unsafe {
143            ffi::g_input_stream_read_all_async(
144                self.as_ref().to_glib_none().0,
145                buffer_ptr,
146                count,
147                io_priority.into_glib(),
148                gcancellable.0,
149                Some(callback),
150                Box::into_raw(user_data) as *mut _,
151            );
152        }
153    }
154
155    #[doc(alias = "g_input_stream_read_async")]
156    fn read_async<
157        B: AsMut<[u8]> + Send + 'static,
158        Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
159        C: IsA<Cancellable>,
160    >(
161        &self,
162        buffer: B,
163        io_priority: Priority,
164        cancellable: Option<&C>,
165        callback: Q,
166    ) {
167        let main_context = glib::MainContext::ref_thread_default();
168        let is_main_context_owner = main_context.is_owner();
169        let has_acquired_main_context = (!is_main_context_owner)
170            .then(|| main_context.acquire().ok())
171            .flatten();
172        assert!(
173            is_main_context_owner || has_acquired_main_context.is_some(),
174            "Async operations only allowed if the thread is owning the MainContext"
175        );
176
177        let cancellable = cancellable.map(|c| c.as_ref());
178        let gcancellable = cancellable.to_glib_none();
179        let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
180            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
181        // Need to do this after boxing as the contents pointer might change by moving into the box
182        let (count, buffer_ptr) = {
183            let buffer = &mut user_data.1;
184            let slice = (*buffer).as_mut();
185            (slice.len(), slice.as_mut_ptr())
186        };
187        unsafe extern "C" fn read_async_trampoline<
188            B: AsMut<[u8]> + Send + 'static,
189            Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
190        >(
191            _source_object: *mut glib::gobject_ffi::GObject,
192            res: *mut ffi::GAsyncResult,
193            user_data: glib::ffi::gpointer,
194        ) {
195            unsafe {
196                let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
197                    Box::from_raw(user_data as *mut _);
198                let (callback, buffer) = *user_data;
199                let callback = callback.into_inner();
200
201                let mut error = ptr::null_mut();
202                let ret =
203                    ffi::g_input_stream_read_finish(_source_object as *mut _, res, &mut error);
204
205                let result = if error.is_null() {
206                    Ok((buffer, ret as usize))
207                } else {
208                    Err((buffer, from_glib_full(error)))
209                };
210
211                callback(result);
212            }
213        }
214        let callback = read_async_trampoline::<B, Q>;
215        unsafe {
216            ffi::g_input_stream_read_async(
217                self.as_ref().to_glib_none().0,
218                buffer_ptr,
219                count,
220                io_priority.into_glib(),
221                gcancellable.0,
222                Some(callback),
223                Box::into_raw(user_data) as *mut _,
224            );
225        }
226    }
227
228    fn read_all_future<B: AsMut<[u8]> + Send + 'static>(
229        &self,
230        buffer: B,
231        io_priority: Priority,
232    ) -> Pin<
233        Box<
234            dyn std::future::Future<
235                    Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
236                > + 'static,
237        >,
238    > {
239        Box::pin(crate::GioFuture::new(
240            self,
241            move |obj, cancellable, send| {
242                obj.read_all_async(buffer, io_priority, Some(cancellable), move |res| {
243                    send.resolve(res);
244                });
245            },
246        ))
247    }
248
249    fn read_future<B: AsMut<[u8]> + Send + 'static>(
250        &self,
251        buffer: B,
252        io_priority: Priority,
253    ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
254    {
255        Box::pin(crate::GioFuture::new(
256            self,
257            move |obj, cancellable, send| {
258                obj.read_async(buffer, io_priority, Some(cancellable), move |res| {
259                    send.resolve(res);
260                });
261            },
262        ))
263    }
264
265    fn into_read(self) -> InputStreamRead<Self>
266    where
267        Self: IsA<InputStream>,
268    {
269        InputStreamRead(self)
270    }
271
272    fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self>
273    where
274        Self: IsA<InputStream>,
275    {
276        InputStreamAsyncBufRead::new(self, buffer_size)
277    }
278}
279
280impl<O: IsA<InputStream>> InputStreamExtManual for O {}
281
282#[derive(Debug)]
283pub struct InputStreamRead<T: IsA<InputStream>>(T);
284
285impl<T: IsA<InputStream>> InputStreamRead<T> {
286    pub fn into_input_stream(self) -> T {
287        self.0
288    }
289
290    pub fn input_stream(&self) -> &T {
291        &self.0
292    }
293}
294
295impl<T: IsA<InputStream>> io::Read for InputStreamRead<T> {
296    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
297        let gio_result = self.0.as_ref().read(buf, crate::Cancellable::NONE);
298        to_std_io_result(gio_result)
299    }
300}
301
302impl<T: IsA<InputStream> + IsA<Seekable>> io::Seek for InputStreamRead<T> {
303    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
304        let (pos, type_) = match pos {
305            io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
306            io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
307            io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
308        };
309        let seekable: &Seekable = self.0.as_ref();
310        let gio_result = seekable
311            .seek(pos, type_, crate::Cancellable::NONE)
312            .map(|_| seekable.tell() as u64);
313        to_std_io_result(gio_result)
314    }
315}
316
317enum State {
318    Waiting {
319        buffer: Vec<u8>,
320    },
321    Transitioning,
322    Reading {
323        pending: Pin<
324            Box<
325                dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
326                    + 'static,
327            >,
328        >,
329    },
330    HasData {
331        buffer: Vec<u8>,
332        valid: (usize, usize), // first index is inclusive, second is exclusive
333    },
334    Failed(crate::IOErrorEnum),
335}
336
337impl State {
338    fn into_buffer(self) -> Vec<u8> {
339        match self {
340            State::Waiting { buffer } => buffer,
341            _ => panic!("Invalid state"),
342        }
343    }
344
345    #[doc(alias = "get_pending")]
346    fn pending(
347        &mut self,
348    ) -> &mut Pin<
349        Box<
350            dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
351                + 'static,
352        >,
353    > {
354        match self {
355            State::Reading { pending } => pending,
356            _ => panic!("Invalid state"),
357        }
358    }
359}
360pub struct InputStreamAsyncBufRead<T: IsA<InputStream>> {
361    stream: T,
362    state: State,
363}
364
365impl<T: IsA<InputStream>> InputStreamAsyncBufRead<T> {
366    pub fn into_input_stream(self) -> T {
367        self.stream
368    }
369
370    pub fn input_stream(&self) -> &T {
371        &self.stream
372    }
373
374    fn new(stream: T, buffer_size: usize) -> Self {
375        let buffer = vec![0; buffer_size];
376
377        Self {
378            stream,
379            state: State::Waiting { buffer },
380        }
381    }
382    fn set_reading(
383        &mut self,
384    ) -> &mut Pin<
385        Box<
386            dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
387                + 'static,
388        >,
389    > {
390        match self.state {
391            State::Waiting { .. } => {
392                let waiting = mem::replace(&mut self.state, State::Transitioning);
393                let buffer = waiting.into_buffer();
394                let pending = self.input_stream().read_future(buffer, Priority::default());
395                self.state = State::Reading { pending };
396            }
397            State::Reading { .. } => {}
398            _ => panic!("Invalid state"),
399        };
400
401        self.state.pending()
402    }
403
404    #[doc(alias = "get_data")]
405    fn data(&self) -> Poll<io::Result<&[u8]>> {
406        if let State::HasData {
407            ref buffer,
408            valid: (i, j),
409        } = self.state
410        {
411            return Poll::Ready(Ok(&buffer[i..j]));
412        }
413        panic!("Invalid state")
414    }
415
416    fn set_waiting(&mut self, buffer: Vec<u8>) {
417        match self.state {
418            State::Reading { .. } | State::Transitioning => self.state = State::Waiting { buffer },
419            _ => panic!("Invalid state"),
420        }
421    }
422
423    fn set_has_data(&mut self, buffer: Vec<u8>, valid: (usize, usize)) {
424        match self.state {
425            State::Reading { .. } | State::Transitioning => {
426                self.state = State::HasData { buffer, valid }
427            }
428            _ => panic!("Invalid state"),
429        }
430    }
431
432    fn poll_fill_buf(&mut self, cx: &mut Context) -> Poll<Result<&[u8], futures_io::Error>> {
433        match self.state {
434            State::Failed(kind) => Poll::Ready(Err(io::Error::new(
435                io::ErrorKind::from(kind),
436                BufReadError::Failed,
437            ))),
438            State::HasData { .. } => self.data(),
439            State::Transitioning => panic!("Invalid state"),
440            State::Waiting { .. } | State::Reading { .. } => {
441                let pending = self.set_reading();
442                match Pin::new(pending).poll(cx) {
443                    Poll::Ready(Ok((buffer, res))) => {
444                        if res == 0 {
445                            self.set_waiting(buffer);
446                            Poll::Ready(Ok(&[]))
447                        } else {
448                            self.set_has_data(buffer, (0, res));
449                            self.data()
450                        }
451                    }
452                    Poll::Ready(Err((_, err))) => {
453                        let kind = err
454                            .kind::<crate::IOErrorEnum>()
455                            .unwrap_or(crate::IOErrorEnum::Failed);
456                        self.state = State::Failed(kind);
457                        Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
458                    }
459                    Poll::Pending => Poll::Pending,
460                }
461            }
462        }
463    }
464
465    fn consume(&mut self, amt: usize) {
466        if amt == 0 {
467            return;
468        }
469
470        if let State::HasData { .. } = self.state {
471            let has_data = mem::replace(&mut self.state, State::Transitioning);
472            if let State::HasData {
473                buffer,
474                valid: (i, j),
475            } = has_data
476            {
477                let available = j - i;
478                if amt > available {
479                    panic!("Cannot consume {amt} bytes as only {available} are available",)
480                }
481                let remaining = available - amt;
482                if remaining == 0 {
483                    return self.set_waiting(buffer);
484                } else {
485                    return self.set_has_data(buffer, (i + amt, j));
486                }
487            }
488        }
489
490        panic!("Invalid state")
491    }
492}
493
/// Error payload used for I/O errors that are reported because an earlier read failed.
#[derive(Debug)]
enum BufReadError {
    /// A previous read operation failed.
    Failed,
}

impl std::error::Error for BufReadError {}

impl fmt::Display for BufReadError {
    /// Writes the human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = match self {
            Self::Failed => "Previous read operation failed",
        };
        f.write_str(message)
    }
}
508
509impl<T: IsA<InputStream>> AsyncRead for InputStreamAsyncBufRead<T> {
510    fn poll_read(
511        self: Pin<&mut Self>,
512        cx: &mut Context,
513        out_buf: &mut [u8],
514    ) -> Poll<io::Result<usize>> {
515        let reader = self.get_mut();
516        let poll = reader.poll_fill_buf(cx);
517
518        let poll = poll.map_ok(|buffer| {
519            let copied = buffer.len().min(out_buf.len());
520            out_buf[..copied].copy_from_slice(&buffer[..copied]);
521            copied
522        });
523
524        if let Poll::Ready(Ok(consumed)) = poll {
525            reader.consume(consumed);
526        }
527        poll
528    }
529}
530
531impl<T: IsA<InputStream>> AsyncBufRead for InputStreamAsyncBufRead<T> {
532    fn poll_fill_buf(
533        self: Pin<&mut Self>,
534        cx: &mut Context,
535    ) -> Poll<Result<&[u8], futures_io::Error>> {
536        self.get_mut().poll_fill_buf(cx)
537    }
538
539    fn consume(self: Pin<&mut Self>, amt: usize) {
540        self.get_mut().consume(amt);
541    }
542}
543
544impl<T: IsA<InputStream>> Unpin for InputStreamAsyncBufRead<T> {}
545
#[cfg(test)]
mod tests {
    use std::io::Read;

    use glib::Bytes;

    use crate::{MemoryInputStream, prelude::*, test_util::run_async};

    // Every test reads from an in-memory stream holding the bytes 1, 2, 3.

    #[test]
    fn read_all_async() {
        let outcome = run_async(|tx, l| {
            let bytes = Bytes::from_owned(vec![1, 2, 3]);
            let stream = MemoryInputStream::from_bytes(&bytes);

            let buffer = vec![0; 10];
            stream.read_all_async(
                buffer,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let (buffer, count, err) = outcome.unwrap();
        assert_eq!(count, 3);
        assert!(err.is_none());
        assert_eq!(&buffer[..3], &[1u8, 2, 3][..]);
    }

    #[test]
    fn read_all() {
        let bytes = Bytes::from_owned(vec![1, 2, 3]);
        let stream = MemoryInputStream::from_bytes(&bytes);
        let mut buffer = vec![0; 10];

        let (count, err) = stream
            .read_all(&mut buffer, crate::Cancellable::NONE)
            .unwrap();

        assert_eq!(count, 3);
        assert!(err.is_none());
        assert_eq!(&buffer[..3], &[1u8, 2, 3][..]);
    }

    #[test]
    fn read() {
        let bytes = Bytes::from_owned(vec![1, 2, 3]);
        let stream = MemoryInputStream::from_bytes(&bytes);
        let mut buffer = vec![0; 10];

        let outcome = stream.read(&mut buffer, crate::Cancellable::NONE);

        assert_eq!(outcome.unwrap(), 3);
        assert_eq!(&buffer[..3], &[1u8, 2, 3][..]);
    }

    #[test]
    fn read_async() {
        let outcome = run_async(|tx, l| {
            let bytes = Bytes::from_owned(vec![1, 2, 3]);
            let stream = MemoryInputStream::from_bytes(&bytes);

            let buffer = vec![0; 10];
            stream.read_async(
                buffer,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let (buffer, count) = outcome.unwrap();
        assert_eq!(count, 3);
        assert_eq!(&buffer[..3], &[1u8, 2, 3][..]);
    }

    #[test]
    fn read_bytes_async() {
        let outcome = run_async(|tx, l| {
            let bytes = Bytes::from_owned(vec![1, 2, 3]);
            let stream = MemoryInputStream::from_bytes(&bytes);

            stream.read_bytes_async(
                10,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let read = outcome.unwrap();
        assert_eq!(read, vec![1, 2, 3]);
    }

    #[test]
    fn skip_async() {
        let outcome = run_async(|tx, l| {
            let bytes = Bytes::from_owned(vec![1, 2, 3]);
            let stream = MemoryInputStream::from_bytes(&bytes);

            stream.skip_async(
                10,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        // Only three bytes are available even though ten were requested.
        let skipped = outcome.unwrap();
        assert_eq!(skipped, 3);
    }

    #[test]
    fn std_io_read() {
        let bytes = Bytes::from_owned(vec![1, 2, 3]);
        let mut reader = MemoryInputStream::from_bytes(&bytes).into_read();
        let mut buffer = [0u8; 10];

        let outcome = reader.read(&mut buffer);

        assert_eq!(outcome.unwrap(), 3);
        assert_eq!(&buffer[..3], &[1u8, 2, 3][..]);
    }

    #[test]
    fn into_input_stream() {
        let bytes = Bytes::from_owned(vec![1, 2, 3]);
        let original = MemoryInputStream::from_bytes(&bytes);
        let expected = original.clone();

        // Wrapping and unwrapping must hand back the very same stream.
        let round_tripped = original.into_read().into_input_stream();

        assert_eq!(round_tripped, expected);
    }
}