1use tokio::sync::mpsc::Sender;
2use tokio::sync::oneshot::channel as oneshot;
3use tracing::Instrument;
4
5use crate::checkin::Feature;
6use crate::collator::FeatureFacts;
7use crate::configuration_proxy::ConfigurationProxySignal;
8use crate::identity::DistinctId;
9use crate::Map;
10
11#[derive(Debug)]
12pub(crate) enum RawSignal {
13 Fact {
14 key: String,
15 value: serde_json::Value,
16 },
17 UpdateFeatureFacts(FeatureFacts),
18 Event {
19 event_name: String,
20 properties: Option<Map>,
21 },
22 GetSessionProperties {
23 tx: tokio::sync::oneshot::Sender<Map>,
24 },
25 FlushNow,
26 Identify(DistinctId),
27 Alias(String),
28 Reset,
29}
30
31#[derive(Clone)]
32pub struct Recorder {
33 outgoing: Sender<RawSignal>,
34 to_configuration_proxy: Sender<ConfigurationProxySignal>,
35}
36
37impl std::fmt::Debug for Recorder {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 f.debug_struct("Recorder").finish()
40 }
41}
42
43impl Recorder {
44 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip_all))]
45 pub(crate) fn new(
46 snapshotter_tx: Sender<RawSignal>,
47 to_configuration_proxy: Sender<ConfigurationProxySignal>,
48 ) -> Self {
49 Self {
50 outgoing: snapshotter_tx,
51 to_configuration_proxy,
52 }
53 }
54
55 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
56 pub async fn get_feature_variant<T: serde::de::DeserializeOwned + std::fmt::Debug + Send>(
57 &self,
58 key: impl Into<String> + std::fmt::Debug,
59 ) -> Option<T> {
60 serde_json::from_value(self.get_feature::<T>(key).await?.variant)
61 .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
62 .ok()
63 }
64
65 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
66 pub async fn get_feature_ptr_variant<
67 T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
68 >(
69 &self,
70 key: impl Into<String> + std::fmt::Debug,
71 ) -> Option<T> {
72 serde_json::from_value(self.get_feature_ptr::<T>(key).await?.variant)
73 .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
74 .ok()
75 }
76
77 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
78 pub async fn get_feature_payload<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
79 &self,
80 key: impl Into<String> + std::fmt::Debug,
81 ) -> Option<T> {
82 self.get_feature::<T>(key).await?.payload
83 }
84
85 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
86 pub async fn get_feature_ptr_payload<
87 T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
88 >(
89 &self,
90 key: impl Into<String> + std::fmt::Debug,
91 ) -> Option<T> {
92 self.get_feature_ptr::<T>(key).await?.payload
93 }
94
95 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
96 pub async fn get_feature_ptr<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
97 &self,
98 key: impl Into<String> + std::fmt::Debug,
99 ) -> Option<Feature<T>> {
100 let ptr = self.get_feature_payload::<String>(key).await?;
101 self.get_feature::<T>(ptr).await
102 }
103
104 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
105 pub async fn get_feature<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
106 &self,
107 key: impl Into<String> + std::fmt::Debug,
108 ) -> Option<Feature<T>> {
109 let key: String = key.into();
110 let (tx, rx) = oneshot();
111
112 self.to_configuration_proxy
113 .send(ConfigurationProxySignal::GetFeature(key.clone(), tx))
114 .instrument(tracing::trace_span!(
115 "requesting feature from the configuration proxy"
116 ))
117 .await
118 .inspect_err(|e| tracing::trace!(%e, "Error sending the feature flag request"))
119 .ok()?;
120
121 let feature = rx
122 .instrument(tracing::trace_span!("waiting for the feature"))
123 .await
124 .inspect_err(|e| tracing::trace!(%e, "Error requesting the feature flag"))
125 .ok()
126 .flatten()?;
127
128 self.record(
129 "$feature_flag_called",
130 Some(Map::from_iter([
131 ("$feature_flag".into(), key.into()),
132 ("$feature_flag_response".into(), feature.variant.clone()),
133 ])),
134 )
135 .await;
136
137 let variant = feature.variant.clone();
138 let payload = if let Some(ref p) = feature.payload {
139 let ret = serde_json::from_value(p.clone()).ok()?;
140 Some(ret)
141 } else {
142 None
143 };
144
145 Some(Feature { variant, payload })
146 }
147
148 pub async fn subscribe_to_feature_changes(
149 &self,
150 ) -> Option<tokio::sync::broadcast::Receiver<()>> {
151 let (tx, rx) = oneshot();
152
153 self.to_configuration_proxy
154 .send(ConfigurationProxySignal::Subscribe(tx))
155 .instrument(tracing::debug_span!("subscribe to feature changes"))
156 .await
157 .inspect_err(|e| {
158 tracing::error!(error = ?e, "Failed to request subscription to feature changes");
159 })
160 .ok()?;
161
162 rx.await
163 .inspect_err(|e| {
164 tracing::error!(error = ?e, "No response when waiting a feature change subscriber");
165 })
166 .ok()
167 }
168
169 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
170 pub async fn add_fact(&self, key: &str, value: serde_json::Value) {
171 if let Err(e) = self
172 .outgoing
173 .send(RawSignal::Fact {
174 key: key.to_string(),
175 value,
176 })
177 .await
178 {
179 tracing::error!(error = ?e, "Failed to enqueue a fact");
180 }
181 }
182
183 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
184 pub async fn record(&self, event: &str, properties: Option<Map>) {
185 if let Err(e) = self
186 .outgoing
187 .send(RawSignal::Event {
188 event_name: event.to_string(),
189 properties,
190 })
191 .instrument(tracing::trace_span!("recording the event"))
192 .await
193 {
194 tracing::error!(error = ?e, "Failed to enqueue an event message");
195 }
196 }
197
198 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
199 pub async fn identify(&self, new: DistinctId) {
200 if let Err(e) = self
201 .outgoing
202 .send(RawSignal::Identify(new))
203 .instrument(tracing::trace_span!("sending the Identify message"))
204 .await
205 {
206 tracing::error!(error = ?e, "Failed to enqueue swap_identity message");
207 }
208
209 self.trigger_configuration_refresh()
210 .instrument(tracing::trace_span!("triggering a configuration refresh"))
211 .await;
212 }
213
214 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
215 pub async fn alias(&self, alias: String) {
216 if let Err(e) = self
217 .outgoing
218 .send(RawSignal::Alias(alias))
219 .instrument(tracing::trace_span!("sending the Alias message"))
220 .await
221 {
222 tracing::error!(error = ?e, "Failed to enqueue Alias message");
223 }
224
225 self.trigger_configuration_refresh()
226 .instrument(tracing::trace_span!("triggering a configuration refresh"))
227 .await;
228 }
229
230 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
231 pub async fn reset(&self) {
232 if let Err(e) = self
233 .outgoing
234 .send(RawSignal::Reset)
235 .instrument(tracing::trace_span!("sending the Reset message"))
236 .await
237 {
238 tracing::error!(error = ?e, "Failed to enqueue reset message");
239 }
240
241 self.trigger_configuration_refresh()
242 .instrument(tracing::trace_span!("triggering a configuration refresh"))
243 .await;
244 }
245
246 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self), ret(level = tracing::Level::TRACE)))]
247 async fn get_session_properties(&self) -> Result<Map, FullDuplexError> {
248 let (tx, rx) = tokio::sync::oneshot::channel();
249
250 self.outgoing
251 .send(RawSignal::GetSessionProperties { tx })
252 .instrument(tracing::trace_span!(
253 "sending the GetSessionProperties message"
254 ))
255 .await
256 .map_err(|_| FullDuplexError::SendError)?;
257
258 Ok(rx
259 .instrument(tracing::trace_span!("waiting for reply"))
260 .await?)
261 }
262
263 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
264 pub async fn flush_now(&self) {
265 if let Err(e) = self.outgoing.send(RawSignal::FlushNow).await {
266 tracing::error!(error = ?e, "Failed to enqueue a FlushNow message");
267 }
268 }
269
270 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
271 pub(crate) async fn trigger_configuration_refresh(&self) {
272 let (tx, rx) = oneshot();
273
274 let session_properties = self
275 .get_session_properties()
276 .instrument(tracing::debug_span!("request session properties"))
277 .await
278 .inspect_err(|e| {
279 tracing::debug!(%e, "Failed to get session properties");
280 })
281 .unwrap_or_default();
282
283 if let Err(e) = self
284 .to_configuration_proxy
285 .send(ConfigurationProxySignal::CheckInNow(session_properties, tx))
286 .instrument(tracing::debug_span!("request immediate check-in"))
287 .await
288 {
289 tracing::error!(error = ?e, "Failed to enqueue CheckInNow message");
290 }
291
292 let feats = match rx
293 .instrument(tracing::debug_span!("receive feature facts"))
294 .await
295 {
296 Ok(feats) => feats,
297 Err(e) => {
298 tracing::error!(error = ?e, "Failed to refresh the configuration");
299
300 return;
301 }
302 };
303
304 if let Err(e) = self
305 .outgoing
306 .send(RawSignal::UpdateFeatureFacts(feats))
307 .instrument(tracing::debug_span!("forward feature facts"))
308 .await
309 {
310 tracing::error!(%e, "Failed to forward updated feature facts");
311 }
312 }
313}
314
315#[derive(thiserror::Error, Debug)]
316pub enum FullDuplexError {
317 #[error("Failed to request session properties")]
318 SendError,
319
320 #[error("Error waiting for a reply: {0}")]
321 Recv(#[from] tokio::sync::oneshot::error::RecvError),
322}