1use std::{fmt, future::Future, io, mem, pin::Pin, ptr};
4
5use futures_core::task::{Context, Poll};
6use futures_io::{AsyncBufRead, AsyncRead};
7use glib::{prelude::*, translate::*, Priority};
8
9use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, InputStream, Seekable};
10
11pub trait InputStreamExtManual: IsA<InputStream> + Sized {
12 #[doc(alias = "g_input_stream_read")]
13 fn read<B: AsMut<[u8]>, C: IsA<Cancellable>>(
14 &self,
15 mut buffer: B,
16 cancellable: Option<&C>,
17 ) -> Result<usize, glib::Error> {
18 let cancellable = cancellable.map(|c| c.as_ref());
19 let gcancellable = cancellable.to_glib_none();
20 let buffer = buffer.as_mut();
21 let buffer_ptr = buffer.as_mut_ptr();
22 let count = buffer.len();
23 unsafe {
24 let mut error = ptr::null_mut();
25 let ret = ffi::g_input_stream_read(
26 self.as_ref().to_glib_none().0,
27 buffer_ptr,
28 count,
29 gcancellable.0,
30 &mut error,
31 );
32 if error.is_null() {
33 Ok(ret as usize)
34 } else {
35 Err(from_glib_full(error))
36 }
37 }
38 }
39
40 #[doc(alias = "g_input_stream_read_all")]
41 fn read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>(
42 &self,
43 mut buffer: B,
44 cancellable: Option<&C>,
45 ) -> Result<(usize, Option<glib::Error>), glib::Error> {
46 let cancellable = cancellable.map(|c| c.as_ref());
47 let gcancellable = cancellable.to_glib_none();
48 let buffer = buffer.as_mut();
49 let buffer_ptr = buffer.as_mut_ptr();
50 let count = buffer.len();
51 unsafe {
52 let mut bytes_read = mem::MaybeUninit::uninit();
53 let mut error = ptr::null_mut();
54 let _ = ffi::g_input_stream_read_all(
55 self.as_ref().to_glib_none().0,
56 buffer_ptr,
57 count,
58 bytes_read.as_mut_ptr(),
59 gcancellable.0,
60 &mut error,
61 );
62
63 let bytes_read = bytes_read.assume_init();
64 if error.is_null() {
65 Ok((bytes_read, None))
66 } else if bytes_read != 0 {
67 Ok((bytes_read, Some(from_glib_full(error))))
68 } else {
69 Err(from_glib_full(error))
70 }
71 }
72 }
73
74 #[doc(alias = "g_input_stream_read_all_async")]
75 fn read_all_async<
76 B: AsMut<[u8]> + Send + 'static,
77 Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
78 C: IsA<Cancellable>,
79 >(
80 &self,
81 buffer: B,
82 io_priority: Priority,
83 cancellable: Option<&C>,
84 callback: Q,
85 ) {
86 let main_context = glib::MainContext::ref_thread_default();
87 let is_main_context_owner = main_context.is_owner();
88 let has_acquired_main_context = (!is_main_context_owner)
89 .then(|| main_context.acquire().ok())
90 .flatten();
91 assert!(
92 is_main_context_owner || has_acquired_main_context.is_some(),
93 "Async operations only allowed if the thread is owning the MainContext"
94 );
95
96 let cancellable = cancellable.map(|c| c.as_ref());
97 let gcancellable = cancellable.to_glib_none();
98 let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
99 Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
100 let (count, buffer_ptr) = {
102 let buffer = &mut user_data.1;
103 let slice = (*buffer).as_mut();
104 (slice.len(), slice.as_mut_ptr())
105 };
106 unsafe extern "C" fn read_all_async_trampoline<
107 B: AsMut<[u8]> + Send + 'static,
108 Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
109 >(
110 _source_object: *mut glib::gobject_ffi::GObject,
111 res: *mut ffi::GAsyncResult,
112 user_data: glib::ffi::gpointer,
113 ) {
114 let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
115 Box::from_raw(user_data as *mut _);
116 let (callback, buffer) = *user_data;
117 let callback = callback.into_inner();
118
119 let mut error = ptr::null_mut();
120 let mut bytes_read = mem::MaybeUninit::uninit();
121 let _ = ffi::g_input_stream_read_all_finish(
122 _source_object as *mut _,
123 res,
124 bytes_read.as_mut_ptr(),
125 &mut error,
126 );
127
128 let bytes_read = bytes_read.assume_init();
129 let result = if error.is_null() {
130 Ok((buffer, bytes_read, None))
131 } else if bytes_read != 0 {
132 Ok((buffer, bytes_read, Some(from_glib_full(error))))
133 } else {
134 Err((buffer, from_glib_full(error)))
135 };
136
137 callback(result);
138 }
139 let callback = read_all_async_trampoline::<B, Q>;
140 unsafe {
141 ffi::g_input_stream_read_all_async(
142 self.as_ref().to_glib_none().0,
143 buffer_ptr,
144 count,
145 io_priority.into_glib(),
146 gcancellable.0,
147 Some(callback),
148 Box::into_raw(user_data) as *mut _,
149 );
150 }
151 }
152
153 #[doc(alias = "g_input_stream_read_async")]
154 fn read_async<
155 B: AsMut<[u8]> + Send + 'static,
156 Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
157 C: IsA<Cancellable>,
158 >(
159 &self,
160 buffer: B,
161 io_priority: Priority,
162 cancellable: Option<&C>,
163 callback: Q,
164 ) {
165 let main_context = glib::MainContext::ref_thread_default();
166 let is_main_context_owner = main_context.is_owner();
167 let has_acquired_main_context = (!is_main_context_owner)
168 .then(|| main_context.acquire().ok())
169 .flatten();
170 assert!(
171 is_main_context_owner || has_acquired_main_context.is_some(),
172 "Async operations only allowed if the thread is owning the MainContext"
173 );
174
175 let cancellable = cancellable.map(|c| c.as_ref());
176 let gcancellable = cancellable.to_glib_none();
177 let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
178 Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
179 let (count, buffer_ptr) = {
181 let buffer = &mut user_data.1;
182 let slice = (*buffer).as_mut();
183 (slice.len(), slice.as_mut_ptr())
184 };
185 unsafe extern "C" fn read_async_trampoline<
186 B: AsMut<[u8]> + Send + 'static,
187 Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
188 >(
189 _source_object: *mut glib::gobject_ffi::GObject,
190 res: *mut ffi::GAsyncResult,
191 user_data: glib::ffi::gpointer,
192 ) {
193 let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
194 Box::from_raw(user_data as *mut _);
195 let (callback, buffer) = *user_data;
196 let callback = callback.into_inner();
197
198 let mut error = ptr::null_mut();
199 let ret = ffi::g_input_stream_read_finish(_source_object as *mut _, res, &mut error);
200
201 let result = if error.is_null() {
202 Ok((buffer, ret as usize))
203 } else {
204 Err((buffer, from_glib_full(error)))
205 };
206
207 callback(result);
208 }
209 let callback = read_async_trampoline::<B, Q>;
210 unsafe {
211 ffi::g_input_stream_read_async(
212 self.as_ref().to_glib_none().0,
213 buffer_ptr,
214 count,
215 io_priority.into_glib(),
216 gcancellable.0,
217 Some(callback),
218 Box::into_raw(user_data) as *mut _,
219 );
220 }
221 }
222
223 fn read_all_future<B: AsMut<[u8]> + Send + 'static>(
224 &self,
225 buffer: B,
226 io_priority: Priority,
227 ) -> Pin<
228 Box<
229 dyn std::future::Future<
230 Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
231 > + 'static,
232 >,
233 > {
234 Box::pin(crate::GioFuture::new(
235 self,
236 move |obj, cancellable, send| {
237 obj.read_all_async(buffer, io_priority, Some(cancellable), move |res| {
238 send.resolve(res);
239 });
240 },
241 ))
242 }
243
244 fn read_future<B: AsMut<[u8]> + Send + 'static>(
245 &self,
246 buffer: B,
247 io_priority: Priority,
248 ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
249 {
250 Box::pin(crate::GioFuture::new(
251 self,
252 move |obj, cancellable, send| {
253 obj.read_async(buffer, io_priority, Some(cancellable), move |res| {
254 send.resolve(res);
255 });
256 },
257 ))
258 }
259
260 fn into_read(self) -> InputStreamRead<Self>
261 where
262 Self: IsA<InputStream>,
263 {
264 InputStreamRead(self)
265 }
266
267 fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self>
268 where
269 Self: IsA<InputStream>,
270 {
271 InputStreamAsyncBufRead::new(self, buffer_size)
272 }
273}
274
275impl<O: IsA<InputStream>> InputStreamExtManual for O {}
276
277#[derive(Debug)]
278pub struct InputStreamRead<T: IsA<InputStream>>(T);
279
280impl<T: IsA<InputStream>> InputStreamRead<T> {
281 pub fn into_input_stream(self) -> T {
282 self.0
283 }
284
285 pub fn input_stream(&self) -> &T {
286 &self.0
287 }
288}
289
290impl<T: IsA<InputStream>> io::Read for InputStreamRead<T> {
291 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
292 let gio_result = self.0.as_ref().read(buf, crate::Cancellable::NONE);
293 to_std_io_result(gio_result)
294 }
295}
296
297impl<T: IsA<InputStream> + IsA<Seekable>> io::Seek for InputStreamRead<T> {
298 fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
299 let (pos, type_) = match pos {
300 io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
301 io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
302 io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
303 };
304 let seekable: &Seekable = self.0.as_ref();
305 let gio_result = seekable
306 .seek(pos, type_, crate::Cancellable::NONE)
307 .map(|_| seekable.tell() as u64);
308 to_std_io_result(gio_result)
309 }
310}
311
312enum State {
313 Waiting {
314 buffer: Vec<u8>,
315 },
316 Transitioning,
317 Reading {
318 pending: Pin<
319 Box<
320 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
321 + 'static,
322 >,
323 >,
324 },
325 HasData {
326 buffer: Vec<u8>,
327 valid: (usize, usize), },
329 Failed(crate::IOErrorEnum),
330}
331
332impl State {
333 fn into_buffer(self) -> Vec<u8> {
334 match self {
335 State::Waiting { buffer } => buffer,
336 _ => panic!("Invalid state"),
337 }
338 }
339
340 #[doc(alias = "get_pending")]
341 fn pending(
342 &mut self,
343 ) -> &mut Pin<
344 Box<
345 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
346 + 'static,
347 >,
348 > {
349 match self {
350 State::Reading { ref mut pending } => pending,
351 _ => panic!("Invalid state"),
352 }
353 }
354}
355pub struct InputStreamAsyncBufRead<T: IsA<InputStream>> {
356 stream: T,
357 state: State,
358}
359
360impl<T: IsA<InputStream>> InputStreamAsyncBufRead<T> {
361 pub fn into_input_stream(self) -> T {
362 self.stream
363 }
364
365 pub fn input_stream(&self) -> &T {
366 &self.stream
367 }
368
369 fn new(stream: T, buffer_size: usize) -> Self {
370 let buffer = vec![0; buffer_size];
371
372 Self {
373 stream,
374 state: State::Waiting { buffer },
375 }
376 }
377 fn set_reading(
378 &mut self,
379 ) -> &mut Pin<
380 Box<
381 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
382 + 'static,
383 >,
384 > {
385 match self.state {
386 State::Waiting { .. } => {
387 let waiting = mem::replace(&mut self.state, State::Transitioning);
388 let buffer = waiting.into_buffer();
389 let pending = self.input_stream().read_future(buffer, Priority::default());
390 self.state = State::Reading { pending };
391 }
392 State::Reading { .. } => {}
393 _ => panic!("Invalid state"),
394 };
395
396 self.state.pending()
397 }
398
399 #[doc(alias = "get_data")]
400 fn data(&self) -> Poll<io::Result<&[u8]>> {
401 if let State::HasData {
402 ref buffer,
403 valid: (i, j),
404 } = self.state
405 {
406 return Poll::Ready(Ok(&buffer[i..j]));
407 }
408 panic!("Invalid state")
409 }
410
411 fn set_waiting(&mut self, buffer: Vec<u8>) {
412 match self.state {
413 State::Reading { .. } | State::Transitioning => self.state = State::Waiting { buffer },
414 _ => panic!("Invalid state"),
415 }
416 }
417
418 fn set_has_data(&mut self, buffer: Vec<u8>, valid: (usize, usize)) {
419 match self.state {
420 State::Reading { .. } | State::Transitioning => {
421 self.state = State::HasData { buffer, valid }
422 }
423 _ => panic!("Invalid state"),
424 }
425 }
426
427 fn poll_fill_buf(&mut self, cx: &mut Context) -> Poll<Result<&[u8], futures_io::Error>> {
428 match self.state {
429 State::Failed(kind) => Poll::Ready(Err(io::Error::new(
430 io::ErrorKind::from(kind),
431 BufReadError::Failed,
432 ))),
433 State::HasData { .. } => self.data(),
434 State::Transitioning => panic!("Invalid state"),
435 State::Waiting { .. } | State::Reading { .. } => {
436 let pending = self.set_reading();
437 match Pin::new(pending).poll(cx) {
438 Poll::Ready(Ok((buffer, res))) => {
439 if res == 0 {
440 self.set_waiting(buffer);
441 Poll::Ready(Ok(&[]))
442 } else {
443 self.set_has_data(buffer, (0, res));
444 self.data()
445 }
446 }
447 Poll::Ready(Err((_, err))) => {
448 let kind = err
449 .kind::<crate::IOErrorEnum>()
450 .unwrap_or(crate::IOErrorEnum::Failed);
451 self.state = State::Failed(kind);
452 Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
453 }
454 Poll::Pending => Poll::Pending,
455 }
456 }
457 }
458 }
459
460 fn consume(&mut self, amt: usize) {
461 if amt == 0 {
462 return;
463 }
464
465 if let State::HasData { .. } = self.state {
466 let has_data = mem::replace(&mut self.state, State::Transitioning);
467 if let State::HasData {
468 buffer,
469 valid: (i, j),
470 } = has_data
471 {
472 let available = j - i;
473 if amt > available {
474 panic!("Cannot consume {amt} bytes as only {available} are available",)
475 }
476 let remaining = available - amt;
477 if remaining == 0 {
478 return self.set_waiting(buffer);
479 } else {
480 return self.set_has_data(buffer, (i + amt, j));
481 }
482 }
483 }
484
485 panic!("Invalid state")
486 }
487}
488
/// Error payload used when a poll is attempted after an earlier read failed.
#[derive(Debug)]
enum BufReadError {
    /// A previous read operation on the stream failed.
    Failed,
}

