1use std::{cell::RefCell, io, mem::transmute, pin::Pin};
4
5use futures_channel::oneshot;
6use futures_core::{
7 stream::Stream,
8 task::{Context, Poll},
9 Future,
10};
11use futures_io::AsyncWrite;
12use glib::{prelude::*, translate::*};
13
14use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, PollableOutputStream};
15#[cfg(feature = "v2_60")]
16use crate::{OutputVector, PollableReturn};
17
18pub trait PollableOutputStreamExtManual: IsA<PollableOutputStream> {
19 #[doc(alias = "g_pollable_output_stream_create_source")]
20 fn create_source<F, C>(
21 &self,
22 cancellable: Option<&C>,
23 name: Option<&str>,
24 priority: glib::Priority,
25 func: F,
26 ) -> glib::Source
27 where
28 F: FnMut(&Self) -> glib::ControlFlow + 'static,
29 C: IsA<Cancellable>,
30 {
31 unsafe extern "C" fn trampoline<
32 O: IsA<PollableOutputStream>,
33 F: FnMut(&O) -> glib::ControlFlow + 'static,
34 >(
35 stream: *mut ffi::GPollableOutputStream,
36 func: glib::ffi::gpointer,
37 ) -> glib::ffi::gboolean {
38 let func: &RefCell<F> = &*(func as *const RefCell<F>);
39 let mut func = func.borrow_mut();
40 (*func)(PollableOutputStream::from_glib_borrow(stream).unsafe_cast_ref()).into_glib()
41 }
42 unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
43 let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
44 }
45 let cancellable = cancellable.map(|c| c.as_ref());
46 let gcancellable = cancellable.to_glib_none();
47 unsafe {
48 let source = ffi::g_pollable_output_stream_create_source(
49 self.as_ref().to_glib_none().0,
50 gcancellable.0,
51 );
52
53 let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
54 glib::ffi::g_source_set_callback(
55 source,
56 Some(transmute::<
57 glib::ffi::gpointer,
58 unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
59 >(trampoline)),
60 Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
61 Some(destroy_closure::<F>),
62 );
63 glib::ffi::g_source_set_priority(source, priority.into_glib());
64
65 if let Some(name) = name {
66 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
67 }
68
69 from_glib_full(source)
70 }
71 }
72
73 fn create_source_future<C: IsA<Cancellable>>(
74 &self,
75 cancellable: Option<&C>,
76 priority: glib::Priority,
77 ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>> {
78 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
79
80 let obj = self.clone();
81 Box::pin(glib::SourceFuture::new(move |send| {
82 let mut send = Some(send);
83 obj.create_source(cancellable.as_ref(), None, priority, move |_| {
84 let _ = send.take().unwrap().send(());
85 glib::ControlFlow::Break
86 })
87 }))
88 }
89
90 fn create_source_stream<C: IsA<Cancellable>>(
91 &self,
92 cancellable: Option<&C>,
93 priority: glib::Priority,
94 ) -> Pin<Box<dyn Stream<Item = ()> + 'static>> {
95 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
96
97 let obj = self.clone();
98 Box::pin(glib::SourceStream::new(move |send| {
99 let send = Some(send);
100 obj.create_source(cancellable.as_ref(), None, priority, move |_| {
101 if send.as_ref().unwrap().unbounded_send(()).is_err() {
102 glib::ControlFlow::Break
103 } else {
104 glib::ControlFlow::Continue
105 }
106 })
107 }))
108 }
109
110 #[cfg(feature = "v2_60")]
111 #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
112 #[doc(alias = "g_pollable_output_stream_writev_nonblocking")]
113 fn writev_nonblocking(
114 &self,
115 vectors: &[OutputVector],
116 cancellable: Option<&impl IsA<Cancellable>>,
117 ) -> Result<(PollableReturn, usize), glib::Error> {
118 unsafe {
119 let mut error = std::ptr::null_mut();
120 let mut bytes_written = 0;
121
122 let ret = ffi::g_pollable_output_stream_writev_nonblocking(
123 self.as_ref().to_glib_none().0,
124 vectors.as_ptr() as *const _,
125 vectors.len(),
126 &mut bytes_written,
127 cancellable.map(|p| p.as_ref()).to_glib_none().0,
128 &mut error,
129 );
130 if error.is_null() {
131 Ok((from_glib(ret), bytes_written))
132 } else {
133 Err(from_glib_full(error))
134 }
135 }
136 }
137
138 fn into_async_write(self) -> Result<OutputStreamAsyncWrite<Self>, Self>
139 where
140 Self: IsA<PollableOutputStream>,
141 {
142 if self.can_poll() {
143 Ok(OutputStreamAsyncWrite(self, None))
144 } else {
145 Err(self)
146 }
147 }
148}
149
150impl<O: IsA<PollableOutputStream>> PollableOutputStreamExtManual for O {}
151
152#[derive(Debug)]
153pub struct OutputStreamAsyncWrite<T: IsA<PollableOutputStream>>(
154 T,
155 Option<oneshot::Receiver<Result<(), glib::Error>>>,
156);
157
158impl<T: IsA<PollableOutputStream>> OutputStreamAsyncWrite<T> {
159 pub fn into_output_stream(self) -> T {
160 self.0
161 }
162
163 pub fn output_stream(&self) -> &T {
164 &self.0
165 }
166}
167
168impl<T: IsA<PollableOutputStream>> AsyncWrite for OutputStreamAsyncWrite<T> {
169 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
170 let stream = Pin::get_ref(self.as_ref());
171 let gio_result = stream
172 .0
173 .as_ref()
174 .write_nonblocking(buf, crate::Cancellable::NONE);
175
176 match gio_result {
177 Ok(size) => Poll::Ready(Ok(size as usize)),
178 Err(err) => {
179 let kind = err
180 .kind::<crate::IOErrorEnum>()
181 .unwrap_or(crate::IOErrorEnum::Failed);
182 if kind == crate::IOErrorEnum::WouldBlock {
183 let mut waker = Some(cx.waker().clone());
184 let source = stream.0.as_ref().create_source(
185 crate::Cancellable::NONE,
186 None,
187 glib::Priority::default(),
188 move |_| {
189 if let Some(waker) = waker.take() {
190 waker.wake();
191 }
192 glib::ControlFlow::Break
193 },
194 );
195 let main_context = glib::MainContext::ref_thread_default();
196 source.attach(Some(&main_context));
197
198 Poll::Pending
199 } else {
200 Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
201 }
202 }
203 }
204 }
205
206 #[cfg(feature = "v2_60")]
207 #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
208 fn poll_write_vectored(
209 self: Pin<&mut Self>,
210 cx: &mut Context<'_>,
211 bufs: &[io::IoSlice<'_>],
212 ) -> Poll<io::Result<usize>> {
213 let stream = Pin::get_ref(self.as_ref());
214 let vectors = bufs
215 .iter()
216 .map(|v| OutputVector::new(v))
217 .collect::<smallvec::SmallVec<[_; 2]>>();
218 let gio_result = stream
219 .0
220 .as_ref()
221 .writev_nonblocking(&vectors, crate::Cancellable::NONE);
222
223 match gio_result {
224 Ok((PollableReturn::Ok, size)) => Poll::Ready(Ok(size)),
225 Ok((PollableReturn::WouldBlock, _)) => {
226 let mut waker = Some(cx.waker().clone());
227 let source = stream.0.as_ref().create_source(
228 crate::Cancellable::NONE,
229 None,
230 glib::Priority::default(),
231 move |_| {
232 if let Some(waker) = waker.take() {
233 waker.wake();
234 }
235 glib::ControlFlow::Break
236 },
237 );
238 let main_context = glib::MainContext::ref_thread_default();
239 source.attach(Some(&main_context));
240
241 Poll::Pending
242 }
243 Ok((_, _)) => unreachable!(),
244 Err(err) => Poll::Ready(Err(io::Error::new(
245 io::ErrorKind::from(
246 err.kind::<crate::IOErrorEnum>()
247 .unwrap_or(crate::IOErrorEnum::Failed),
248 ),
249 err,
250 ))),
251 }
252 }
253
254 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
255 let stream = unsafe { Pin::get_unchecked_mut(self) };
256
257 let rx = if let Some(ref mut rx) = stream.1 {
258 rx
259 } else {
260 let (tx, rx) = oneshot::channel();
261 stream.0.as_ref().flush_async(
262 glib::Priority::default(),
263 crate::Cancellable::NONE,
264 move |res| {
265 let _ = tx.send(res);
266 },
267 );
268
269 stream.1 = Some(rx);
270 stream.1.as_mut().unwrap()
271 };
272
273 match Pin::new(rx).poll(cx) {
274 Poll::Ready(Ok(res)) => {
275 let _ = stream.1.take();
276 Poll::Ready(to_std_io_result(res))
277 }
278 Poll::Ready(Err(_)) => {
279 let _ = stream.1.take();
280 Poll::Ready(Ok(()))
281 }
282 Poll::Pending => Poll::Pending,
283 }
284 }
285
286 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
287 let stream = unsafe { Pin::get_unchecked_mut(self) };
288
289 let rx = if let Some(ref mut rx) = stream.1 {
290 rx
291 } else {
292 let (tx, rx) = oneshot::channel();
293 stream.0.as_ref().close_async(
294 glib::Priority::default(),
295 crate::Cancellable::NONE,
296 move |res| {
297 let _ = tx.send(res);
298 },
299 );
300
301 stream.1 = Some(rx);
302 stream.1.as_mut().unwrap()
303 };
304
305 match Pin::new(rx).poll(cx) {
306 Poll::Ready(Ok(res)) => {
307 let _ = stream.1.take();
308 Poll::Ready(to_std_io_result(res))
309 }
310 Poll::Ready(Err(_)) => {
311 let _ = stream.1.take();
312 Poll::Ready(Ok(()))
313 }
314 Poll::Pending => Poll::Pending,
315 }
316 }
317}