gio/
output_stream.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{io, mem, pin::Pin, ptr};
4
5use glib::{prelude::*, translate::*, Priority};
6
7#[cfg(feature = "v2_60")]
8use crate::OutputVector;
9use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, OutputStream, Seekable};
10
11pub trait OutputStreamExtManual: IsA<OutputStream> + Sized {
12    #[doc(alias = "g_output_stream_write_async")]
13    fn write_async<
14        B: AsRef<[u8]> + Send + 'static,
15        Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
16        C: IsA<Cancellable>,
17    >(
18        &self,
19        buffer: B,
20        io_priority: Priority,
21        cancellable: Option<&C>,
22        callback: Q,
23    ) {
24        let main_context = glib::MainContext::ref_thread_default();
25        let is_main_context_owner = main_context.is_owner();
26        let has_acquired_main_context = (!is_main_context_owner)
27            .then(|| main_context.acquire().ok())
28            .flatten();
29        assert!(
30            is_main_context_owner || has_acquired_main_context.is_some(),
31            "Async operations only allowed if the thread is owning the MainContext"
32        );
33
34        let cancellable = cancellable.map(|c| c.as_ref());
35        let gcancellable = cancellable.to_glib_none();
36        let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
37            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
38        // Need to do this after boxing as the contents pointer might change by moving into the box
39        let (count, buffer_ptr) = {
40            let buffer = &user_data.1;
41            let slice = buffer.as_ref();
42            (slice.len(), slice.as_ptr())
43        };
44        unsafe extern "C" fn write_async_trampoline<
45            B: AsRef<[u8]> + Send + 'static,
46            Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + 'static,
47        >(
48            _source_object: *mut glib::gobject_ffi::GObject,
49            res: *mut ffi::GAsyncResult,
50            user_data: glib::ffi::gpointer,
51        ) {
52            let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
53                Box::from_raw(user_data as *mut _);
54            let (callback, buffer) = *user_data;
55            let callback = callback.into_inner();
56
57            let mut error = ptr::null_mut();
58            let ret = ffi::g_output_stream_write_finish(_source_object as *mut _, res, &mut error);
59            let result = if error.is_null() {
60                Ok((buffer, ret as usize))
61            } else {
62                Err((buffer, from_glib_full(error)))
63            };
64            callback(result);
65        }
66        let callback = write_async_trampoline::<B, Q>;
67        unsafe {
68            ffi::g_output_stream_write_async(
69                self.as_ref().to_glib_none().0,
70                mut_override(buffer_ptr),
71                count,
72                io_priority.into_glib(),
73                gcancellable.0,
74                Some(callback),
75                Box::into_raw(user_data) as *mut _,
76            );
77        }
78    }
79
80    #[doc(alias = "g_output_stream_write_all")]
81    fn write_all<C: IsA<Cancellable>>(
82        &self,
83        buffer: &[u8],
84        cancellable: Option<&C>,
85    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
86        let cancellable = cancellable.map(|c| c.as_ref());
87        let gcancellable = cancellable.to_glib_none();
88        let count = buffer.len();
89        unsafe {
90            let mut bytes_written = mem::MaybeUninit::uninit();
91            let mut error = ptr::null_mut();
92            let _ = ffi::g_output_stream_write_all(
93                self.as_ref().to_glib_none().0,
94                buffer.to_glib_none().0,
95                count,
96                bytes_written.as_mut_ptr(),
97                gcancellable.0,
98                &mut error,
99            );
100
101            let bytes_written = bytes_written.assume_init();
102            if error.is_null() {
103                Ok((bytes_written, None))
104            } else if bytes_written != 0 {
105                Ok((bytes_written, Some(from_glib_full(error))))
106            } else {
107                Err(from_glib_full(error))
108            }
109        }
110    }
111
112    #[doc(alias = "g_output_stream_write_all_async")]
113    fn write_all_async<
114        B: AsRef<[u8]> + Send + 'static,
115        Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
116        C: IsA<Cancellable>,
117    >(
118        &self,
119        buffer: B,
120        io_priority: Priority,
121        cancellable: Option<&C>,
122        callback: Q,
123    ) {
124        let main_context = glib::MainContext::ref_thread_default();
125        let is_main_context_owner = main_context.is_owner();
126        let has_acquired_main_context = (!is_main_context_owner)
127            .then(|| main_context.acquire().ok())
128            .flatten();
129        assert!(
130            is_main_context_owner || has_acquired_main_context.is_some(),
131            "Async operations only allowed if the thread is owning the MainContext"
132        );
133
134        let cancellable = cancellable.map(|c| c.as_ref());
135        let gcancellable = cancellable.to_glib_none();
136        let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
137            Box::new((glib::thread_guard::ThreadGuard::new(callback), buffer));
138        // Need to do this after boxing as the contents pointer might change by moving into the box
139        let (count, buffer_ptr) = {
140            let buffer = &user_data.1;
141            let slice = buffer.as_ref();
142            (slice.len(), slice.as_ptr())
143        };
144        unsafe extern "C" fn write_all_async_trampoline<
145            B: AsRef<[u8]> + Send + 'static,
146            Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
147        >(
148            _source_object: *mut glib::gobject_ffi::GObject,
149            res: *mut ffi::GAsyncResult,
150            user_data: glib::ffi::gpointer,
151        ) {
152            let user_data: Box<(glib::thread_guard::ThreadGuard<Q>, B)> =
153                Box::from_raw(user_data as *mut _);
154            let (callback, buffer) = *user_data;
155            let callback = callback.into_inner();
156
157            let mut error = ptr::null_mut();
158            let mut bytes_written = mem::MaybeUninit::uninit();
159            let _ = ffi::g_output_stream_write_all_finish(
160                _source_object as *mut _,
161                res,
162                bytes_written.as_mut_ptr(),
163                &mut error,
164            );
165            let bytes_written = bytes_written.assume_init();
166            let result = if error.is_null() {
167                Ok((buffer, bytes_written, None))
168            } else if bytes_written != 0 {
169                Ok((buffer, bytes_written, from_glib_full(error)))
170            } else {
171                Err((buffer, from_glib_full(error)))
172            };
173            callback(result);
174        }
175        let callback = write_all_async_trampoline::<B, Q>;
176        unsafe {
177            ffi::g_output_stream_write_all_async(
178                self.as_ref().to_glib_none().0,
179                mut_override(buffer_ptr),
180                count,
181                io_priority.into_glib(),
182                gcancellable.0,
183                Some(callback),
184                Box::into_raw(user_data) as *mut _,
185            );
186        }
187    }
188
189    fn write_future<B: AsRef<[u8]> + Send + 'static>(
190        &self,
191        buffer: B,
192        io_priority: Priority,
193    ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
194    {
195        Box::pin(crate::GioFuture::new(
196            self,
197            move |obj, cancellable, send| {
198                obj.write_async(buffer, io_priority, Some(cancellable), move |res| {
199                    send.resolve(res);
200                });
201            },
202        ))
203    }
204
205    fn write_all_future<B: AsRef<[u8]> + Send + 'static>(
206        &self,
207        buffer: B,
208        io_priority: Priority,
209    ) -> Pin<
210        Box<
211            dyn std::future::Future<
212                    Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
213                > + 'static,
214        >,
215    > {
216        Box::pin(crate::GioFuture::new(
217            self,
218            move |obj, cancellable, send| {
219                obj.write_all_async(buffer, io_priority, Some(cancellable), move |res| {
220                    send.resolve(res);
221                });
222            },
223        ))
224    }
225
226    #[cfg(feature = "v2_60")]
227    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
228    #[doc(alias = "g_output_stream_writev")]
229    fn writev(
230        &self,
231        vectors: &[OutputVector],
232        cancellable: Option<&impl IsA<Cancellable>>,
233    ) -> Result<usize, glib::Error> {
234        unsafe {
235            let mut error = ptr::null_mut();
236            let mut bytes_written = mem::MaybeUninit::uninit();
237
238            ffi::g_output_stream_writev(
239                self.as_ref().to_glib_none().0,
240                vectors.as_ptr() as *const _,
241                vectors.len(),
242                bytes_written.as_mut_ptr(),
243                cancellable.map(|p| p.as_ref()).to_glib_none().0,
244                &mut error,
245            );
246            if error.is_null() {
247                Ok(bytes_written.assume_init())
248            } else {
249                Err(from_glib_full(error))
250            }
251        }
252    }
253
254    #[cfg(feature = "v2_60")]
255    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
256    #[doc(alias = "g_output_stream_writev_async")]
257    fn writev_async<
258        B: AsRef<[u8]> + Send + 'static,
259        P: FnOnce(Result<(Vec<B>, usize), (Vec<B>, glib::Error)>) + 'static,
260    >(
261        &self,
262        vectors: impl IntoIterator<Item = B> + 'static,
263        io_priority: glib::Priority,
264        cancellable: Option<&impl IsA<Cancellable>>,
265        callback: P,
266    ) {
267        let main_context = glib::MainContext::ref_thread_default();
268        let is_main_context_owner = main_context.is_owner();
269        let has_acquired_main_context = (!is_main_context_owner)
270            .then(|| main_context.acquire().ok())
271            .flatten();
272        assert!(
273            is_main_context_owner || has_acquired_main_context.is_some(),
274            "Async operations only allowed if the thread is owning the MainContext"
275        );
276
277        let cancellable = cancellable.map(|c| c.as_ref());
278        let gcancellable = cancellable.to_glib_none();
279        let buffers = vectors.into_iter().collect::<Vec<_>>();
280        let vectors = buffers
281            .iter()
282            .map(|v| ffi::GOutputVector {
283                buffer: v.as_ref().as_ptr() as *const _,
284                size: v.as_ref().len(),
285            })
286            .collect::<Vec<_>>();
287        let vectors_ptr = vectors.as_ptr();
288        let num_vectors = vectors.len();
289        let user_data: Box<(
290            glib::thread_guard::ThreadGuard<P>,
291            Vec<B>,
292            Vec<ffi::GOutputVector>,
293        )> = Box::new((
294            glib::thread_guard::ThreadGuard::new(callback),
295            buffers,
296            vectors,
297        ));
298
299        unsafe extern "C" fn writev_async_trampoline<
300            B: AsRef<[u8]> + Send + 'static,
301            P: FnOnce(Result<(Vec<B>, usize), (Vec<B>, glib::Error)>) + 'static,
302        >(
303            _source_object: *mut glib::gobject_ffi::GObject,
304            res: *mut ffi::GAsyncResult,
305            user_data: glib::ffi::gpointer,
306        ) {
307            let user_data: Box<(
308                glib::thread_guard::ThreadGuard<P>,
309                Vec<B>,
310                Vec<ffi::GOutputVector>,
311            )> = Box::from_raw(user_data as *mut _);
312            let (callback, buffers, _) = *user_data;
313            let callback = callback.into_inner();
314
315            let mut error = ptr::null_mut();
316            let mut bytes_written = mem::MaybeUninit::uninit();
317            ffi::g_output_stream_writev_finish(
318                _source_object as *mut _,
319                res,
320                bytes_written.as_mut_ptr(),
321                &mut error,
322            );
323            let bytes_written = bytes_written.assume_init();
324            let result = if error.is_null() {
325                Ok((buffers, bytes_written))
326            } else {
327                Err((buffers, from_glib_full(error)))
328            };
329            callback(result);
330        }
331        let callback = writev_async_trampoline::<B, P>;
332        unsafe {
333            ffi::g_output_stream_writev_async(
334                self.as_ref().to_glib_none().0,
335                vectors_ptr,
336                num_vectors,
337                io_priority.into_glib(),
338                gcancellable.0,
339                Some(callback),
340                Box::into_raw(user_data) as *mut _,
341            );
342        }
343    }
344
345    #[cfg(feature = "v2_60")]
346    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
347    fn writev_future<B: AsRef<[u8]> + Send + 'static>(
348        &self,
349        vectors: impl IntoIterator<Item = B> + 'static,
350        io_priority: glib::Priority,
351    ) -> Pin<
352        Box<
353            dyn std::future::Future<Output = Result<(Vec<B>, usize), (Vec<B>, glib::Error)>>
354                + 'static,
355        >,
356    > {
357        Box::pin(crate::GioFuture::new(
358            self,
359            move |obj, cancellable, send| {
360                obj.writev_async(vectors, io_priority, Some(cancellable), move |res| {
361                    send.resolve(res);
362                });
363            },
364        ))
365    }
366
367    #[cfg(feature = "v2_60")]
368    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
369    #[doc(alias = "g_output_stream_writev_all")]
370    fn writev_all(
371        &self,
372        vectors: &[OutputVector],
373        cancellable: Option<&impl IsA<Cancellable>>,
374    ) -> Result<(usize, Option<glib::Error>), glib::Error> {
375        unsafe {
376            let mut error = ptr::null_mut();
377            let mut bytes_written = mem::MaybeUninit::uninit();
378
379            ffi::g_output_stream_writev_all(
380                self.as_ref().to_glib_none().0,
381                mut_override(vectors.as_ptr() as *const _),
382                vectors.len(),
383                bytes_written.as_mut_ptr(),
384                cancellable.map(|p| p.as_ref()).to_glib_none().0,
385                &mut error,
386            );
387            let bytes_written = bytes_written.assume_init();
388            if error.is_null() {
389                Ok((bytes_written, None))
390            } else if bytes_written != 0 {
391                Ok((bytes_written, Some(from_glib_full(error))))
392            } else {
393                Err(from_glib_full(error))
394            }
395        }
396    }
397
398    #[cfg(feature = "v2_60")]
399    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
400    #[doc(alias = "g_output_stream_writev_all_async")]
401    fn writev_all_async<
402        B: AsRef<[u8]> + Send + 'static,
403        P: FnOnce(Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>) + 'static,
404    >(
405        &self,
406        vectors: impl IntoIterator<Item = B> + 'static,
407        io_priority: glib::Priority,
408        cancellable: Option<&impl IsA<Cancellable>>,
409        callback: P,
410    ) {
411        let main_context = glib::MainContext::ref_thread_default();
412        let is_main_context_owner = main_context.is_owner();
413        let has_acquired_main_context = (!is_main_context_owner)
414            .then(|| main_context.acquire().ok())
415            .flatten();
416        assert!(
417            is_main_context_owner || has_acquired_main_context.is_some(),
418            "Async operations only allowed if the thread is owning the MainContext"
419        );
420
421        let cancellable = cancellable.map(|c| c.as_ref());
422        let gcancellable = cancellable.to_glib_none();
423        let buffers = vectors.into_iter().collect::<Vec<_>>();
424        let vectors = buffers
425            .iter()
426            .map(|v| ffi::GOutputVector {
427                buffer: v.as_ref().as_ptr() as *const _,
428                size: v.as_ref().len(),
429            })
430            .collect::<Vec<_>>();
431        let vectors_ptr = vectors.as_ptr();
432        let num_vectors = vectors.len();
433        let user_data: Box<(
434            glib::thread_guard::ThreadGuard<P>,
435            Vec<B>,
436            Vec<ffi::GOutputVector>,
437        )> = Box::new((
438            glib::thread_guard::ThreadGuard::new(callback),
439            buffers,
440            vectors,
441        ));
442
443        unsafe extern "C" fn writev_all_async_trampoline<
444            B: AsRef<[u8]> + Send + 'static,
445            P: FnOnce(Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>) + 'static,
446        >(
447            _source_object: *mut glib::gobject_ffi::GObject,
448            res: *mut ffi::GAsyncResult,
449            user_data: glib::ffi::gpointer,
450        ) {
451            let user_data: Box<(
452                glib::thread_guard::ThreadGuard<P>,
453                Vec<B>,
454                Vec<ffi::GOutputVector>,
455            )> = Box::from_raw(user_data as *mut _);
456            let (callback, buffers, _) = *user_data;
457            let callback = callback.into_inner();
458
459            let mut error = ptr::null_mut();
460            let mut bytes_written = mem::MaybeUninit::uninit();
461            ffi::g_output_stream_writev_all_finish(
462                _source_object as *mut _,
463                res,
464                bytes_written.as_mut_ptr(),
465                &mut error,
466            );
467            let bytes_written = bytes_written.assume_init();
468            let result = if error.is_null() {
469                Ok((buffers, bytes_written, None))
470            } else if bytes_written != 0 {
471                Ok((buffers, bytes_written, from_glib_full(error)))
472            } else {
473                Err((buffers, from_glib_full(error)))
474            };
475            callback(result);
476        }
477        let callback = writev_all_async_trampoline::<B, P>;
478        unsafe {
479            ffi::g_output_stream_writev_all_async(
480                self.as_ref().to_glib_none().0,
481                mut_override(vectors_ptr),
482                num_vectors,
483                io_priority.into_glib(),
484                gcancellable.0,
485                Some(callback),
486                Box::into_raw(user_data) as *mut _,
487            );
488        }
489    }
490
491    #[cfg(feature = "v2_60")]
492    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
493    fn writev_all_future<B: AsRef<[u8]> + Send + 'static>(
494        &self,
495        vectors: impl IntoIterator<Item = B> + 'static,
496        io_priority: glib::Priority,
497    ) -> Pin<
498        Box<
499            dyn std::future::Future<
500                    Output = Result<(Vec<B>, usize, Option<glib::Error>), (Vec<B>, glib::Error)>,
501                > + 'static,
502        >,
503    > {
504        Box::pin(crate::GioFuture::new(
505            self,
506            move |obj, cancellable, send| {
507                obj.writev_all_async(vectors, io_priority, Some(cancellable), move |res| {
508                    send.resolve(res);
509                });
510            },
511        ))
512    }
513
514    fn into_write(self) -> OutputStreamWrite<Self>
515    where
516        Self: IsA<OutputStream>,
517    {
518        OutputStreamWrite(self)
519    }
520}
521
522impl<O: IsA<OutputStream>> OutputStreamExtManual for O {}
523
524#[derive(Debug)]
525pub struct OutputStreamWrite<T: IsA<OutputStream>>(T);
526
527impl<T: IsA<OutputStream>> OutputStreamWrite<T> {
528    pub fn into_output_stream(self) -> T {
529        self.0
530    }
531
532    pub fn output_stream(&self) -> &T {
533        &self.0
534    }
535}
536
537impl<T: IsA<OutputStream>> io::Write for OutputStreamWrite<T> {
538    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
539        let result = self
540            .0
541            .as_ref()
542            .write(buf, crate::Cancellable::NONE)
543            .map(|size| size as usize);
544        to_std_io_result(result)
545    }
546
547    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
548        let result = self
549            .0
550            .as_ref()
551            .write_all(buf, crate::Cancellable::NONE)
552            .and_then(|(_, e)| e.map(Err).unwrap_or(Ok(())));
553        to_std_io_result(result)
554    }
555
556    #[cfg(feature = "v2_60")]
557    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
558    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
559        let vectors = bufs
560            .iter()
561            .map(|v| OutputVector::new(v))
562            .collect::<smallvec::SmallVec<[_; 2]>>();
563        let result = self.0.as_ref().writev(&vectors, crate::Cancellable::NONE);
564        to_std_io_result(result)
565    }
566
567    fn flush(&mut self) -> io::Result<()> {
568        let gio_result = self.0.as_ref().flush(crate::Cancellable::NONE);
569        to_std_io_result(gio_result)
570    }
571}
572
573impl<T: IsA<OutputStream> + IsA<Seekable>> io::Seek for OutputStreamWrite<T> {
574    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
575        let (pos, type_) = match pos {
576            io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
577            io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
578            io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
579        };
580        let seekable: &Seekable = self.0.as_ref();
581        let gio_result = seekable
582            .seek(pos, type_, crate::Cancellable::NONE)
583            .map(|_| seekable.tell() as u64);
584        to_std_io_result(gio_result)
585    }
586}
587
#[cfg(test)]
mod tests {
    use std::io::Write;

