gio/
pollable_output_stream.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{cell::RefCell, io, mem::transmute, pin::Pin};
4
5use futures_channel::oneshot;
6use futures_core::{
7    stream::Stream,
8    task::{Context, Poll},
9    Future,
10};
11use futures_io::AsyncWrite;
12use glib::{prelude::*, translate::*};
13
14use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, PollableOutputStream};
15#[cfg(feature = "v2_60")]
16use crate::{OutputVector, PollableReturn};
17
18pub trait PollableOutputStreamExtManual: IsA<PollableOutputStream> {
19    #[doc(alias = "g_pollable_output_stream_create_source")]
20    fn create_source<F, C>(
21        &self,
22        cancellable: Option<&C>,
23        name: Option<&str>,
24        priority: glib::Priority,
25        func: F,
26    ) -> glib::Source
27    where
28        F: FnMut(&Self) -> glib::ControlFlow + 'static,
29        C: IsA<Cancellable>,
30    {
31        unsafe extern "C" fn trampoline<
32            O: IsA<PollableOutputStream>,
33            F: FnMut(&O) -> glib::ControlFlow + 'static,
34        >(
35            stream: *mut ffi::GPollableOutputStream,
36            func: glib::ffi::gpointer,
37        ) -> glib::ffi::gboolean {
38            let func: &RefCell<F> = &*(func as *const RefCell<F>);
39            let mut func = func.borrow_mut();
40            (*func)(PollableOutputStream::from_glib_borrow(stream).unsafe_cast_ref()).into_glib()
41        }
42        unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
43            let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
44        }
45        let cancellable = cancellable.map(|c| c.as_ref());
46        let gcancellable = cancellable.to_glib_none();
47        unsafe {
48            let source = ffi::g_pollable_output_stream_create_source(
49                self.as_ref().to_glib_none().0,
50                gcancellable.0,
51            );
52
53            let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
54            glib::ffi::g_source_set_callback(
55                source,
56                Some(transmute::<
57                    glib::ffi::gpointer,
58                    unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
59                >(trampoline)),
60                Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
61                Some(destroy_closure::<F>),
62            );
63            glib::ffi::g_source_set_priority(source, priority.into_glib());
64
65            if let Some(name) = name {
66                glib::ffi::g_source_set_name(source, name.to_glib_none().0);
67            }
68
69            from_glib_full(source)
70        }
71    }
72
73    fn create_source_future<C: IsA<Cancellable>>(
74        &self,
75        cancellable: Option<&C>,
76        priority: glib::Priority,
77    ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>> {
78        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
79
80        let obj = self.clone();
81        Box::pin(glib::SourceFuture::new(move |send| {
82            let mut send = Some(send);
83            obj.create_source(cancellable.as_ref(), None, priority, move |_| {
84                let _ = send.take().unwrap().send(());
85                glib::ControlFlow::Break
86            })
87        }))
88    }
89
90    fn create_source_stream<C: IsA<Cancellable>>(
91        &self,
92        cancellable: Option<&C>,
93        priority: glib::Priority,
94    ) -> Pin<Box<dyn Stream<Item = ()> + 'static>> {
95        let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
96
97        let obj = self.clone();
98        Box::pin(glib::SourceStream::new(move |send| {
99            let send = Some(send);
100            obj.create_source(cancellable.as_ref(), None, priority, move |_| {
101                if send.as_ref().unwrap().unbounded_send(()).is_err() {
102                    glib::ControlFlow::Break
103                } else {
104                    glib::ControlFlow::Continue
105                }
106            })
107        }))
108    }
109
110    #[cfg(feature = "v2_60")]
111    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
112    #[doc(alias = "g_pollable_output_stream_writev_nonblocking")]
113    fn writev_nonblocking(
114        &self,
115        vectors: &[OutputVector],
116        cancellable: Option<&impl IsA<Cancellable>>,
117    ) -> Result<(PollableReturn, usize), glib::Error> {
118        unsafe {
119            let mut error = std::ptr::null_mut();
120            let mut bytes_written = 0;
121
122            let ret = ffi::g_pollable_output_stream_writev_nonblocking(
123                self.as_ref().to_glib_none().0,
124                vectors.as_ptr() as *const _,
125                vectors.len(),
126                &mut bytes_written,
127                cancellable.map(|p| p.as_ref()).to_glib_none().0,
128                &mut error,
129            );
130            if error.is_null() {
131                Ok((from_glib(ret), bytes_written))
132            } else {
133                Err(from_glib_full(error))
134            }
135        }
136    }
137
138    fn into_async_write(self) -> Result<OutputStreamAsyncWrite<Self>, Self>
139    where
140        Self: IsA<PollableOutputStream>,
141    {
142        if self.can_poll() {
143            Ok(OutputStreamAsyncWrite(self, None))
144        } else {
145            Err(self)
146        }
147    }
148}
149
150impl<O: IsA<PollableOutputStream>> PollableOutputStreamExtManual for O {}
151
152#[derive(Debug)]
153pub struct OutputStreamAsyncWrite<T: IsA<PollableOutputStream>>(
154    T,
155    Option<oneshot::Receiver<Result<(), glib::Error>>>,
156);
157
158impl<T: IsA<PollableOutputStream>> OutputStreamAsyncWrite<T> {
159    pub fn into_output_stream(self) -> T {
160        self.0
161    }
162
163    pub fn output_stream(&self) -> &T {
164        &self.0
165    }
166}
167
168impl<T: IsA<PollableOutputStream>> AsyncWrite for OutputStreamAsyncWrite<T> {
169    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
170        let stream = Pin::get_ref(self.as_ref());
171        let gio_result = stream
172            .0
173            .as_ref()
174            .write_nonblocking(buf, crate::Cancellable::NONE);
175
176        match gio_result {
177            Ok(size) => Poll::Ready(Ok(size as usize)),
178            Err(err) => {
179                let kind = err
180                    .kind::<crate::IOErrorEnum>()
181                    .unwrap_or(crate::IOErrorEnum::Failed);
182                if kind == crate::IOErrorEnum::WouldBlock {
183                    let mut waker = Some(cx.waker().clone());
184                    let source = stream.0.as_ref().create_source(
185                        crate::Cancellable::NONE,
186                        None,
187                        glib::Priority::default(),
188                        move |_| {
189                            if let Some(waker) = waker.take() {
190                                waker.wake();
191                            }
192                            glib::ControlFlow::Break
193                        },
194                    );
195                    let main_context = glib::MainContext::ref_thread_default();
196                    source.attach(Some(&main_context));
197
198                    Poll::Pending
199                } else {
200                    Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
201                }
202            }
203        }
204    }
205
206    #[cfg(feature = "v2_60")]
207    #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
208    fn poll_write_vectored(
209        self: Pin<&mut Self>,
210        cx: &mut Context<'_>,
211        bufs: &[io::IoSlice<'_>],
212    ) -> Poll<io::Result<usize>> {
213        let stream = Pin::get_ref(self.as_ref());
214        let vectors = bufs
215            .iter()
216            .map(|v| OutputVector::new(v))
217            .collect::<smallvec::SmallVec<[_; 2]>>();
218        let gio_result = stream
219            .0
220            .as_ref()
221            .writev_nonblocking(&vectors, crate::Cancellable::NONE);
222
223        match gio_result {
224            Ok((PollableReturn::Ok, size)) => Poll::Ready(Ok(size)),
225            Ok((PollableReturn::WouldBlock, _)) => {
226                let mut waker = Some(cx.waker().clone());
227                let source = stream.0.as_ref().create_source(
228                    crate::Cancellable::NONE,
229                    None,
230                    glib::Priority::default(),
231                    move |_| {
232                        if let Some(waker) = waker.take() {
233                            waker.wake();
234                        }
235                        glib::ControlFlow::Break
236                    },
237                );
238                let main_context = glib::MainContext::ref_thread_default();
239                source.attach(Some(&main_context));
240
241                Poll::Pending
242            }
243            Ok((_, _)) => unreachable!(),
244            Err(err) => Poll::Ready(Err(io::Error::new(
245                io::ErrorKind::from(
246                    err.kind::<crate::IOErrorEnum>()
247                        .unwrap_or(crate::IOErrorEnum::Failed),
248                ),
249                err,
250            ))),
251        }
252    }
253
254    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
255        let stream = unsafe { Pin::get_unchecked_mut(self) };
256
257        let rx = if let Some(ref mut rx) = stream.1 {
258            rx
259        } else {
260            let (tx, rx) = oneshot::channel();
261            stream.0.as_ref().flush_async(
262                glib::Priority::default(),
263                crate::Cancellable::NONE,
264                move |res| {
265                    let _ = tx.send(res);
266                },
267            );
268
269            stream.1 = Some(rx);
270            stream.1.as_mut().unwrap()
271        };
272
273        match Pin::new(rx).poll(cx) {
274            Poll::Ready(Ok(res)) => {
275                let _ = stream.1.take();
276                Poll::Ready(to_std_io_result(res))
277            }
278            Poll::Ready(Err(_)) => {
279                let _ = stream.1.take();
280                Poll::Ready(Ok(()))
281            }
282            Poll::Pending => Poll::Pending,
283        }
284    }
285
286    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
287        let stream = unsafe { Pin::get_unchecked_mut(self) };
288
289        let rx = if let Some(ref mut rx) = stream.1 {
290            rx
291        } else {
292            let (tx, rx) = oneshot::channel();
293            stream.0.as_ref().close_async(
294                glib::Priority::default(),
295                crate::Cancellable::NONE,
296                move |res| {
297                    let _ = tx.send(res);
298                },
299            );
300
301            stream.1 = Some(rx);
302            stream.1.as_mut().unwrap()
303        };
304
305        match Pin::new(rx).poll(cx) {
306            Poll::Ready(Ok(res)) => {
307                let _ = stream.1.take();
308                Poll::Ready(to_std_io_result(res))
309            }
310            Poll::Ready(Err(_)) => {
311                let _ = stream.1.take();
312                Poll::Ready(Ok(()))
313            }
314            Poll::Pending => Poll::Pending,
315        }
316    }
317}