1use {
2 crate::{BorrowedObject, ConnectDetails, DetailedSignal, FromValues, ObjectSignalExt, ToValueOption},
3 futures_channel::mpsc,
4 futures_core::{ready, FusedFuture, FusedStream, Stream},
5 glib::{
6 g_warning,
7 object::{ObjectExt, ObjectType},
8 value::FromValue,
9 Closure, SignalHandlerId, Value, WeakRef,
10 },
11 std::{
12 error::Error, fmt, future::Future, hint::unreachable_unchecked, io, mem::ManuallyDrop, pin::Pin, ptr, task::Poll,
13 },
14};
15
16#[must_use]
17#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
18#[derive(Debug)]
19pub struct SignalStream<O: ObjectType, T> {
20 rx: mpsc::UnboundedReceiver<T>,
21 target: WeakRef<O>,
22 handle: Option<SignalHandlerId>,
23}
24
25impl<O: ObjectType, T> SignalStream<O, T> {
26 pub fn connect<F, S>(target: &O, signal: ConnectDetails<S>, res: F) -> Self
27 where
28 S: DetailedSignal<Arguments = T>,
29 T: for<'a> FromValues<'a> + 'static,
30 F: Fn(&O, &T) -> <<S as DetailedSignal>::Return as ToValueOption>::Type + 'static,
31 for<'a> BorrowedObject<'a, O>: FromValue<'a>,
32 {
33 let (tx, rx) = futures_channel::mpsc::unbounded();
34 let callback = move |values: &[Value]| {
35 let (this, args) = values.split_first().unwrap();
36 let this: BorrowedObject<O> = this.get().unwrap();
37 let args = FromValues::from_values(args).unwrap();
38 let res = res(&this, &args);
39 match tx.unbounded_send(args) {
40 Ok(()) => (),
41 Err(e) => {
42 g_warning!("glib-signal", "Failed to signal {:?}: {:?}", signal, e);
43 },
44 }
45 res.into().to_value_option()
46 };
47 let handle = unsafe { target.handle_closure(&signal.normalize(), &Closure::new_unsafe(callback)) }.unwrap();
48
49 SignalStream {
50 rx,
51 target: target.downgrade(),
52 handle: Some(handle),
53 }
54 }
55
56 pub fn once(self) -> OnceFuture<O, T> {
57 OnceFuture::new(self)
58 }
59
60 pub fn disconnect(&mut self) {
61 if let Some(handle) = self.handle.take() {
62 if let Some(target) = self.target.upgrade() {
63 target.disconnect(handle);
64 }
65 }
66 }
67
68 pub fn into_target(self) -> WeakRef<O> {
69 let mut this = ManuallyDrop::new(self);
70 this.disconnect();
71 unsafe { ptr::read(&this.target) }
72 }
73
74 pub fn target(&self) -> &WeakRef<O> {
75 &self.target
76 }
77
78 pub fn attach_target(self) -> SignalStreamSelf<O, T> {
79 SignalStreamSelf::from(self)
80 }
81}
82
83impl<O: ObjectType, T> Stream for SignalStream<O, T> {
84 type Item = T;
85
86 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
87 let rx = unsafe { self.map_unchecked_mut(|s| &mut s.rx) };
88 rx.poll_next(cx)
89 }
90
91 fn size_hint(&self) -> (usize, Option<usize>) {
92 self.rx.size_hint()
93 }
94}
95
96impl<O: ObjectType, T> FusedStream for SignalStream<O, T> {
97 fn is_terminated(&self) -> bool {
98 self.rx.is_terminated()
99 }
100}
101
102impl<O: ObjectType, T> Drop for SignalStream<O, T> {
103 fn drop(&mut self) {
104 self.disconnect();
105 }
106}
107
108#[derive(Debug, Copy, Clone)]
109pub struct ConnectEof;
110
111impl fmt::Display for ConnectEof {
112 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
113 write!(f, "unexpected connect handle EOF")
114 }
115}
116
117impl Error for ConnectEof {}
118
119impl From<ConnectEof> for io::Error {
120 fn from(eof: ConnectEof) -> Self {
121 io::Error::new(io::ErrorKind::UnexpectedEof, eof)
122 }
123}
124
125impl From<ConnectEof> for glib::Error {
126 fn from(eof: ConnectEof) -> Self {
127 glib::Error::new(glib::FileError::Pipe, &format!("{:?}", eof))
128 }
129}
130
131pub struct OnceFuture<O: ObjectType, T> {
132 stream: Option<SignalStream<O, T>>,
133}
134
135impl<O: ObjectType, T> OnceFuture<O, T> {
136 pub fn new(stream: SignalStream<O, T>) -> Self {
137 Self { stream: Some(stream) }
138 }
139
140 pub fn into_inner(self) -> SignalStream<O, T> {
142 self.stream.unwrap()
143 }
144}
145
146impl<O: ObjectType, T> Future for OnceFuture<O, T> {
147 type Output = Result<(T, WeakRef<O>), ConnectEof>;
148
149 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
150 let (res, stream) = unsafe {
152 let mut stream = self.map_unchecked_mut(|this| &mut this.stream);
153 let res = match stream.as_mut().as_pin_mut() {
154 Some(stream) => ready!(stream.poll_next(cx)),
155 None => return Poll::Pending,
156 };
157 (res, match stream.get_unchecked_mut().take() {
158 Some(s) => s,
159 None => unreachable_unchecked(),
160 })
161 };
162 let obj = stream.into_target();
163 Poll::Ready(match res {
164 Some(res) => Ok((res, obj)),
165 None => Err(ConnectEof),
166 })
167 }
168}
169
170impl<O: ObjectType, T> FusedFuture for OnceFuture<O, T> {
171 fn is_terminated(&self) -> bool {
172 self.stream.as_ref().map(|s| s.is_terminated()).unwrap_or(true)
173 }
174}
175
176pub struct SignalStreamSelf<O: ObjectType, T> {
177 inner: SignalStream<O, T>,
178}
179
180impl<O: ObjectType, T> From<SignalStream<O, T>> for SignalStreamSelf<O, T> {
181 fn from(inner: SignalStream<O, T>) -> Self {
182 Self { inner }
183 }
184}
185
186impl<O: ObjectType, T> Stream for SignalStreamSelf<O, T> {
187 type Item = (Option<O>, T);
188
189 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
190 let mut inner = unsafe { self.map_unchecked_mut(|s| &mut s.inner) };
191 Poll::Ready(ready!(inner.as_mut().poll_next(cx)).map(|res| (inner.target().upgrade(), res)))
192 }
193
194 fn size_hint(&self) -> (usize, Option<usize>) {
195 self.inner.size_hint()
196 }
197}
198
199impl<O: ObjectType, T> FusedStream for SignalStreamSelf<O, T> {
200 fn is_terminated(&self) -> bool {
201 self.inner.is_terminated()
202 }
203}