terrazzo_client/signal/reactive_closure/
reactive_closure_builder.rs1use std::panic::Location;
2use std::sync::atomic::AtomicUsize;
3
4use autoclone::autoclone;
5use scopeguard::defer;
6
7use super::ReactiveClosure;
8use crate::debug_correlation_id::DebugCorrelationId;
9use crate::prelude::OrElseLog as _;
10use crate::prelude::diagnostics::debug;
11use crate::prelude::diagnostics::debug_span;
12use crate::prelude::diagnostics::trace;
13use crate::signal::ProducedSignal;
14use crate::signal::XSignal;
15use crate::signal::producers::consumer::Consumer;
16use crate::signal::producers::producer::Producer;
17use crate::signal::version::Version;
18use crate::string::XString;
19use crate::template::IsTemplate;
20use crate::utils::Ptr;
21
22#[must_use]
26pub struct ReactiveClosureBuilder<F> {
27 name: DebugCorrelationId<XString>,
29
30 reactive_closure: F,
32
33 producers: Vec<Producer<ProducedSignal>>,
35}
36
37pub trait BindReactiveClosure<F, BF, I, O>
41where
42 F: Fn() -> BF,
43 BF: FnOnce(I) -> O,
44{
45 fn bind(self, signal: XSignal<I>) -> ReactiveClosureBuilder<impl Fn() -> O>;
46}
47
48impl<F, BF, I, O> BindReactiveClosure<F, BF, I, O> for ReactiveClosureBuilder<F>
49where
50 F: Fn() -> BF,
51 BF: FnOnce(I) -> O,
52 I: Clone + 'static,
53{
54 fn bind(self, signal: XSignal<I>) -> ReactiveClosureBuilder<impl Fn() -> O> {
55 let span = debug_span!("Bind", closure = %self.name, signal = %signal.0.producer.name());
56 let _span = span.clone().entered();
57 let reactive_closure = self.reactive_closure;
58 trace!("Bind");
59 let signal_weak = signal.downgrade();
60 let immutable_value = signal.0.immutable_value.clone();
61 let bound_closure = move || {
62 let current_value = {
63 if let Some(signal) = signal_weak.upgrade() {
64 let lock = &signal.0.current_value.lock().or_throw("current_value");
65 lock.value().clone()
66 } else {
67 let _span = span.enter();
71 debug!("Signal is dropped, keep previous value");
72 let immutable_value = immutable_value.lock().or_throw("immutable_value");
73 immutable_value.as_ref().or_throw("immutable_value").clone()
74 }
75 };
76 reactive_closure()(current_value)
77 };
78 let mut producers = self.producers;
79 producers.push(signal.0.producer.clone());
80 return ReactiveClosureBuilder {
81 name: self.name,
82 reactive_closure: bound_closure,
83 producers,
84 };
85 }
86}
87
88impl<F: Fn() + 'static> ReactiveClosureBuilder<F> {
89 #[autoclone]
92 pub fn register(self, template: impl IsTemplate) -> Consumers {
93 let _span = debug_span!("Register", closure = %self.name).entered();
94 let Self {
95 name,
96 reactive_closure,
97 producers,
98 } = self;
99 let reactive_closure = Ptr::new(ReactiveClosure {
100 name,
101 reactive_closure,
102 last_version: AtomicUsize::new(0),
103 });
104 trace!("Call");
105 reactive_closure.call(Version::current());
106
107 defer!(trace!("Add consumers: Done."));
108 trace!("Add consumers");
109 let mut consumers = vec![];
110 let consumer_name: XString = template.debug_id().to_string().into();
111 for producer in producers {
112 consumers.push(producer.register(
113 DebugCorrelationId::new(|| consumer_name.clone()),
114 template.depth(),
115 move |version| {
116 autoclone!(reactive_closure);
117 reactive_closure.call(version)
118 },
119 ));
120 }
121 return Consumers(consumers);
122 }
123}
124
125#[track_caller]
129pub fn make_reactive_closure() -> ReactiveClosureBuilderWantClosure {
130 ReactiveClosureBuilderWantClosure {
131 name: NameOrCallSite::CallSite(std::panic::Location::caller()),
132 }
133}
134
135#[must_use]
136pub struct ReactiveClosureBuilderWantClosure {
137 name: NameOrCallSite,
138}
139
140enum NameOrCallSite {
141 Name(XString),
142 CallSite(&'static Location<'static>),
143}
144
145impl ReactiveClosureBuilderWantClosure {
146 pub fn named(self, name: impl Into<XString>) -> Self {
147 Self {
148 name: NameOrCallSite::Name(name.into()),
149 }
150 }
151
152 pub fn closure<F>(self, closure: F) -> ReactiveClosureBuilder<F> {
153 closure.into_reactive_closure_builder(match self.name {
154 NameOrCallSite::Name(name) => name,
155 NameOrCallSite::CallSite(location) => {
156 format!("{}:{}", location.file(), location.line()).into()
157 }
158 })
159 }
160}
161
162trait ToReactiveClosureBuilder: Sized {
164 fn into_reactive_closure_builder(
165 self,
166 name: impl Into<XString>,
167 ) -> ReactiveClosureBuilder<Self>;
168}
169
170impl<F> ToReactiveClosureBuilder for F {
171 fn into_reactive_closure_builder(
172 self,
173 name: impl Into<XString>,
174 ) -> ReactiveClosureBuilder<Self> {
175 let name = DebugCorrelationId::new(|| name.into());
176 debug!(closure = %name, "ReactiveClosure new");
177 ReactiveClosureBuilder {
178 name,
179 reactive_closure: self,
180 producers: vec![],
181 }
182 }
183}
184
185#[derive(Default)]
189pub struct Consumers(pub(crate) Vec<Consumer<ProducedSignal>>);
190
191impl std::fmt::Debug for Consumers {
192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 let consumers: Vec<String> = self.0.iter().map(|c| format!("{c:?}")).collect();
194 write!(f, "[{}]", consumers.join(", "))
195 }
196}
197
198unsafe impl Send for Consumers {}
200
201unsafe impl Sync for Consumers {}
203
204impl Consumers {
205 pub fn append(mut self, mut other: Self) -> Self {
206 self.0.append(&mut other.0);
207 return self;
208 }
209}