1use std::{fmt, future::Future, io, mem, pin::Pin, ptr};
4
5use futures_core::task::{Context, Poll};
6use futures_io::{AsyncBufRead, AsyncRead};
7use glib::{Priority, prelude::*, translate::*};
8
9use crate::{Cancellable, InputStream, Seekable, error::to_std_io_result, ffi, prelude::*};
10
11pub trait InputStreamExtManual: IsA<InputStream> + Sized {
12 #[doc(alias = "g_input_stream_read")]
13 fn read<B: AsMut<[u8]>, C: IsA<Cancellable>>(
14 &self,
15 mut buffer: B,
16 cancellable: Option<&C>,
17 ) -> Result<usize, glib::Error> {
18 let cancellable = cancellable.map(|c| c.as_ref());
19 let gcancellable = cancellable.to_glib_none();
20 let buffer = buffer.as_mut();
21 let buffer_ptr = buffer.as_mut_ptr();
22 let count = buffer.len();
23 unsafe {
24 let mut error = ptr::null_mut();
25 let ret = ffi::g_input_stream_read(
26 self.as_ref().to_glib_none().0,
27 buffer_ptr,
28 count,
29 gcancellable.0,
30 &mut error,
31 );
32 if error.is_null() {
33 Ok(ret as usize)
34 } else {
35 Err(from_glib_full(error))
36 }
37 }
38 }
39
40 #[doc(alias = "g_input_stream_read_all")]
41 fn read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>(
42 &self,
43 mut buffer: B,
44 cancellable: Option<&C>,
45 ) -> Result<(usize, Option<glib::Error>), glib::Error> {
46 let cancellable = cancellable.map(|c| c.as_ref());
47 let gcancellable = cancellable.to_glib_none();
48 let buffer = buffer.as_mut();
49 let buffer_ptr = buffer.as_mut_ptr();
50 let count = buffer.len();
51 unsafe {
52 let mut bytes_read = mem::MaybeUninit::uninit();
53 let mut error = ptr::null_mut();
54 let _ = ffi::g_input_stream_read_all(
55 self.as_ref().to_glib_none().0,
56 buffer_ptr,
57 count,
58 bytes_read.as_mut_ptr(),
59 gcancellable.0,
60 &mut error,
61 );
62
63 let bytes_read = bytes_read.assume_init();
64 if error.is_null() {
65 Ok((bytes_read, None))
66 } else if bytes_read != 0 {
67 Ok((bytes_read, Some(from_glib_full(error))))
68 } else {
69 Err(from_glib_full(error))
70 }
71 }
72 }
73
74 #[doc(alias = "g_input_stream_read_all_async")]
75 fn read_all_async<
76 B: AsMut<[u8]> + Send + 'static,
77 Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
78 C: IsA<Cancellable>,
79 >(
80 &self,
81 buffer: B,
82 io_priority: Priority,
83 cancellable: Option<&C>,
84 callback: Q,
85 ) {
86 let main_context = glib::MainContext::ref_thread_default();
87 let is_main_context_owner = main_context.is_owner();
88 let has_acquired_main_context = (!is_main_context_owner)
89 .then(|| main_context.acquire().ok())
90 .flatten();
91 assert!(
92 is_main_context_owner || has_acquired_main_context.is_some(),
93 "Async operations only allowed if the thread is owning the MainContext"
94 );
95
96 let cancellable = cancellable.map(|c| c.as_ref());
97 let gcancellable = cancellable.to_glib_none();
98 let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
99 Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
100 let (count, buffer_ptr) = {
102 let buffer = &mut user_data.1;
103 let slice = (*buffer).as_mut();
104 (slice.len(), slice.as_mut_ptr())
105 };
106 unsafe extern "C" fn read_all_async_trampoline<
107 B: AsMut<[u8]> + Send + 'static,
108 Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
109 >(
110 _source_object: *mut glib::gobject_ffi::GObject,
111 res: *mut ffi::GAsyncResult,
112 user_data: glib::ffi::gpointer,
113 ) {
114 unsafe {
115 let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
116 Box::from_raw(user_data as *mut _);
117 let (callback, buffer) = *user_data;
118 let callback = callback.into_inner();
119
120 let mut error = ptr::null_mut();
121 let mut bytes_read = mem::MaybeUninit::uninit();
122 let _ = ffi::g_input_stream_read_all_finish(
123 _source_object as *mut _,
124 res,
125 bytes_read.as_mut_ptr(),
126 &mut error,
127 );
128
129 let bytes_read = bytes_read.assume_init();
130 let result = if error.is_null() {
131 Ok((buffer, bytes_read, None))
132 } else if bytes_read != 0 {
133 Ok((buffer, bytes_read, Some(from_glib_full(error))))
134 } else {
135 Err((buffer, from_glib_full(error)))
136 };
137
138 callback(result);
139 }
140 }
141 let callback = read_all_async_trampoline::<B, Q>;
142 unsafe {
143 ffi::g_input_stream_read_all_async(
144 self.as_ref().to_glib_none().0,
145 buffer_ptr,
146 count,
147 io_priority.into_glib(),
148 gcancellable.0,
149 Some(callback),
150 Box::into_raw(user_data) as *mut _,
151 );
152 }
153 }
154
155 #[doc(alias = "g_input_stream_read_async")]
156 fn read_async<
157 B: AsMut<[u8]> + Send + 'static,
158 Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
159 C: IsA<Cancellable>,
160 >(
161 &self,
162 buffer: B,
163 io_priority: Priority,
164 cancellable: Option<&C>,
165 callback: Q,
166 ) {
167 let main_context = glib::MainContext::ref_thread_default();
168 let is_main_context_owner = main_context.is_owner();
169 let has_acquired_main_context = (!is_main_context_owner)
170 .then(|| main_context.acquire().ok())
171 .flatten();
172 assert!(
173 is_main_context_owner || has_acquired_main_context.is_some(),
174 "Async operations only allowed if the thread is owning the MainContext"
175 );
176
177 let cancellable = cancellable.map(|c| c.as_ref());
178 let gcancellable = cancellable.to_glib_none();
179 let mut user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
180 Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
181 let (count, buffer_ptr) = {
183 let buffer = &mut user_data.1;
184 let slice = (*buffer).as_mut();
185 (slice.len(), slice.as_mut_ptr())
186 };
187 unsafe extern "C" fn read_async_trampoline<
188 B: AsMut<[u8]> + Send + 'static,
189 Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
190 >(
191 _source_object: *mut glib::gobject_ffi::GObject,
192 res: *mut ffi::GAsyncResult,
193 user_data: glib::ffi::gpointer,
194 ) {
195 unsafe {
196 let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
197 Box::from_raw(user_data as *mut _);
198 let (callback, buffer) = *user_data;
199 let callback = callback.into_inner();
200
201 let mut error = ptr::null_mut();
202 let ret =
203 ffi::g_input_stream_read_finish(_source_object as *mut _, res, &mut error);
204
205 let result = if error.is_null() {
206 Ok((buffer, ret as usize))
207 } else {
208 Err((buffer, from_glib_full(error)))
209 };
210
211 callback(result);
212 }
213 }
214 let callback = read_async_trampoline::<B, Q>;
215 unsafe {
216 ffi::g_input_stream_read_async(
217 self.as_ref().to_glib_none().0,
218 buffer_ptr,
219 count,
220 io_priority.into_glib(),
221 gcancellable.0,
222 Some(callback),
223 Box::into_raw(user_data) as *mut _,
224 );
225 }
226 }
227
228 fn read_all_future<B: AsMut<[u8]> + Send + 'static>(
229 &self,
230 buffer: B,
231 io_priority: Priority,
232 ) -> Pin<
233 Box<
234 dyn std::future::Future<
235 Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
236 > + 'static,
237 >,
238 > {
239 Box::pin(crate::GioFuture::new(
240 self,
241 move |obj, cancellable, send| {
242 obj.read_all_async(buffer, io_priority, Some(cancellable), move |res| {
243 send.resolve(res);
244 });
245 },
246 ))
247 }
248
249 fn read_future<B: AsMut<[u8]> + Send + 'static>(
250 &self,
251 buffer: B,
252 io_priority: Priority,
253 ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
254 {
255 Box::pin(crate::GioFuture::new(
256 self,
257 move |obj, cancellable, send| {
258 obj.read_async(buffer, io_priority, Some(cancellable), move |res| {
259 send.resolve(res);
260 });
261 },
262 ))
263 }
264
265 fn into_read(self) -> InputStreamRead<Self>
266 where
267 Self: IsA<InputStream>,
268 {
269 InputStreamRead(self)
270 }
271
272 fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self>
273 where
274 Self: IsA<InputStream>,
275 {
276 InputStreamAsyncBufRead::new(self, buffer_size)
277 }
278}
279
280impl<O: IsA<InputStream>> InputStreamExtManual for O {}
281
282#[derive(Debug)]
283pub struct InputStreamRead<T: IsA<InputStream>>(T);
284
285impl<T: IsA<InputStream>> InputStreamRead<T> {
286 pub fn into_input_stream(self) -> T {
287 self.0
288 }
289
290 pub fn input_stream(&self) -> &T {
291 &self.0
292 }
293}
294
295impl<T: IsA<InputStream>> io::Read for InputStreamRead<T> {
296 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
297 let gio_result = self.0.as_ref().read(buf, crate::Cancellable::NONE);
298 to_std_io_result(gio_result)
299 }
300}
301
302impl<T: IsA<InputStream> + IsA<Seekable>> io::Seek for InputStreamRead<T> {
303 fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
304 let (pos, type_) = match pos {
305 io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
306 io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
307 io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
308 };
309 let seekable: &Seekable = self.0.as_ref();
310 let gio_result = seekable
311 .seek(pos, type_, crate::Cancellable::NONE)
312 .map(|_| seekable.tell() as u64);
313 to_std_io_result(gio_result)
314 }
315}
316
317enum State {
318 Waiting {
319 buffer: Vec<u8>,
320 },
321 Transitioning,
322 Reading {
323 pending: Pin<
324 Box<
325 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
326 + 'static,
327 >,
328 >,
329 },
330 HasData {
331 buffer: Vec<u8>,
332 valid: (usize, usize), },
334 Failed(crate::IOErrorEnum),
335}
336
337impl State {
338 fn into_buffer(self) -> Vec<u8> {
339 match self {
340 State::Waiting { buffer } => buffer,
341 _ => panic!("Invalid state"),
342 }
343 }
344
345 #[doc(alias = "get_pending")]
346 fn pending(
347 &mut self,
348 ) -> &mut Pin<
349 Box<
350 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
351 + 'static,
352 >,
353 > {
354 match self {
355 State::Reading { pending } => pending,
356 _ => panic!("Invalid state"),
357 }
358 }
359}
360pub struct InputStreamAsyncBufRead<T: IsA<InputStream>> {
361 stream: T,
362 state: State,
363}
364
365impl<T: IsA<InputStream>> InputStreamAsyncBufRead<T> {
366 pub fn into_input_stream(self) -> T {
367 self.stream
368 }
369
370 pub fn input_stream(&self) -> &T {
371 &self.stream
372 }
373
374 fn new(stream: T, buffer_size: usize) -> Self {
375 let buffer = vec![0; buffer_size];
376
377 Self {
378 stream,
379 state: State::Waiting { buffer },
380 }
381 }
382 fn set_reading(
383 &mut self,
384 ) -> &mut Pin<
385 Box<
386 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
387 + 'static,
388 >,
389 > {
390 match self.state {
391 State::Waiting { .. } => {
392 let waiting = mem::replace(&mut self.state, State::Transitioning);
393 let buffer = waiting.into_buffer();
394 let pending = self.input_stream().read_future(buffer, Priority::default());
395 self.state = State::Reading { pending };
396 }
397 State::Reading { .. } => {}
398 _ => panic!("Invalid state"),
399 };
400
401 self.state.pending()
402 }
403
404 #[doc(alias = "get_data")]
405 fn data(&self) -> Poll<io::Result<&[u8]>> {
406 if let State::HasData {
407 ref buffer,
408 valid: (i, j),
409 } = self.state
410 {
411 return Poll::Ready(Ok(&buffer[i..j]));
412 }
413 panic!("Invalid state")
414 }
415
416 fn set_waiting(&mut self, buffer: Vec<u8>) {
417 match self.state {
418 State::Reading { .. } | State::Transitioning => self.state = State::Waiting { buffer },
419 _ => panic!("Invalid state"),
420 }
421 }
422
423 fn set_has_data(&mut self, buffer: Vec<u8>, valid: (usize, usize)) {
424 match self.state {
425 State::Reading { .. } | State::Transitioning => {
426 self.state = State::HasData { buffer, valid }
427 }
428 _ => panic!("Invalid state"),
429 }
430 }
431
432 fn poll_fill_buf(&mut self, cx: &mut Context) -> Poll<Result<&[u8], futures_io::Error>> {
433 match self.state {
434 State::Failed(kind) => Poll::Ready(Err(io::Error::new(
435 io::ErrorKind::from(kind),
436 BufReadError::Failed,
437 ))),
438 State::HasData { .. } => self.data(),
439 State::Transitioning => panic!("Invalid state"),
440 State::Waiting { .. } | State::Reading { .. } => {
441 let pending = self.set_reading();
442 match Pin::new(pending).poll(cx) {
443 Poll::Ready(Ok((buffer, res))) => {
444 if res == 0 {
445 self.set_waiting(buffer);
446 Poll::Ready(Ok(&[]))
447 } else {
448 self.set_has_data(buffer, (0, res));
449 self.data()
450 }
451 }
452 Poll::Ready(Err((_, err))) => {
453 let kind = err
454 .kind::<crate::IOErrorEnum>()
455 .unwrap_or(crate::IOErrorEnum::Failed);
456 self.state = State::Failed(kind);
457 Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
458 }
459 Poll::Pending => Poll::Pending,
460 }
461 }
462 }
463 }
464
465 fn consume(&mut self, amt: usize) {
466 if amt == 0 {
467 return;
468 }
469
470 if let State::HasData { .. } = self.state {
471 let has_data = mem::replace(&mut self.state, State::Transitioning);
472 if let State::HasData {
473 buffer,
474 valid: (i, j),
475 } = has_data
476 {
477 let available = j - i;
478 if amt > available {
479 panic!("Cannot consume {amt} bytes as only {available} are available",)
480 }
481 let remaining = available - amt;
482 if remaining == 0 {
483 return self.set_waiting(buffer);
484 } else {
485 return self.set_has_data(buffer, (i + amt, j));
486 }
487 }
488 }
489
490 panic!("Invalid state")
491 }
492}
493
/// Error payload attached to the `io::Error` that `InputStreamAsyncBufRead` reports
/// once it is in the failed state.
#[derive(Debug)]
enum BufReadError {
    /// An earlier read on the same adapter failed.
    Failed,
}