impl std::error::Error for BufReadError {}

impl fmt::Display for BufReadError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = match self {
            Self::Failed => "Previous read operation failed",
        };
        f.write_str(message)
    }
}
503
504impl<T: IsA<InputStream>> AsyncRead for InputStreamAsyncBufRead<T> {
505 fn poll_read(
506 self: Pin<&mut Self>,
507 cx: &mut Context,
508 out_buf: &mut [u8],
509 ) -> Poll<io::Result<usize>> {
510 let reader = self.get_mut();
511 let poll = reader.poll_fill_buf(cx);
512
513 let poll = poll.map_ok(|buffer| {
514 let copied = buffer.len().min(out_buf.len());
515 out_buf[..copied].copy_from_slice(&buffer[..copied]);
516 copied
517 });
518
519 if let Poll::Ready(Ok(consumed)) = poll {
520 reader.consume(consumed);
521 }
522 poll
523 }
524}
525
526impl<T: IsA<InputStream>> AsyncBufRead for InputStreamAsyncBufRead<T> {
527 fn poll_fill_buf(
528 self: Pin<&mut Self>,
529 cx: &mut Context,
530 ) -> Poll<Result<&[u8], futures_io::Error>> {
531 self.get_mut().poll_fill_buf(cx)
532 }
533
534 fn consume(self: Pin<&mut Self>, amt: usize) {
535 self.get_mut().consume(amt);
536 }
537}
538
539impl<T: IsA<InputStream>> Unpin for InputStreamAsyncBufRead<T> {}
540
541#[cfg(test)]
542mod tests {
543 use std::io::Read;
544
545 use glib::Bytes;
546
547 use crate::{prelude::*, test_util::run_async, MemoryInputStream};
548
// Asynchronously reads a 3-byte memory stream into a 10-byte buffer and
// checks the reported count, the absence of an error and the bytes read.
#[test]
fn read_all_async() {
    let outcome = run_async(|sender, main_loop| {
        let source = Bytes::from_owned(vec![1, 2, 3]);
        let stream = MemoryInputStream::from_bytes(&source);

        stream.read_all_async(
            vec![0u8; 10],
            glib::Priority::DEFAULT_IDLE,
            crate::Cancellable::NONE,
            move |result| {
                sender.send(result).unwrap();
                main_loop.quit();
            },
        );
    });

    let (buf, count, err) = outcome.unwrap();
    assert_eq!(count, 3);
    assert!(err.is_none());
    assert_eq!(buf[..3], [1u8, 2, 3]);
}
574
// Blocking `read_all` on a 3-byte memory stream with a 10-byte buffer.
#[test]
fn read_all() {
    let source = Bytes::from_owned(vec![1, 2, 3]);
    let stream = MemoryInputStream::from_bytes(&source);
    let mut buf = vec![0u8; 10];

    let (count, err) = stream
        .read_all(&mut buf, crate::Cancellable::NONE)
        .unwrap();

    assert_eq!(count, 3);
    assert!(err.is_none());
    assert_eq!(buf[..3], [1u8, 2, 3]);
}
589
// Blocking `read` on a 3-byte memory stream with a 10-byte buffer.
#[test]
fn read() {
    let source = Bytes::from_owned(vec![1, 2, 3]);
    let stream = MemoryInputStream::from_bytes(&source);
    let mut buf = vec![0u8; 10];

    let count = stream.read(&mut buf, crate::Cancellable::NONE).unwrap();

    assert_eq!(count, 3);
    assert_eq!(buf[..3], [1u8, 2, 3]);
}
603
// Asynchronously reads a 3-byte memory stream into a 10-byte buffer and
// checks the reported count and the bytes read.
#[test]
fn read_async() {
    let outcome = run_async(|sender, main_loop| {
        let source = Bytes::from_owned(vec![1, 2, 3]);
        let stream = MemoryInputStream::from_bytes(&source);

        stream.read_async(
            vec![0u8; 10],
            glib::Priority::DEFAULT_IDLE,
            crate::Cancellable::NONE,
            move |result| {
                sender.send(result).unwrap();
                main_loop.quit();
            },
        );
    });

    let (buf, count) = outcome.unwrap();
    assert_eq!(count, 3);
    assert_eq!(buf[..3], [1u8, 2, 3]);
}
628
// Requests up to 10 bytes from a 3-byte memory stream via
// `read_bytes_async` and checks the returned bytes.
#[test]
fn read_bytes_async() {
    let outcome = run_async(|sender, main_loop| {
        let source = Bytes::from_owned(vec![1, 2, 3]);
        let stream = MemoryInputStream::from_bytes(&source);

        stream.read_bytes_async(
            10,
            glib::Priority::DEFAULT_IDLE,
            crate::Cancellable::NONE,
            move |result| {
                sender.send(result).unwrap();
                main_loop.quit();
            },
        );
    });

    let received = outcome.unwrap();
    assert_eq!(received, vec![1, 2, 3]);
}
649
// Asks to skip 10 bytes on a 3-byte memory stream and checks that 3 are
// reported as skipped.
#[test]
fn skip_async() {
    let outcome = run_async(|sender, main_loop| {
        let source = Bytes::from_owned(vec![1, 2, 3]);
        let stream = MemoryInputStream::from_bytes(&source);

        stream.skip_async(
            10,
            glib::Priority::DEFAULT_IDLE,
            crate::Cancellable::NONE,
            move |result| {
                sender.send(result).unwrap();
                main_loop.quit();
            },
        );
    });

    let skipped_count = outcome.unwrap();
    assert_eq!(skipped_count, 3);
}
670
// Reads through the `std::io::Read` adapter returned by `into_read`.
#[test]
fn std_io_read() {
    let source = Bytes::from_owned(vec![1, 2, 3]);
    let mut reader = MemoryInputStream::from_bytes(&source).into_read();
    let mut buf = [0u8; 10];

    let count = reader.read(&mut buf).unwrap();

    assert_eq!(count, 3);
    assert_eq!(buf[..3], [1u8, 2, 3]);
}
684
// Wrapping a stream with `into_read` and unwrapping it again must yield a
// stream equal to a clone taken beforehand.
#[test]
fn into_input_stream() {
    let source = Bytes::from_owned(vec![1, 2, 3]);
    let original = MemoryInputStream::from_bytes(&source);
    let expected = original.clone();

    let round_tripped = original.into_read().into_input_stream();

    assert_eq!(round_tripped, expected);
}
694}