    use glib::Bytes;

    #[cfg(feature = "v2_60")]
    use crate::OutputVector;
    use crate::{prelude::*, test_util::run_async, MemoryInputStream, MemoryOutputStream};

    /// Splicing a three-byte memory input stream into a memory output stream reports
    /// three bytes.
    #[test]
    fn splice_async() {
        let spliced = run_async(|sender, main_loop| {
            let source = MemoryInputStream::new();
            let bytes = Bytes::from_owned(vec![1, 2, 3]);
            source.add_bytes(&bytes);

            let sink = MemoryOutputStream::new_resizable();
            sink.splice_async(
                &source,
                crate::OutputStreamSpliceFlags::CLOSE_SOURCE,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |outcome| {
                    sender.send(outcome).unwrap();
                    main_loop.quit();
                },
            );
        });

        assert_eq!(spliced.unwrap(), 3);
    }

    /// `write_async` hands the buffer back together with the number of bytes written.
    #[test]
    fn write_async() {
        let outcome = run_async(|sender, main_loop| {
            let stream = MemoryOutputStream::new_resizable();

            let data = vec![1, 2, 3];
            stream.write_async(
                data,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |outcome| {
                    sender.send(outcome).unwrap();
                    main_loop.quit();
                },
            );
        });

        let (returned, written) = outcome.unwrap();
        assert_eq!(returned, vec![1, 2, 3]);
        assert_eq!(written, 3);
    }