impl std::error::Error for BufReadError {}

impl fmt::Display for BufReadError {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let message = match self {
            Self::Failed => "Previous read operation failed",
        };
        fmt.write_str(message)
    }
}
508
509impl<T: IsA<InputStream>> AsyncRead for InputStreamAsyncBufRead<T> {
510 fn poll_read(
511 self: Pin<&mut Self>,
512 cx: &mut Context,
513 out_buf: &mut [u8],
514 ) -> Poll<io::Result<usize>> {
515 let reader = self.get_mut();
516 let poll = reader.poll_fill_buf(cx);
517
518 let poll = poll.map_ok(|buffer| {
519 let copied = buffer.len().min(out_buf.len());
520 out_buf[..copied].copy_from_slice(&buffer[..copied]);
521 copied
522 });
523
524 if let Poll::Ready(Ok(consumed)) = poll {
525 reader.consume(consumed);
526 }
527 poll
528 }
529}
530
531impl<T: IsA<InputStream>> AsyncBufRead for InputStreamAsyncBufRead<T> {
532 fn poll_fill_buf(
533 self: Pin<&mut Self>,
534 cx: &mut Context,
535 ) -> Poll<Result<&[u8], futures_io::Error>> {
536 self.get_mut().poll_fill_buf(cx)
537 }
538
539 fn consume(self: Pin<&mut Self>, amt: usize) {
540 self.get_mut().consume(amt);
541 }
542}
543
544impl<T: IsA<InputStream>> Unpin for InputStreamAsyncBufRead<T> {}
545
#[cfg(test)]
mod tests {
    use std::io::Read;

