gio/
pollable_output_stream.rs

// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{cell::RefCell, io, mem::transmute, pin::Pin};
4
5use futures_channel::oneshot;
6use futures_core::{
7    stream::Stream,
8    task::{Context, Poll},
9    Future,
10};
11use futures_io::AsyncWrite;
12use glib::{prelude::*, translate::*};
13
14use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, PollableOutputStream};
15#[cfg(feature = "v2_60")]
16use crate::{OutputVector, PollableReturn};
17
18mod sealed {
19    pub trait Sealed {}
20    impl<T: super::IsA<super::PollableOutputStream>> Sealed for T {}
21}
22
23pub trait PollableOutputStreamExtManual: sealed::Sealed + IsA<PollableOutputStream> {
24    #[doc(alias = "g_pollable_output_stream_create_source")]
25    fn create_source<F, C>(
26        &self,
27        cancellable: Option<&C>,
28        name: Option<&str>,
29        priority: glib::Priority,
30        func: F,
31    ) -> glib::Source
32    where
33        F: FnMut(&Self) -> glib::ControlFlow + 'static,
34        C: IsA<Cancellable>,
35    {
36        unsafe extern "C" fn trampoline<
37            O: IsA<PollableOutputStream>,
38            F: FnMut(&O) -> glib::ControlFlow + 'static,
39        >(
40            stream: *mut ffi::GPollableOutputStream,
41            func: glib::ffi::gpointer,
42        ) -> glib::ffi::gboolean {
43            let func: &RefCell<F> = &*(func as *const RefCell<F>);
44            let mut func = func.borrow_mut();
45            (*func)(PollableOutputStream::from_glib_borrow(stream).unsafe_cast_ref()).into_glib()
46        }
47        unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
48            let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
49        }
50        let cancellable = cancellable.map(|c| c.as_ref());
51        let gcancellable = cancellable.to_glib_none();
52        unsafe {
53            let source = ffi::g_pollable_output_stream_create_source(
54                self.as_ref().to_glib_none().0,
55                gcancellable.0,
56            );
57
58            let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
59            glib::ffi::g_source_set_callback(
60                source,
61                Some(transmute::<
62                    glib::ffi::gpointer,
63                    unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
64                >(trampoline)),
65                Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
66                Some(destroy_closure::<F>),
67            );
68            glib::ffi::g_source_set_priority(source, priority.into_glib());
69
70            if let Some(name) = name {
71                glib::ffi::g_source_set_name(source, name.to_glib_none().0);
72            }
73
74            from_glib_full(source)
75        }
76    }
77
78    fn create_source_future<C: IsA<Cancellable>>(
79        &self,
80        cancellable: Option<&C>,
81        priority: glib::Priority,
82    ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>> {
83        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
84
85        let obj = self.clone();
86        Box::pin(glib::SourceFuture::new(move |send| {
87            let mut send = Some(send);
88            obj.create_source(cancellable.as_ref(), None, priority, move |_| {
89                let _ = send.take().unwrap().send(());
90                glib::ControlFlow::Break
91            })
92        }))
93    }
94
95    fn create_source_stream<C: IsA<Cancellable>>(
96        &self,
97        cancellable: Option<&C>,
98        priority: glib::Priority,
99    ) -> Pin<Box<dyn Stream<Item = ()> + 'static>> {
100        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
101
102        let obj = self.clone();
103        Box::pin(glib::SourceStream::new(move |send| {
104            let send = Some(send);
105            obj.create_source(cancellable.as_ref(), None, priority, move |_| {
106                if send.as_ref().unwrap().unbounded_send(()).is_err() {
107                    glib::ControlFlow::Break
108                } else {
109                    glib::ControlFlow::Continue
110                }
111            })
112        }))
113    }
114
115    #[cfg(feature = "v2_60")]
116    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
117    #[doc(alias = "g_pollable_output_stream_writev_nonblocking")]
118    fn writev_nonblocking(
119        &self,
120        vectors: &[OutputVector],
121        cancellable: Option<&impl IsA<Cancellable>>,
122    ) -> Result<(PollableReturn, usize), glib::Error> {
123        unsafe {
124            let mut error = std::ptr::null_mut();
125            let mut bytes_written = 0;
126
127            let ret = ffi::g_pollable_output_stream_writev_nonblocking(
128                self.as_ref().to_glib_none().0,
129                vectors.as_ptr() as *const _,
130                vectors.len(),
131                &mut bytes_written,
132                cancellable.map(|p| p.as_ref()).to_glib_none().0,
133                &mut error,
134            );
135            if error.is_null() {
136                Ok((from_glib(ret), bytes_written))
137            } else {
138                Err(from_glib_full(error))
139            }
140        }
141    }
142
143    fn into_async_write(self) -> Result<OutputStreamAsyncWrite<Self>, Self>
144    where
145        Self: IsA<PollableOutputStream>,
146    {
147        if self.can_poll() {
148            Ok(OutputStreamAsyncWrite(self, None))
149        } else {
150            Err(self)
151        }
152    }
153}
154
155impl<O: IsA<PollableOutputStream>> PollableOutputStreamExtManual for O {}
156
157#[derive(Debug)]
158pub struct OutputStreamAsyncWrite<T: IsA<PollableOutputStream>>(
159    T,
160    Option<oneshot::Receiver<Result<(), glib::Error>>>,
161);
162
163impl<T: IsA<PollableOutputStream>> OutputStreamAsyncWrite<T> {
164    pub fn into_output_stream(self) -> T {
165        self.0
166    }
167
168    pub fn output_stream(&self) -> &T {
169        &self.0
170    }
171}
172
173impl<T: IsA<PollableOutputStream>> AsyncWrite for OutputStreamAsyncWrite<T> {
174    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
175        let stream = Pin::get_ref(self.as_ref());
176        let gio_result = stream
177            .0
178            .as_ref()
179            .write_nonblocking(buf, crate::Cancellable::NONE);
180
181        match gio_result {
182            Ok(size) => Poll::Ready(Ok(size as usize)),
183            Err(err) => {
184                let kind = err
185                    .kind::<crate::IOErrorEnum>()
186                    .unwrap_or(crate::IOErrorEnum::Failed);
187                if kind == crate::IOErrorEnum::WouldBlock {
188                    let mut waker = Some(cx.waker().clone());
189                    let source = stream.0.as_ref().create_source(
190                        crate::Cancellable::NONE,
191                        None,
192                        glib::Priority::default(),
193                        move |_| {
194                            if let Some(waker) = waker.take() {
195                                waker.wake();
196                            }
197                            glib::ControlFlow::Break
198                        },
199                    );
200                    let main_context = glib::MainContext::ref_thread_default();
201                    source.attach(Some(&main_context));
202
203                    Poll::Pending
204                } else {
205                    Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
206                }
207            }
208        }
209    }
210
211    #[cfg(feature = "v2_60")]
212    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
213    fn poll_write_vectored(
214        self: Pin<&mut Self>,
215        cx: &mut Context<'_>,
216        bufs: &[io::IoSlice<'_>],
217    ) -> Poll<io::Result<usize>> {
218        let stream = Pin::get_ref(self.as_ref());
219        let vectors = bufs
220            .iter()
221            .map(|v| OutputVector::new(v))
222            .collect::<smallvec::SmallVec<[_; 2]>>();
223        let gio_result = stream
224            .0
225            .as_ref()
226            .writev_nonblocking(&vectors, crate::Cancellable::NONE);
227
228        match gio_result {
229            Ok((PollableReturn::Ok, size)) => Poll::Ready(Ok(size)),
230            Ok((PollableReturn::WouldBlock, _)) => {
231                let mut waker = Some(cx.waker().clone());
232                let source = stream.0.as_ref().create_source(
233                    crate::Cancellable::NONE,
234                    None,
235                    glib::Priority::default(),
236                    move |_| {
237                        if let Some(waker) = waker.take() {
238                            waker.wake();
239                        }
240                        glib::ControlFlow::Break
241                    },
242                );
243                let main_context = glib::MainContext::ref_thread_default();
244                source.attach(Some(&main_context));
245
246                Poll::Pending
247            }
248            Ok((_, _)) => unreachable!(),
249            Err(err) => Poll::Ready(Err(io::Error::new(
250                io::ErrorKind::from(
251                    err.kind::<crate::IOErrorEnum>()
252                        .unwrap_or(crate::IOErrorEnum::Failed),
253                ),
254                err,
255            ))),
256        }
257    }
258
259    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
260        let stream = unsafe { Pin::get_unchecked_mut(self) };
261
262        let rx = if let Some(ref mut rx) = stream.1 {
263            rx
264        } else {
265            let (tx, rx) = oneshot::channel();
266            stream.0.as_ref().flush_async(
267                glib::Priority::default(),
268                crate::Cancellable::NONE,
269                move |res| {
270                    let _ = tx.send(res);
271                },
272            );
273
274            stream.1 = Some(rx);
275            stream.1.as_mut().unwrap()
276        };
277
278        match Pin::new(rx).poll(cx) {
279            Poll::Ready(Ok(res)) => {
280                let _ = stream.1.take();
281                Poll::Ready(to_std_io_result(res))
282            }
283            Poll::Ready(Err(_)) => {
284                let _ = stream.1.take();
285                Poll::Ready(Ok(()))
286            }
287            Poll::Pending => Poll::Pending,
288        }
289    }
290
291    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
292        let stream = unsafe { Pin::get_unchecked_mut(self) };
293
294        let rx = if let Some(ref mut rx) = stream.1 {
295            rx
296        } else {
297            let (tx, rx) = oneshot::channel();
298            stream.0.as_ref().close_async(
299                glib::Priority::default(),
300                crate::Cancellable::NONE,
301                move |res| {
302                    let _ = tx.send(res);
303                },
304            );
305
306            stream.1 = Some(rx);
307            stream.1.as_mut().unwrap()
308        };
309
310        match Pin::new(rx).poll(cx) {
311            Poll::Ready(Ok(res)) => {
312                let _ = stream.1.take();
313                Poll::Ready(to_std_io_result(res))
314            }
315            Poll::Ready(Err(_)) => {
316                let _ = stream.1.take();
317                Poll::Ready(Ok(()))
318            }
319            Poll::Pending => Poll::Pending,
320        }
321    }
322}