terrazzo_client/
signal.rs

1//! Reactive signals
2
3use std::sync::Mutex;
4
5use scopeguard::defer;
6
7use self::batch::Batch;
8use self::batch::NotBatched;
9use self::depth::Depth;
10use self::producers::producer::ProducedValue;
11use self::producers::producer::Producer;
12use self::version::Version;
13use super::string::XString;
14use crate::debug_correlation_id::DebugCorrelationId;
15use crate::prelude::Consumers;
16use crate::prelude::OrElseLog as _;
17use crate::prelude::diagnostics::debug;
18use crate::prelude::diagnostics::debug_span;
19use crate::prelude::diagnostics::trace;
20use crate::utils::Ptr;
21
22pub mod batch;
23pub mod depth;
24pub mod derive;
25pub mod mutable_signal;
26mod producers;
27pub mod reactive_closure;
28mod tests;
29mod version;
30pub mod weak;
31
32use self::inner::XSignalInner;
33
34/// A mutable value that callbacks can subscribe to.
35///
36/// - Derived signals
37/// - ReactiveClosures re-compute and update HTML nodes when signals change
38pub struct XSignal<T>(Ptr<XSignalInner<T>>);
39
40mod inner {
41    use std::ops::Deref;
42    use std::sync::Mutex;
43
44    use super::ProducedSignal;
45    use super::XSignal;
46    use super::XSignalValue;
47    use super::producers::producer::Producer;
48    use crate::utils::Ptr;
49
50    pub struct XSignalInner<T> {
51        pub(super) current_value: Mutex<XSignalValue<T>>,
52        pub(super) producer: Producer<ProducedSignal>,
53        pub(super) immutable_value: Ptr<Mutex<Option<T>>>,
54        pub(super) on_drop: Mutex<Vec<Box<dyn FnOnce()>>>,
55    }
56
57    impl<T> Deref for XSignal<T> {
58        type Target = XSignalInner<T>;
59        fn deref(&self) -> &Self::Target {
60            &self.0
61        }
62    }
63}
64
65pub struct ProducedSignal;
66
67impl ProducedValue for ProducedSignal {
68    /// The SortKey is [Depth], ensuring that parent nodes are recomputed before child nodes.
69    type SortKey = Depth;
70    type Value = Version;
71}
72
73#[derive(Clone, Debug)]
74struct XSignalValue<T> {
75    value: Option<T>,
76    version: Version,
77}
78
79impl<T> XSignalValue<T> {
80    fn value(&self) -> &T {
81        self.value
82            .as_ref()
83            .or_throw("Value should never be null until dropped")
84    }
85
86    fn value_mut(&mut self) -> &mut T {
87        self.value
88            .as_mut()
89            .or_throw("Value should never be null until dropped")
90    }
91}
92
93impl<T> XSignal<T> {
94    /// Create a new signal.
95    ///
96    /// The name of the signal is used in console logs, does not have to be unique.
97    pub fn new(name: impl Into<XString>, value: T) -> Self {
98        Self(Ptr::new(XSignalInner {
99            current_value: Mutex::new(XSignalValue {
100                value: Some(value),
101                version: Version::current(),
102            }),
103            producer: Producer::new(name.into()),
104            immutable_value: Ptr::default(),
105            on_drop: Mutex::new(vec![]),
106        }))
107    }
108
109    /// Registers a callback that will trigger when the signal is updated.
110    #[must_use]
111    pub fn add_subscriber(&self, closure: impl Fn(T) + 'static) -> Consumers
112    where
113        T: Clone + 'static,
114    {
115        use std::sync::atomic::AtomicUsize;
116        use std::sync::atomic::Ordering::SeqCst;
117        let weak = self.downgrade();
118        let last_version = AtomicUsize::new(0);
119        let closure = move |version: Version| {
120            let Some(this) = weak.upgrade() else { return };
121            let version = version.number();
122            let last_version = last_version.swap(version, SeqCst);
123            if last_version < version {
124                defer!(trace!("End"));
125                trace!(last_version, version, "Start");
126                closure(this.get_value_untracked())
127            } else {
128                debug!(last_version, version, "Skip");
129            }
130        };
131        Consumers(vec![self.producer.register(
132            DebugCorrelationId::new(|| "[closure]".into()),
133            Depth::zero(),
134            closure,
135        )])
136    }
137
138    /// Gets the current value of the signal.
139    ///
140    /// Reactive behavior should use [XSignal::add_subscriber].
141    pub fn get_value_untracked(&self) -> T
142    where
143        T: Clone,
144    {
145        self.current_value
146            .lock()
147            .or_throw("get_value_untracked()")
148            .value()
149            .clone()
150    }
151}
152
153impl<T> Clone for XSignal<T> {
154    fn clone(&self) -> Self {
155        Self(self.0.clone())
156    }
157}
158
159impl<T: std::fmt::Debug + 'static> XSignal<T> {
160    /// Updates the signal by setting a new value.
161    pub fn set(&self, new_value: impl Into<T>)
162    where
163        T: Eq,
164    {
165        let _span = debug_span!("Set", signal = %self.producer.name()).entered();
166        self.update_impl(|old_value| {
167            let new_value = new_value.into();
168            (new_value != *old_value).then_some(new_value)
169        });
170    }
171
172    /// Updates the signal by computing a new value from the old one.
173    pub fn update<R, U>(&self, compute: impl FnOnce(&T) -> U) -> R
174    where
175        U: Into<UpdateSignalResult<Option<T>, R>>,
176    {
177        let _span = debug_span!("Update", signal = %self.producer.name()).entered();
178        self.update_impl(|t| compute(t))
179    }
180
181    /// Updates the signal by computing a new value from the old one.
182    ///
183    /// The old value is mutable and can be reused to compute the new value.
184    pub fn update_mut<R, U>(&self, compute: impl FnOnce(&mut T) -> U) -> R
185    where
186        U: Into<UpdateSignalResult<T, R>>,
187    {
188        let _span = debug_span!("Update mut", signal = %self.producer.name()).entered();
189        self.update_impl(|t| {
190            let UpdateSignalResult { new_value, result } = compute(t).into();
191            UpdateSignalResult {
192                new_value: Some(new_value),
193                result,
194            }
195        })
196    }
197
198    fn update_impl<R, U>(&self, compute: impl FnOnce(&mut T) -> U) -> R
199    where
200        U: Into<UpdateSignalResult<Option<T>, R>>,
201    {
202        let (version, result) = {
203            let mut current = self.current_value.lock().or_throw("current_value");
204            let current_version = current.version.number();
205            let current_value = current.value_mut();
206            let UpdateSignalResult { new_value, result } = compute(current_value).into();
207            let Some(new_value) = new_value else {
208                debug! { "Signal value is not changing current@{current_version}={current_value:?}" };
209                return result;
210            };
211            let new_version: Version = Version::next();
212            debug!(
213                "Signal value has changed old@{}={current_value:?} vs new@{}={new_value:?}",
214                current_version,
215                new_version.number()
216            );
217            current.version = new_version;
218            current.value = Some(new_value);
219            (new_version, result)
220        };
221        self.process_or_batch(version);
222        return result;
223    }
224
225    /// Updates the signal by setting a new value.
226    ///
227    /// Contrary to [XSignal::set], the signal triggers even if the value didn't change.
228    pub fn force(&self, new_value: impl Into<T>) {
229        let _span = debug_span!("Force", signal = %self.producer.name()).entered();
230        let new_value = new_value.into();
231        let version = {
232            let mut current = self.current_value.lock().or_throw("current_value");
233            current.value = Some(new_value);
234            current.version = Version::next();
235            debug! { "Signal value was forced to version:{} value:{:?}", current.version.number(), current.value() };
236            current.version
237        };
238        self.process_or_batch(version);
239    }
240
241    fn process_or_batch(&self, version: Version) {
242        Batch::try_push(|| {
243            let this = self.to_owned();
244            trace!("Update is batched");
245            move |version| this.process(version)
246        })
247        .unwrap_or_else(|NotBatched { .. }| {
248            trace!("Update is applied immediately");
249            self.process(version)
250        });
251    }
252
253    fn process(&self, version: Version) {
254        self.producer.process(version);
255    }
256}
257
258/// A struct that represents the result of [updating a signal].
259///
260/// By default, updating a signal means assigning some new value and returning `()`.
261///
262/// [updating a signal]: XSignal::update
263pub struct UpdateSignalResult<T, R> {
264    pub new_value: T,
265    pub result: R,
266}
267
268impl<T> From<T> for UpdateSignalResult<T, ()> {
269    fn from(new_value: T) -> Self {
270        Self {
271            new_value,
272            result: (),
273        }
274    }
275}
276
277/// A shortcut to run some update logic on a signal and return a non-void value.
278///
279/// ```
280/// # use terrazzo_client::prelude::*;
281/// let signal = XSignal::new("signal", "1".to_owned());
282/// let new = signal.update(|old| {
283///     let old = old.parse::<i32>().unwrap();
284///     let new = old + 1;
285///     return Some(new.to_string()).and_return(new);
286/// });
287///
288/// // We got the updated value as an integer while the signal contains a string.
289/// assert_eq!(new, 2);
290/// ```
291pub trait UpdateAndReturn {
292    type NewValue;
293    fn and_return<R>(self, result: R) -> UpdateSignalResult<Self::NewValue, R>;
294}
295
296impl<T> UpdateAndReturn for T {
297    type NewValue = T;
298
299    fn and_return<R>(self, result: R) -> UpdateSignalResult<Self::NewValue, R> {
300        UpdateSignalResult {
301            new_value: self,
302            result,
303        }
304    }
305}
306
307impl<T> Drop for XSignalInner<T> {
308    fn drop(&mut self) {
309        debug!(signal = %self.producer.name(), "Dropped");
310        if Ptr::strong_count(&self.immutable_value) > 1 {
311            let mut immutable_value = self.immutable_value.lock().or_throw("immutable_value");
312            *immutable_value = self.current_value.lock().or_throw("current").value.take();
313        }
314        let mut on_drop = self.on_drop.lock().or_throw("on_drop");
315        for on_drop in std::mem::take(&mut *on_drop) {
316            on_drop()
317        }
318    }
319}
320
321impl<T: std::fmt::Debug> std::fmt::Debug for XSignal<T> {
322    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323        f.debug_tuple("XSignal")
324            .field(self.current_value.lock().or_throw("current_value").value())
325            .finish()
326    }
327}
328
329/// Safe because Javascript is single-threaded.
330unsafe impl<T: Send> Send for XSignal<T> {}
331
332/// Safe because Javascript is single-threaded.
333unsafe impl<T: Sync> Sync for XSignal<T> {}