1use std::{cell::RefCell, io, mem::transmute, pin::Pin};
4
5use futures_channel::oneshot;
6use futures_core::{
7 stream::Stream,
8 task::{Context, Poll},
9 Future,
10};
11use futures_io::AsyncWrite;
12use glib::{prelude::*, translate::*};
13
14use crate::{error::to_std_io_result, ffi, prelude::*, Cancellable, PollableOutputStream};
15#[cfg(feature = "v2_60")]
16use crate::{OutputVector, PollableReturn};
17
18mod sealed {
19 pub trait Sealed {}
20 impl<T: super::IsA<super::PollableOutputStream>> Sealed for T {}
21}
22
23pub trait PollableOutputStreamExtManual: sealed::Sealed + IsA<PollableOutputStream> {
24 #[doc(alias = "g_pollable_output_stream_create_source")]
25 fn create_source<F, C>(
26 &self,
27 cancellable: Option<&C>,
28 name: Option<&str>,
29 priority: glib::Priority,
30 func: F,
31 ) -> glib::Source
32 where
33 F: FnMut(&Self) -> glib::ControlFlow + 'static,
34 C: IsA<Cancellable>,
35 {
36 unsafe extern "C" fn trampoline<
37 O: IsA<PollableOutputStream>,
38 F: FnMut(&O) -> glib::ControlFlow + 'static,
39 >(
40 stream: *mut ffi::GPollableOutputStream,
41 func: glib::ffi::gpointer,
42 ) -> glib::ffi::gboolean {
43 let func: &RefCell<F> = &*(func as *const RefCell<F>);
44 let mut func = func.borrow_mut();
45 (*func)(PollableOutputStream::from_glib_borrow(stream).unsafe_cast_ref()).into_glib()
46 }
47 unsafe extern "C" fn destroy_closure<F>(ptr: glib::ffi::gpointer) {
48 let _ = Box::<RefCell<F>>::from_raw(ptr as *mut _);
49 }
50 let cancellable = cancellable.map(|c| c.as_ref());
51 let gcancellable = cancellable.to_glib_none();
52 unsafe {
53 let source = ffi::g_pollable_output_stream_create_source(
54 self.as_ref().to_glib_none().0,
55 gcancellable.0,
56 );
57
58 let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
59 glib::ffi::g_source_set_callback(
60 source,
61 Some(transmute::<
62 glib::ffi::gpointer,
63 unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
64 >(trampoline)),
65 Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
66 Some(destroy_closure::<F>),
67 );
68 glib::ffi::g_source_set_priority(source, priority.into_glib());
69
70 if let Some(name) = name {
71 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
72 }
73
74 from_glib_full(source)
75 }
76 }
77
78 fn create_source_future<C: IsA<Cancellable>>(
79 &self,
80 cancellable: Option<&C>,
81 priority: glib::Priority,
82 ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>> {
83 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
84
85 let obj = self.clone();
86 Box::pin(glib::SourceFuture::new(move |send| {
87 let mut send = Some(send);
88 obj.create_source(cancellable.as_ref(), None, priority, move |_| {
89 let _ = send.take().unwrap().send(());
90 glib::ControlFlow::Break
91 })
92 }))
93 }
94
95 fn create_source_stream<C: IsA<Cancellable>>(
96 &self,
97 cancellable: Option<&C>,
98 priority: glib::Priority,
99 ) -> Pin<Box<dyn Stream<Item = ()> + 'static>> {
100 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
101
102 let obj = self.clone();
103 Box::pin(glib::SourceStream::new(move |send| {
104 let send = Some(send);
105 obj.create_source(cancellable.as_ref(), None, priority, move |_| {
106 if send.as_ref().unwrap().unbounded_send(()).is_err() {
107 glib::ControlFlow::Break
108 } else {
109 glib::ControlFlow::Continue
110 }
111 })
112 }))
113 }
114
115 #[cfg(feature = "v2_60")]
116 #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
117 #[doc(alias = "g_pollable_output_stream_writev_nonblocking")]
118 fn writev_nonblocking(
119 &self,
120 vectors: &[OutputVector],
121 cancellable: Option<&impl IsA<Cancellable>>,
122 ) -> Result<(PollableReturn, usize), glib::Error> {
123 unsafe {
124 let mut error = std::ptr::null_mut();
125 let mut bytes_written = 0;
126
127 let ret = ffi::g_pollable_output_stream_writev_nonblocking(
128 self.as_ref().to_glib_none().0,
129 vectors.as_ptr() as *const _,
130 vectors.len(),
131 &mut bytes_written,
132 cancellable.map(|p| p.as_ref()).to_glib_none().0,
133 &mut error,
134 );
135 if error.is_null() {
136 Ok((from_glib(ret), bytes_written))
137 } else {
138 Err(from_glib_full(error))
139 }
140 }
141 }
142
143 fn into_async_write(self) -> Result<OutputStreamAsyncWrite<Self>, Self>
144 where
145 Self: IsA<PollableOutputStream>,
146 {
147 if self.can_poll() {
148 Ok(OutputStreamAsyncWrite(self, None))
149 } else {
150 Err(self)
151 }
152 }
153}
154
155impl<O: IsA<PollableOutputStream>> PollableOutputStreamExtManual for O {}
156
157#[derive(Debug)]
158pub struct OutputStreamAsyncWrite<T: IsA<PollableOutputStream>>(
159 T,
160 Option<oneshot::Receiver<Result<(), glib::Error>>>,
161);
162
163impl<T: IsA<PollableOutputStream>> OutputStreamAsyncWrite<T> {
164 pub fn into_output_stream(self) -> T {
165 self.0
166 }
167
168 pub fn output_stream(&self) -> &T {
169 &self.0
170 }
171}
172
173impl<T: IsA<PollableOutputStream>> AsyncWrite for OutputStreamAsyncWrite<T> {
174 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
175 let stream = Pin::get_ref(self.as_ref());
176 let gio_result = stream
177 .0
178 .as_ref()
179 .write_nonblocking(buf, crate::Cancellable::NONE);
180
181 match gio_result {
182 Ok(size) => Poll::Ready(Ok(size as usize)),
183 Err(err) => {
184 let kind = err
185 .kind::<crate::IOErrorEnum>()
186 .unwrap_or(crate::IOErrorEnum::Failed);
187 if kind == crate::IOErrorEnum::WouldBlock {
188 let mut waker = Some(cx.waker().clone());
189 let source = stream.0.as_ref().create_source(
190 crate::Cancellable::NONE,
191 None,
192 glib::Priority::default(),
193 move |_| {
194 if let Some(waker) = waker.take() {
195 waker.wake();
196 }
197 glib::ControlFlow::Break
198 },
199 );
200 let main_context = glib::MainContext::ref_thread_default();
201 source.attach(Some(&main_context));
202
203 Poll::Pending
204 } else {
205 Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
206 }
207 }
208 }
209 }
210
211 #[cfg(feature = "v2_60")]
212 #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
213 fn poll_write_vectored(
214 self: Pin<&mut Self>,
215 cx: &mut Context<'_>,
216 bufs: &[io::IoSlice<'_>],
217 ) -> Poll<io::Result<usize>> {
218 let stream = Pin::get_ref(self.as_ref());
219 let vectors = bufs
220 .iter()
221 .map(|v| OutputVector::new(v))
222 .collect::<smallvec::SmallVec<[_; 2]>>();
223 let gio_result = stream
224 .0
225 .as_ref()
226 .writev_nonblocking(&vectors, crate::Cancellable::NONE);
227
228 match gio_result {
229 Ok((PollableReturn::Ok, size)) => Poll::Ready(Ok(size)),
230 Ok((PollableReturn::WouldBlock, _)) => {
231 let mut waker = Some(cx.waker().clone());
232 let source = stream.0.as_ref().create_source(
233 crate::Cancellable::NONE,
234 None,
235 glib::Priority::default(),
236 move |_| {
237 if let Some(waker) = waker.take() {
238 waker.wake();
239 }
240 glib::ControlFlow::Break
241 },
242 );
243 let main_context = glib::MainContext::ref_thread_default();
244 source.attach(Some(&main_context));
245
246 Poll::Pending
247 }
248 Ok((_, _)) => unreachable!(),
249 Err(err) => Poll::Ready(Err(io::Error::new(
250 io::ErrorKind::from(
251 err.kind::<crate::IOErrorEnum>()
252 .unwrap_or(crate::IOErrorEnum::Failed),
253 ),
254 err,
255 ))),
256 }
257 }
258
259 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
260 let stream = unsafe { Pin::get_unchecked_mut(self) };
261
262 let rx = if let Some(ref mut rx) = stream.1 {
263 rx
264 } else {
265 let (tx, rx) = oneshot::channel();
266 stream.0.as_ref().flush_async(
267 glib::Priority::default(),
268 crate::Cancellable::NONE,
269 move |res| {
270 let _ = tx.send(res);
271 },
272 );
273
274 stream.1 = Some(rx);
275 stream.1.as_mut().unwrap()
276 };
277
278 match Pin::new(rx).poll(cx) {
279 Poll::Ready(Ok(res)) => {
280 let _ = stream.1.take();
281 Poll::Ready(to_std_io_result(res))
282 }
283 Poll::Ready(Err(_)) => {
284 let _ = stream.1.take();
285 Poll::Ready(Ok(()))
286 }
287 Poll::Pending => Poll::Pending,
288 }
289 }
290
291 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
292 let stream = unsafe { Pin::get_unchecked_mut(self) };
293
294 let rx = if let Some(ref mut rx) = stream.1 {
295 rx
296 } else {
297 let (tx, rx) = oneshot::channel();
298 stream.0.as_ref().close_async(
299 glib::Priority::default(),
300 crate::Cancellable::NONE,
301 move |res| {
302 let _ = tx.send(res);
303 },
304 );
305
306 stream.1 = Some(rx);
307 stream.1.as_mut().unwrap()
308 };
309
310 match Pin::new(rx).poll(cx) {
311 Poll::Ready(Ok(res)) => {
312 let _ = stream.1.take();
313 Poll::Ready(to_std_io_result(res))
314 }
315 Poll::Ready(Err(_)) => {
316 let _ = stream.1.take();
317 Poll::Ready(Ok(()))
318 }
319 Poll::Pending => Poll::Pending,
320 }
321 }
322}