terrazzo_client/
signal.rs1use std::sync::Mutex;
4
5use scopeguard::defer;
6
7use self::batch::Batch;
8use self::batch::NotBatched;
9use self::depth::Depth;
10use self::producers::producer::ProducedValue;
11use self::producers::producer::Producer;
12use self::version::Version;
13use super::string::XString;
14use crate::debug_correlation_id::DebugCorrelationId;
15use crate::prelude::Consumers;
16use crate::prelude::OrElseLog as _;
17use crate::prelude::diagnostics::debug;
18use crate::prelude::diagnostics::debug_span;
19use crate::prelude::diagnostics::trace;
20use crate::utils::Ptr;
21
22pub mod batch;
23pub mod depth;
24pub mod derive;
25pub mod mutable_signal;
26mod producers;
27pub mod reactive_closure;
28mod tests;
29mod version;
30pub mod weak;
31
32use self::inner::XSignalInner;
33
34pub struct XSignal<T>(Ptr<XSignalInner<T>>);
39
40mod inner {
41 use std::ops::Deref;
42 use std::sync::Mutex;
43
44 use super::ProducedSignal;
45 use super::XSignal;
46 use super::XSignalValue;
47 use super::producers::producer::Producer;
48 use crate::utils::Ptr;
49
50 pub struct XSignalInner<T> {
51 pub(super) current_value: Mutex<XSignalValue<T>>,
52 pub(super) producer: Producer<ProducedSignal>,
53 pub(super) immutable_value: Ptr<Mutex<Option<T>>>,
54 pub(super) on_drop: Mutex<Vec<Box<dyn FnOnce()>>>,
55 }
56
57 impl<T> Deref for XSignal<T> {
58 type Target = XSignalInner<T>;
59 fn deref(&self) -> &Self::Target {
60 &self.0
61 }
62 }
63}
64
65pub struct ProducedSignal;
66
67impl ProducedValue for ProducedSignal {
68 type SortKey = Depth;
70 type Value = Version;
71}
72
73#[derive(Clone, Debug)]
74struct XSignalValue<T> {
75 value: Option<T>,
76 version: Version,
77}
78
79impl<T> XSignalValue<T> {
80 fn value(&self) -> &T {
81 self.value
82 .as_ref()
83 .or_throw("Value should never be null until dropped")
84 }
85
86 fn value_mut(&mut self) -> &mut T {
87 self.value
88 .as_mut()
89 .or_throw("Value should never be null until dropped")
90 }
91}
92
93impl<T> XSignal<T> {
94 pub fn new(name: impl Into<XString>, value: T) -> Self {
98 Self(Ptr::new(XSignalInner {
99 current_value: Mutex::new(XSignalValue {
100 value: Some(value),
101 version: Version::current(),
102 }),
103 producer: Producer::new(name.into()),
104 immutable_value: Ptr::default(),
105 on_drop: Mutex::new(vec![]),
106 }))
107 }
108
109 #[must_use]
111 pub fn add_subscriber(&self, closure: impl Fn(T) + 'static) -> Consumers
112 where
113 T: Clone + 'static,
114 {
115 use std::sync::atomic::AtomicUsize;
116 use std::sync::atomic::Ordering::SeqCst;
117 let weak = self.downgrade();
118 let last_version = AtomicUsize::new(0);
119 let closure = move |version: Version| {
120 let Some(this) = weak.upgrade() else { return };
121 let version = version.number();
122 let last_version = last_version.swap(version, SeqCst);
123 if last_version < version {
124 defer!(trace!("End"));
125 trace!(last_version, version, "Start");
126 closure(this.get_value_untracked())
127 } else {
128 debug!(last_version, version, "Skip");
129 }
130 };
131 Consumers(vec![self.producer.register(
132 DebugCorrelationId::new(|| "[closure]".into()),
133 Depth::zero(),
134 closure,
135 )])
136 }
137
138 pub fn get_value_untracked(&self) -> T
142 where
143 T: Clone,
144 {
145 self.current_value
146 .lock()
147 .or_throw("get_value_untracked()")
148 .value()
149 .clone()
150 }
151}
152
153impl<T> Clone for XSignal<T> {
154 fn clone(&self) -> Self {
155 Self(self.0.clone())
156 }
157}
158
159impl<T: std::fmt::Debug + 'static> XSignal<T> {
160 pub fn set(&self, new_value: impl Into<T>)
162 where
163 T: Eq,
164 {
165 let _span = debug_span!("Set", signal = %self.producer.name()).entered();
166 self.update_impl(|old_value| {
167 let new_value = new_value.into();
168 (new_value != *old_value).then_some(new_value)
169 });
170 }
171
172 pub fn update<R, U>(&self, compute: impl FnOnce(&T) -> U) -> R
174 where
175 U: Into<UpdateSignalResult<Option<T>, R>>,
176 {
177 let _span = debug_span!("Update", signal = %self.producer.name()).entered();
178 self.update_impl(|t| compute(t))
179 }
180
181 pub fn update_mut<R, U>(&self, compute: impl FnOnce(&mut T) -> U) -> R
185 where
186 U: Into<UpdateSignalResult<T, R>>,
187 {
188 let _span = debug_span!("Update mut", signal = %self.producer.name()).entered();
189 self.update_impl(|t| {
190 let UpdateSignalResult { new_value, result } = compute(t).into();
191 UpdateSignalResult {
192 new_value: Some(new_value),
193 result,
194 }
195 })
196 }
197
198 fn update_impl<R, U>(&self, compute: impl FnOnce(&mut T) -> U) -> R
199 where
200 U: Into<UpdateSignalResult<Option<T>, R>>,
201 {
202 let (version, result) = {
203 let mut current = self.current_value.lock().or_throw("current_value");
204 let current_version = current.version.number();
205 let current_value = current.value_mut();
206 let UpdateSignalResult { new_value, result } = compute(current_value).into();
207 let Some(new_value) = new_value else {
208 debug! { "Signal value is not changing current@{current_version}={current_value:?}" };
209 return result;
210 };
211 let new_version: Version = Version::next();
212 debug!(
213 "Signal value has changed old@{}={current_value:?} vs new@{}={new_value:?}",
214 current_version,
215 new_version.number()
216 );
217 current.version = new_version;
218 current.value = Some(new_value);
219 (new_version, result)
220 };
221 self.process_or_batch(version);
222 return result;
223 }
224
225 pub fn force(&self, new_value: impl Into<T>) {
229 let _span = debug_span!("Force", signal = %self.producer.name()).entered();
230 let new_value = new_value.into();
231 let version = {
232 let mut current = self.current_value.lock().or_throw("current_value");
233 current.value = Some(new_value);
234 current.version = Version::next();
235 debug! { "Signal value was forced to version:{} value:{:?}", current.version.number(), current.value() };
236 current.version
237 };
238 self.process_or_batch(version);
239 }
240
241 fn process_or_batch(&self, version: Version) {
242 Batch::try_push(|| {
243 let this = self.to_owned();
244 trace!("Update is batched");
245 move |version| this.process(version)
246 })
247 .unwrap_or_else(|NotBatched { .. }| {
248 trace!("Update is applied immediately");
249 self.process(version)
250 });
251 }
252
253 fn process(&self, version: Version) {
254 self.producer.process(version);
255 }
256}
257
258pub struct UpdateSignalResult<T, R> {
264 pub new_value: T,
265 pub result: R,
266}
267
268impl<T> From<T> for UpdateSignalResult<T, ()> {
269 fn from(new_value: T) -> Self {
270 Self {
271 new_value,
272 result: (),
273 }
274 }
275}
276
277pub trait UpdateAndReturn {
292 type NewValue;
293 fn and_return<R>(self, result: R) -> UpdateSignalResult<Self::NewValue, R>;
294}
295
296impl<T> UpdateAndReturn for T {
297 type NewValue = T;
298
299 fn and_return<R>(self, result: R) -> UpdateSignalResult<Self::NewValue, R> {
300 UpdateSignalResult {
301 new_value: self,
302 result,
303 }
304 }
305}
306
307impl<T> Drop for XSignalInner<T> {
308 fn drop(&mut self) {
309 debug!(signal = %self.producer.name(), "Dropped");
310 if Ptr::strong_count(&self.immutable_value) > 1 {
311 let mut immutable_value = self.immutable_value.lock().or_throw("immutable_value");
312 *immutable_value = self.current_value.lock().or_throw("current").value.take();
313 }
314 let mut on_drop = self.on_drop.lock().or_throw("on_drop");
315 for on_drop in std::mem::take(&mut *on_drop) {
316 on_drop()
317 }
318 }
319}
320
321impl<T: std::fmt::Debug> std::fmt::Debug for XSignal<T> {
322 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323 f.debug_tuple("XSignal")
324 .field(self.current_value.lock().or_throw("current_value").value())
325 .finish()
326 }
327}
328
329unsafe impl<T: Send> Send for XSignal<T> {}
331
332unsafe impl<T: Sync> Sync for XSignal<T> {}