    use glib::Bytes;

    use crate::{MemoryInputStream, prelude::*, test_util::run_async};

    /// `read_all_async` returns the buffer with the three source bytes and no error.
    #[test]
    fn read_all_async() {
        let outcome = run_async(|sender, main_loop| {
            let source = Bytes::from_owned(vec![1, 2, 3]);
            let stream = MemoryInputStream::from_bytes(&source);

            let buffer = vec![0; 10];
            stream.read_all_async(
                buffer,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |result| {
                    sender.send(result).unwrap();
                    main_loop.quit();
                },
            );
        });

        let (buffer, n_read, error) = outcome.unwrap();
        assert_eq!(n_read, 3);
        assert!(error.is_none());
        assert_eq!(buffer[..3], [1, 2, 3]);
    }

    /// Blocking `read_all` reads the three source bytes and reports no error.
    #[test]
    fn read_all() {
        let source = Bytes::from_owned(vec![1, 2, 3]);
        let stream = MemoryInputStream::from_bytes(&source);
        let mut buffer = vec![0; 10];

        let (n_read, error) = stream
            .read_all(&mut buffer, crate::Cancellable::NONE)
            .unwrap();

        assert_eq!(n_read, 3);
        assert!(error.is_none());
        assert_eq!(buffer[..3], [1, 2, 3]);
    }