    /// `write_all_async` hands the buffer back, reports all bytes and no partial error.
    #[test]
    fn write_all_async() {
        let outcome = run_async(|sender, main_loop| {
            let stream = MemoryOutputStream::new_resizable();

            let data = vec![1, 2, 3];
            stream.write_all_async(
                data,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |outcome| {
                    sender.send(outcome).unwrap();
                    main_loop.quit();
                },
            );
        });

        let (returned, written, partial_error) = outcome.unwrap();
        assert_eq!(returned, vec![1, 2, 3]);
        assert_eq!(written, 3);
        assert!(partial_error.is_none());
    }

    /// `write_bytes_async` reports the length of the `Bytes` it was given.
    #[test]
    fn write_bytes_async() {
        let written = run_async(|sender, main_loop| {
            let stream = MemoryOutputStream::new_resizable();

            let bytes = Bytes::from_owned(vec![1, 2, 3]);
            stream.write_bytes_async(
                &bytes,
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |outcome| {
                    sender.send(outcome).unwrap();
                    main_loop.quit();
                },
            );
        });

        assert_eq!(written.unwrap(), 3);
    }

    /// Writing through the `std::io::Write` adapter ends up in the wrapped stream.
    #[test]
    fn std_io_write() {
        let payload = Bytes::from_owned(vec![1, 2, 3]);
        let mut writer = MemoryOutputStream::new_resizable().into_write();

        let written = writer.write(&payload);

        let stream = writer.into_output_stream();
        stream.close(crate::Cancellable::NONE).unwrap();
        assert_eq!(written.unwrap(), 3);
        assert_eq!(stream.steal_as_bytes(), [1, 2, 3].as_ref());
    }

