glib_signal/
signal_stream.rs

1use {
2	crate::{BorrowedObject, ConnectDetails, DetailedSignal, FromValues, ObjectSignalExt, ToValueOption},
3	futures_channel::mpsc,
4	futures_core::{ready, FusedFuture, FusedStream, Stream},
5	glib::{
6		g_warning,
7		object::{ObjectExt, ObjectType},
8		value::FromValue,
9		Closure, SignalHandlerId, Value, WeakRef,
10	},
11	std::{
12		error::Error, fmt, future::Future, hint::unreachable_unchecked, io, mem::ManuallyDrop, pin::Pin, ptr, task::Poll,
13	},
14};
15
16#[must_use]
17#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
18#[derive(Debug)]
19pub struct SignalStream<O: ObjectType, T> {
20	rx: mpsc::UnboundedReceiver<T>,
21	target: WeakRef<O>,
22	handle: Option<SignalHandlerId>,
23}
24
25impl<O: ObjectType, T> SignalStream<O, T> {
26	pub fn connect<F, S>(target: &O, signal: ConnectDetails<S>, res: F) -> Self
27	where
28		S: DetailedSignal<Arguments = T>,
29		T: for<'a> FromValues<'a> + 'static,
30		F: Fn(&O, &T) -> <<S as DetailedSignal>::Return as ToValueOption>::Type + 'static,
31		for<'a> BorrowedObject<'a, O>: FromValue<'a>,
32	{
33		let (tx, rx) = futures_channel::mpsc::unbounded();
34		let callback = move |values: &[Value]| {
35			let (this, args) = values.split_first().unwrap();
36			let this: BorrowedObject<O> = this.get().unwrap();
37			let args = FromValues::from_values(args).unwrap();
38			let res = res(&this, &args);
39			match tx.unbounded_send(args) {
40				Ok(()) => (),
41				Err(e) => {
42					g_warning!("glib-signal", "Failed to signal {:?}: {:?}", signal, e);
43				},
44			}
45			res.into().to_value_option()
46		};
47		let handle = unsafe { target.handle_closure(&signal.normalize(), &Closure::new_unsafe(callback)) }.unwrap();
48
49		SignalStream {
50			rx,
51			target: target.downgrade(),
52			handle: Some(handle),
53		}
54	}
55
56	pub fn once(self) -> OnceFuture<O, T> {
57		OnceFuture::new(self)
58	}
59
60	pub fn disconnect(&mut self) {
61		if let Some(handle) = self.handle.take() {
62			if let Some(target) = self.target.upgrade() {
63				target.disconnect(handle);
64			}
65		}
66	}
67
68	pub fn into_target(self) -> WeakRef<O> {
69		let mut this = ManuallyDrop::new(self);
70		this.disconnect();
71		unsafe { ptr::read(&this.target) }
72	}
73
74	pub fn target(&self) -> &WeakRef<O> {
75		&self.target
76	}
77
78	pub fn attach_target(self) -> SignalStreamSelf<O, T> {
79		SignalStreamSelf::from(self)
80	}
81}
82
83impl<O: ObjectType, T> Stream for SignalStream<O, T> {
84	type Item = T;
85
86	fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
87		let rx = unsafe { self.map_unchecked_mut(|s| &mut s.rx) };
88		rx.poll_next(cx)
89	}
90
91	fn size_hint(&self) -> (usize, Option<usize>) {
92		self.rx.size_hint()
93	}
94}
95
96impl<O: ObjectType, T> FusedStream for SignalStream<O, T> {
97	fn is_terminated(&self) -> bool {
98		self.rx.is_terminated()
99	}
100}
101
102impl<O: ObjectType, T> Drop for SignalStream<O, T> {
103	fn drop(&mut self) {
104		self.disconnect();
105	}
106}
107
108#[derive(Debug, Copy, Clone)]
109pub struct ConnectEof;
110
111impl fmt::Display for ConnectEof {
112	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
113		write!(f, "unexpected connect handle EOF")
114	}
115}
116
117impl Error for ConnectEof {}
118
119impl From<ConnectEof> for io::Error {
120	fn from(eof: ConnectEof) -> Self {
121		io::Error::new(io::ErrorKind::UnexpectedEof, eof)
122	}
123}
124
125impl From<ConnectEof> for glib::Error {
126	fn from(eof: ConnectEof) -> Self {
127		glib::Error::new(glib::FileError::Pipe, &format!("{:?}", eof))
128	}
129}
130
131pub struct OnceFuture<O: ObjectType, T> {
132	stream: Option<SignalStream<O, T>>,
133}
134
135impl<O: ObjectType, T> OnceFuture<O, T> {
136	pub fn new(stream: SignalStream<O, T>) -> Self {
137		Self { stream: Some(stream) }
138	}
139
140	/// check `is_terminated` first!
141	pub fn into_inner(self) -> SignalStream<O, T> {
142		self.stream.unwrap()
143	}
144}
145
146impl<O: ObjectType, T> Future for OnceFuture<O, T> {
147	type Output = Result<(T, WeakRef<O>), ConnectEof>;
148
149	fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
150		//let this = unsafe { self.get_unchecked_mut() };
151		let (res, stream) = unsafe {
152			let mut stream = self.map_unchecked_mut(|this| &mut this.stream);
153			let res = match stream.as_mut().as_pin_mut() {
154				Some(stream) => ready!(stream.poll_next(cx)),
155				None => return Poll::Pending,
156			};
157			(res, match stream.get_unchecked_mut().take() {
158				Some(s) => s,
159				None => unreachable_unchecked(),
160			})
161		};
162		let obj = stream.into_target();
163		Poll::Ready(match res {
164			Some(res) => Ok((res, obj)),
165			None => Err(ConnectEof),
166		})
167	}
168}
169
170impl<O: ObjectType, T> FusedFuture for OnceFuture<O, T> {
171	fn is_terminated(&self) -> bool {
172		self.stream.as_ref().map(|s| s.is_terminated()).unwrap_or(true)
173	}
174}
175
176pub struct SignalStreamSelf<O: ObjectType, T> {
177	inner: SignalStream<O, T>,
178}
179
180impl<O: ObjectType, T> From<SignalStream<O, T>> for SignalStreamSelf<O, T> {
181	fn from(inner: SignalStream<O, T>) -> Self {
182		Self { inner }
183	}
184}
185
186impl<O: ObjectType, T> Stream for SignalStreamSelf<O, T> {
187	type Item = (Option<O>, T);
188
189	fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
190		let mut inner = unsafe { self.map_unchecked_mut(|s| &mut s.inner) };
191		Poll::Ready(ready!(inner.as_mut().poll_next(cx)).map(|res| (inner.target().upgrade(), res)))
192	}
193
194	fn size_hint(&self) -> (usize, Option<usize>) {
195		self.inner.size_hint()
196	}
197}
198
199impl<O: ObjectType, T> FusedStream for SignalStreamSelf<O, T> {
200	fn is_terminated(&self) -> bool {
201		self.inner.is_terminated()
202	}
203}