    /// Blocking `read` reads the three source bytes in one call.
    #[test]
    fn read() {
        let source = Bytes::from_owned(vec![1, 2, 3]);
        let stream = MemoryInputStream::from_bytes(&source);
        let mut buffer = vec![0; 10];

        let n_read = stream.read(&mut buffer, crate::Cancellable::NONE);

        assert_eq!(n_read.unwrap(), 3);
        assert_eq!(buffer[..3], [1, 2, 3]);
    }

    /// `read_async` returns the buffer with the three source bytes.
    #[test]
    fn read_async() {
        let outcome = run_async(|sender, main_loop| {
            let source = Bytes::from_owned(vec![1, 2, 3]);
            let stream = MemoryInputStream::from_bytes(&source);

            let buffer = vec![0; 10];
            stream.read_async(
                buffer,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |result| {
                    sender.send(result).unwrap();
                    main_loop.quit();
                },
            );
        });

        let (buffer, n_read) = outcome.unwrap();
        assert_eq!(n_read, 3);
        assert_eq!(buffer[..3], [1, 2, 3]);
    }

    /// `read_bytes_async` with a larger request yields exactly the source bytes.
    #[test]
    fn read_bytes_async() {
        let outcome = run_async(|sender, main_loop| {
            let source = Bytes::from_owned(vec![1, 2, 3]);
            let stream = MemoryInputStream::from_bytes(&source);

            stream.read_bytes_async(
                10,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |result| {
                    sender.send(result).unwrap();
                    main_loop.quit();
                },
            );
        });

        let bytes = outcome.unwrap();
        assert_eq!(bytes, vec![1, 2, 3]);
    }