    /// Wrapping and unwrapping a stream yields the very same stream object.
    #[test]
    fn into_output_stream() {
        let original = MemoryOutputStream::new_resizable();
        let reference = original.clone();
        let unwrapped = original.into_write().into_output_stream();

        assert_eq!(unwrapped, reference);
    }

    /// `writev` writes both vectors, in order, and reports the total byte count.
    #[test]
    #[cfg(feature = "v2_60")]
    fn writev() {
        let stream = MemoryOutputStream::new_resizable();

        let written = stream.writev(
            &[OutputVector::new(&[1, 2, 3]), OutputVector::new(&[4, 5, 6])],
            crate::Cancellable::NONE,
        );
        assert_eq!(written.unwrap(), 6);
        stream.close(crate::Cancellable::NONE).unwrap();
        assert_eq!(stream.steal_as_bytes(), [1, 2, 3, 4, 5, 6].as_ref());
    }

    /// `writev_async` writes both buffers, in order, and hands them back with the count.
    #[test]
    #[cfg(feature = "v2_60")]
    fn writev_async() {
        let outcome = run_async(|sender, main_loop| {
            let stream = MemoryOutputStream::new_resizable();

            let stream_in_callback = stream.clone();
            stream.writev_async(
                [vec![1, 2, 3], vec![4, 5, 6]],
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |outcome| {
                    sender.send(outcome).unwrap();
                    stream_in_callback
                        .close(crate::Cancellable::NONE)
                        .unwrap();
                    assert_eq!(
                        stream_in_callback.steal_as_bytes(),
                        [1, 2, 3, 4, 5, 6].as_ref()
                    );
                    main_loop.quit();
                },
            );
        });

        let (returned, written) = outcome.unwrap();
        assert_eq!(returned, [[1, 2, 3], [4, 5, 6]]);
        assert_eq!(written, 6);
    }

    /// `writev_all_async` writes both buffers, in order, without a partial error.
    #[test]
    #[cfg(feature = "v2_60")]
    fn writev_all_async() {
        let outcome = run_async(|sender, main_loop| {
            let stream = MemoryOutputStream::new_resizable();

            let stream_in_callback = stream.clone();
            stream.writev_all_async(
                [vec![1, 2, 3], vec![4, 5, 6]],
                glib::Priority::DEFAULT_IDLE,
                crate::Cancellable::NONE,
                move |outcome| {
                    sender.send(outcome).unwrap();
                    stream_in_callback
                        .close(crate::Cancellable::NONE)
                        .unwrap();
                    assert_eq!(
                        stream_in_callback.steal_as_bytes(),
                        [1, 2, 3, 4, 5, 6].as_ref()
                    );
                    main_loop.quit();
                },
            );
        });

        let (returned, written, partial_error) = outcome.unwrap();
        assert_eq!(returned, [[1, 2, 3], [4, 5, 6]]);
        assert_eq!(written, 6);
        assert!(partial_error.is_none());
    }
}