terrazzo_client/signal/reactive_closure/
reactive_closure_builder.rs

1use std::panic::Location;
2use std::sync::atomic::AtomicUsize;
3
4use autoclone::autoclone;
5use scopeguard::defer;
6
7use super::ReactiveClosure;
8use crate::debug_correlation_id::DebugCorrelationId;
9use crate::prelude::OrElseLog as _;
10use crate::prelude::diagnostics::debug;
11use crate::prelude::diagnostics::debug_span;
12use crate::prelude::diagnostics::trace;
13use crate::signal::ProducedSignal;
14use crate::signal::XSignal;
15use crate::signal::producers::consumer::Consumer;
16use crate::signal::producers::producer::Producer;
17use crate::signal::version::Version;
18use crate::string::XString;
19use crate::template::IsTemplate;
20use crate::utils::Ptr;
21
22/// A builder for ReactiveClosure.
23///
24/// The closure initially takes multiple parameters that must be bound to signals until we are left with a `Fn()`.
25#[must_use]
26pub struct ReactiveClosureBuilder<F> {
27    /// A name, for troubleshooting purposes only.
28    name: DebugCorrelationId<XString>,
29
30    /// The closure
31    reactive_closure: F,
32
33    /// The list of signals that the final closure needs to subscribe to.
34    producers: Vec<Producer<ProducedSignal>>,
35}
36
37/// Binds the parameter of a reactive closure to a signal.
38///
39/// This is mainly used by `#[template]` code-generated code.
40pub trait BindReactiveClosure<F, BF, I, O>
41where
42    F: Fn() -> BF,
43    BF: FnOnce(I) -> O,
44{
45    fn bind(self, signal: XSignal<I>) -> ReactiveClosureBuilder<impl Fn() -> O>;
46}
47
48impl<F, BF, I, O> BindReactiveClosure<F, BF, I, O> for ReactiveClosureBuilder<F>
49where
50    F: Fn() -> BF,
51    BF: FnOnce(I) -> O,
52    I: Clone + 'static,
53{
54    fn bind(self, signal: XSignal<I>) -> ReactiveClosureBuilder<impl Fn() -> O> {
55        let span = debug_span!("Bind", closure = %self.name, signal = %signal.0.producer.name());
56        let _span = span.clone().entered();
57        let reactive_closure = self.reactive_closure;
58        trace!("Bind");
59        let signal_weak = signal.downgrade();
60        let immutable_value = signal.0.immutable_value.clone();
61        let bound_closure = move || {
62            let current_value = {
63                if let Some(signal) = signal_weak.upgrade() {
64                    let lock = &signal.0.current_value.lock().or_throw("current_value");
65                    lock.value().clone()
66                } else {
67                    // Signal -> ReactiveClosure
68                    // ReactiveClosure -> Weak<Signal>: to read the value
69                    // ReactiveClosure -> Weak<Signal>: to unsubscribe if dropped
70                    let _span = span.enter();
71                    debug!("Signal is dropped, keep previous value");
72                    let immutable_value = immutable_value.lock().or_throw("immutable_value");
73                    immutable_value.as_ref().or_throw("immutable_value").clone()
74                }
75            };
76            reactive_closure()(current_value)
77        };
78        let mut producers = self.producers;
79        producers.push(signal.0.producer.clone());
80        return ReactiveClosureBuilder {
81            name: self.name,
82            reactive_closure: bound_closure,
83            producers,
84        };
85    }
86}
87
88impl<F: Fn() + 'static> ReactiveClosureBuilder<F> {
89    /// Subscribes the reactive closure to all its signals.
90    /// There is no way to call it manually. The only way to get the closure to run is the change the signals.
91    #[autoclone]
92    pub fn register(self, template: impl IsTemplate) -> Consumers {
93        let _span = debug_span!("Register", closure = %self.name).entered();
94        let Self {
95            name,
96            reactive_closure,
97            producers,
98        } = self;
99        let reactive_closure = Ptr::new(ReactiveClosure {
100            name,
101            reactive_closure,
102            last_version: AtomicUsize::new(0),
103        });
104        trace!("Call");
105        reactive_closure.call(Version::current());
106
107        defer!(trace!("Add consumers: Done."));
108        trace!("Add consumers");
109        let mut consumers = vec![];
110        let consumer_name: XString = template.debug_id().to_string().into();
111        for producer in producers {
112            consumers.push(producer.register(
113                DebugCorrelationId::new(|| consumer_name.clone()),
114                template.depth(),
115                move |version| {
116                    autoclone!(reactive_closure);
117                    reactive_closure.call(version)
118                },
119            ));
120        }
121        return Consumers(consumers);
122    }
123}
124
125/// Creates a new reactive closure builder.
126///
127/// This is mainly used by `#[template]` code-generated code.
128#[track_caller]
129pub fn make_reactive_closure() -> ReactiveClosureBuilderWantClosure {
130    ReactiveClosureBuilderWantClosure {
131        name: NameOrCallSite::CallSite(std::panic::Location::caller()),
132    }
133}
134
135#[must_use]
136pub struct ReactiveClosureBuilderWantClosure {
137    name: NameOrCallSite,
138}
139
140enum NameOrCallSite {
141    Name(XString),
142    CallSite(&'static Location<'static>),
143}
144
145impl ReactiveClosureBuilderWantClosure {
146    pub fn named(self, name: impl Into<XString>) -> Self {
147        Self {
148            name: NameOrCallSite::Name(name.into()),
149        }
150    }
151
152    pub fn closure<F>(self, closure: F) -> ReactiveClosureBuilder<F> {
153        closure.into_reactive_closure_builder(match self.name {
154            NameOrCallSite::Name(name) => name,
155            NameOrCallSite::CallSite(location) => {
156                format!("{}:{}", location.file(), location.line()).into()
157            }
158        })
159    }
160}
161
162/// Turns a closure into a reactive closure builder.
163trait ToReactiveClosureBuilder: Sized {
164    fn into_reactive_closure_builder(
165        self,
166        name: impl Into<XString>,
167    ) -> ReactiveClosureBuilder<Self>;
168}
169
170impl<F> ToReactiveClosureBuilder for F {
171    fn into_reactive_closure_builder(
172        self,
173        name: impl Into<XString>,
174    ) -> ReactiveClosureBuilder<Self> {
175        let name = DebugCorrelationId::new(|| name.into());
176        debug!(closure = %name, "ReactiveClosure new");
177        ReactiveClosureBuilder {
178            name,
179            reactive_closure: self,
180            producers: vec![],
181        }
182    }
183}
184
185/// A struct that holds consumers, i.e. callbacks that are executed when producers execute.
186///
187/// This is used by the signaling mechanism.
188#[derive(Default)]
189pub struct Consumers(pub(crate) Vec<Consumer<ProducedSignal>>);
190
191impl std::fmt::Debug for Consumers {
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        let consumers: Vec<String> = self.0.iter().map(|c| format!("{c:?}")).collect();
194        write!(f, "[{}]", consumers.join(", "))
195    }
196}
197
198/// Safe because Javascript is single-threaded.
199unsafe impl Send for Consumers {}
200
201/// Safe because Javascript is single-threaded.
202unsafe impl Sync for Consumers {}
203
204impl Consumers {
205    pub fn append(mut self, mut other: Self) -> Self {
206        self.0.append(&mut other.0);
207        return self;
208    }
209}