    /// `skip_async` with a larger request skips only the three available bytes.
    #[test]
    fn skip_async() {
        let outcome = run_async(|sender, main_loop| {
            let source = Bytes::from_owned(vec![1, 2, 3]);
            let stream = MemoryInputStream::from_bytes(&source);

            stream.skip_async(
                10,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |result| {
                    sender.send(result).unwrap();
                    main_loop.quit();
                },
            );
        });

        let skipped = outcome.unwrap();
        assert_eq!(skipped, 3);
    }

    /// The `std::io::Read` adapter reads the three source bytes.
    #[test]
    fn std_io_read() {
        let source = Bytes::from_owned(vec![1, 2, 3]);
        let mut reader = MemoryInputStream::from_bytes(&source).into_read();
        let mut buffer = [0u8; 10];

        let n_read = reader.read(&mut buffer);

        assert_eq!(n_read.unwrap(), 3);
        assert_eq!(buffer[..3], [1, 2, 3]);
    }

    /// Wrapping with `into_read` and unwrapping again yields the same stream object.
    #[test]
    fn into_input_stream() {
        let source = Bytes::from_owned(vec![1, 2, 3]);
        let original = MemoryInputStream::from_bytes(&source);
        let reference = original.clone();
        let round_tripped = original.into_read().into_input_stream();

        assert_eq!(round_tripped, reference);
    }
}