1use tokio::sync::mpsc::Sender;
2use tokio::sync::oneshot::channel as oneshot;
3use tracing::Instrument;
4
5use crate::checkin::{Checkin, Feature};
6use crate::collator::FeatureFacts;
7use crate::configuration_proxy::{CheckinStatus, ConfigurationProxySignal};
8use crate::identity::DistinctId;
9use crate::{Map, PersonProperties};
10
11#[derive(Debug)]
12pub(crate) enum RawSignal {
13 Fact {
14 key: String,
15 value: serde_json::Value,
16 },
17 UpdateFeatureConfiguration(Option<Checkin>, FeatureFacts),
18 Event {
19 event_name: String,
20 properties: Option<Map>,
21 },
22 GetSessionProperties {
23 tx: tokio::sync::oneshot::Sender<Map>,
24 },
25 FlushNow,
26 Identify(DistinctId, IdentifyProperties),
27 SetPersonProperties(IdentifyProperties),
28 AddGroup {
29 group_name: String,
30 group_member_id: String,
31 },
32 Alias(String),
33 Reset,
34}
35
36#[derive(Default, Debug, serde::Serialize)]
37pub struct IdentifyProperties {
38 #[serde(rename = "$set")]
39 pub set: PersonProperties,
40 #[serde(rename = "$set_once")]
41 pub set_once: PersonProperties,
42}
43
44impl IdentifyProperties {
45 pub(crate) fn as_map(&self) -> Map {
46 let val = serde_json::to_value(self)
47 .inspect_err(|e| {
48 tracing::error!(
49 self = ?&self,
50 error = ?e,
51 "IdentifyProperties cannot convert to a Map"
52 );
53 })
54 .unwrap_or_default();
55
56 let serde_json::Value::Object(map) = val else {
57 tracing::error!(
58 self = ?&self,
59 "IdentifyProperties did not serialize to an Object"
60 );
61 return Map::default();
62 };
63 map
64 }
65}
66
67#[derive(thiserror::Error, Debug)]
68pub enum RecorderError {
69 #[error("Timed out waiting for configuration to complete: {0:?}")]
70 WaitForConfiguration(#[from] tokio::time::error::Elapsed),
71
72 #[error("Failed to subscribe to the ConfigurationProxy for configuration changes")]
73 SubscribeFailed,
74
75 #[error(transparent)]
76 Subscription(#[from] tokio::sync::broadcast::error::RecvError),
77
78 #[error("Failed to signal the configuration proxy: '{0}'")]
79 SendToConfigurationProxy(String),
80
81 #[error(transparent)]
82 Response(#[from] tokio::sync::oneshot::error::RecvError),
83}
84
85pub struct Recorder {
86 outgoing: Sender<RawSignal>,
87 auto_refresh_config: bool,
88 to_configuration_proxy: Sender<ConfigurationProxySignal>,
89}
90
91impl Clone for Recorder {
92 fn clone(&self) -> Self {
93 Self {
94 outgoing: self.outgoing.clone(),
95 auto_refresh_config: true,
96 to_configuration_proxy: self.to_configuration_proxy.clone(),
97 }
98 }
99}
100
101impl std::fmt::Debug for Recorder {
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 f.debug_struct("Recorder").finish()
104 }
105}
106
107impl Recorder {
108 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip_all))]
109 pub(crate) fn new(
110 snapshotter_tx: Sender<RawSignal>,
111 to_configuration_proxy: Sender<ConfigurationProxySignal>,
112 ) -> Self {
113 Self {
114 outgoing: snapshotter_tx,
115 to_configuration_proxy,
116 auto_refresh_config: true,
117 }
118 }
119
120 pub async fn in_configuration_txn<F, T>(&self, f: F) -> T
123 where
124 F: AsyncFnOnce(&Recorder) -> T,
125 {
126 let mut rec = self.clone();
127
128 rec.auto_refresh_config = false;
129
130 let ret = f(self).await;
131
132 rec.auto_refresh_config = true;
133 rec.trigger_configuration_refresh().await;
134
135 ret
136 }
137
138 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
139 pub async fn get_feature_variant<
140 T: serde::ser::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send,
141 >(
142 &self,
143 key: impl Into<String> + std::fmt::Debug,
144 ) -> Option<T> {
145 serde_json::from_value(self.get_feature::<T>(key).await?.variant)
146 .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
147 .ok()
148 }
149
150 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
151 pub async fn get_feature_ptr_variant<
152 T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
153 >(
154 &self,
155 key: impl Into<String> + std::fmt::Debug,
156 ) -> Option<T> {
157 serde_json::from_value(self.get_feature_ptr::<T>(key).await?.variant)
158 .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
159 .ok()
160 }
161
162 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
163 pub async fn get_feature_payload<
164 T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
165 >(
166 &self,
167 key: impl Into<String> + std::fmt::Debug,
168 ) -> Option<T> {
169 self.get_feature::<T>(key).await?.payload
170 }
171
172 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
173 pub async fn get_feature_ptr_payload<
174 T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
175 >(
176 &self,
177 key: impl Into<String> + std::fmt::Debug,
178 ) -> Option<T> {
179 self.get_feature_ptr::<T>(key).await?.payload
180 }
181
182 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
183 pub async fn get_feature_ptr<
184 T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
185 >(
186 &self,
187 key: impl Into<String> + std::fmt::Debug,
188 ) -> Option<Feature<T>> {
189 let ptr = self.get_feature_payload::<String>(key).await?;
190 self.get_feature::<T>(ptr).await
191 }
192
193 pub async fn wait_for_checkin(
194 &self,
195 duration: Option<std::time::Duration>,
196 ) -> Result<(), RecorderError> {
197 let (tx, rx) = oneshot();
198
199 let subscription = self.subscribe_to_feature_changes().await;
200
201 self.to_configuration_proxy
202 .send(ConfigurationProxySignal::QueryIfCheckedIn(tx))
203 .instrument(tracing::trace_span!(
204 "requesting check in status from the configuration proxy"
205 ))
206 .await
207 .map_err(|e| RecorderError::SendToConfigurationProxy(format!("{e:?}")))?;
208
209 if rx.await? == CheckinStatus::CheckedIn {
210 return Ok(());
211 }
212
213 let Some(mut subscription) = subscription else {
214 return Err(RecorderError::SubscribeFailed);
215 };
216
217 if let Some(duration) = duration {
218 Ok(tokio::time::timeout(duration, subscription.recv()).await??)
219 } else {
220 Ok(subscription.recv().await?)
221 }
222 }
223
224 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
225 pub async fn get_feature<
226 T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
227 >(
228 &self,
229 key: impl Into<String> + std::fmt::Debug,
230 ) -> Option<Feature<T>> {
231 let key: String = key.into();
232 let (tx, rx) = oneshot();
233
234 self.to_configuration_proxy
235 .send(ConfigurationProxySignal::GetFeature(key.clone(), tx))
236 .instrument(tracing::trace_span!(
237 "requesting feature from the configuration proxy"
238 ))
239 .await
240 .inspect_err(|e| tracing::trace!(%e, "Error sending the feature flag request"))
241 .ok()?;
242
243 let feature = rx
244 .instrument(tracing::trace_span!("waiting for the feature"))
245 .await
246 .inspect_err(|e| tracing::trace!(%e, "Error requesting the feature flag"))
247 .ok()
248 .flatten()?;
249
250 self.record(
251 "$feature_flag_called",
252 Some(Map::from_iter([
253 ("$feature_flag".into(), key.into()),
254 ("$feature_flag_response".into(), feature.variant.clone()),
255 ])),
256 )
257 .await;
258
259 let variant = feature.variant.clone();
260 let payload = if let Some(ref p) = feature.payload {
261 let ret = serde_json::from_value(p.clone()).ok()?;
262 Some(ret)
263 } else {
264 None
265 };
266
267 Some(Feature { variant, payload })
268 }
269
270 pub async fn subscribe_to_feature_changes(
271 &self,
272 ) -> Option<tokio::sync::broadcast::Receiver<()>> {
273 let (tx, rx) = oneshot();
274
275 self.to_configuration_proxy
276 .send(ConfigurationProxySignal::Subscribe(tx))
277 .instrument(tracing::debug_span!("subscribe to feature changes"))
278 .await
279 .inspect_err(|e| {
280 tracing::error!(error = ?e, "Failed to request subscription to feature changes");
281 })
282 .ok()?;
283
284 rx.await
285 .inspect_err(|e| {
286 tracing::error!(error = ?e, "No response when waiting a feature change subscriber");
287 })
288 .ok()
289 }
290
291 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
292 pub async fn set_fact(
293 &self,
294 key: impl Into<String> + std::fmt::Debug,
295 value: serde_json::Value,
296 ) {
297 if let Err(e) = self
298 .outgoing
299 .send(RawSignal::Fact {
300 key: key.into(),
301 value,
302 })
303 .await
304 {
305 tracing::error!(error = ?e, "Failed to enqueue a fact");
306 }
307 }
308
309 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
310 pub async fn record(
311 &self,
312 event: impl Into<String> + std::fmt::Debug,
313 properties: Option<Map>,
314 ) {
315 if let Err(e) = self
316 .outgoing
317 .send(RawSignal::Event {
318 event_name: event.into(),
319 properties,
320 })
321 .instrument(tracing::trace_span!("recording the event"))
322 .await
323 {
324 tracing::error!(error = ?e, "Failed to enqueue an event message");
325 }
326 }
327
328 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
329 pub async fn identify(&self, new: DistinctId) {
330 self.identify_with_properties(new, IdentifyProperties::default())
331 .await;
332 }
333
334 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
335 pub async fn identify_with_properties(&self, new: DistinctId, properties: IdentifyProperties) {
336 if let Err(e) = self
337 .outgoing
338 .send(RawSignal::Identify(new, properties))
339 .instrument(tracing::trace_span!("sending the Identify message"))
340 .await
341 {
342 tracing::error!(error = ?e, "Failed to enqueue swap_identity message");
343 }
344
345 self.trigger_configuration_refresh()
346 .instrument(tracing::trace_span!("triggering a configuration refresh"))
347 .await;
348 }
349
350 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
351 pub async fn set_person_properties(&self, properties: IdentifyProperties) {
352 if let Err(e) = self
353 .outgoing
354 .send(RawSignal::SetPersonProperties(properties))
355 .instrument(tracing::trace_span!(
356 "sending the SetPersonProperties message"
357 ))
358 .await
359 {
360 tracing::error!(error = ?e, "Failed to enqueue set_person_properties message");
361 }
362
363 self.trigger_configuration_refresh()
364 .instrument(tracing::trace_span!("triggering a configuration refresh"))
365 .await;
366 }
367
368 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
369 pub async fn add_group(
370 &self,
371 group_name: impl Into<String> + std::fmt::Debug,
372 group_member_id: impl Into<String> + std::fmt::Debug,
373 ) {
374 if let Err(e) = self
375 .outgoing
376 .send(RawSignal::AddGroup {
377 group_name: group_name.into(),
378 group_member_id: group_member_id.into(),
379 })
380 .instrument(tracing::trace_span!("sending the AddGroup message"))
381 .await
382 {
383 tracing::error!(error = ?e, "Failed to enqueue AddGroup message");
384 }
385
386 self.trigger_configuration_refresh()
387 .instrument(tracing::trace_span!("triggering a configuration refresh"))
388 .await;
389 }
390
391 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
392 pub async fn alias(&self, alias: impl Into<String> + std::fmt::Debug) {
393 if let Err(e) = self
394 .outgoing
395 .send(RawSignal::Alias(alias.into()))
396 .instrument(tracing::trace_span!("sending the Alias message"))
397 .await
398 {
399 tracing::error!(error = ?e, "Failed to enqueue Alias message");
400 }
401
402 self.trigger_configuration_refresh()
403 .instrument(tracing::trace_span!("triggering a configuration refresh"))
404 .await;
405 }
406
407 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
408 pub async fn reset(&self) {
409 if let Err(e) = self
410 .outgoing
411 .send(RawSignal::Reset)
412 .instrument(tracing::trace_span!("sending the Reset message"))
413 .await
414 {
415 tracing::error!(error = ?e, "Failed to enqueue reset message");
416 }
417
418 self.trigger_configuration_refresh()
419 .instrument(tracing::trace_span!("triggering a configuration refresh"))
420 .await;
421 }
422
423 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self), ret(level = tracing::Level::TRACE)))]
424 async fn get_session_properties(&self) -> Result<Map, FullDuplexError> {
425 let (tx, rx) = tokio::sync::oneshot::channel();
426
427 self.outgoing
428 .send(RawSignal::GetSessionProperties { tx })
429 .instrument(tracing::trace_span!(
430 "sending the GetSessionProperties message"
431 ))
432 .await
433 .map_err(|_| FullDuplexError::SendError)?;
434
435 Ok(rx
436 .instrument(tracing::trace_span!("waiting for reply"))
437 .await?)
438 }
439
440 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
441 pub async fn flush_now(&self) {
442 if let Err(e) = self.outgoing.send(RawSignal::FlushNow).await {
443 tracing::error!(error = ?e, "Failed to enqueue a FlushNow message");
444 }
445 }
446
447 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
448 pub(crate) async fn trigger_configuration_refresh(&self) {
449 if !self.auto_refresh_config {
450 tracing::trace!("Not refreshing configuration because it is paused");
451 return;
452 }
453
454 let (tx, rx) = oneshot();
455
456 let session_properties = self
457 .get_session_properties()
458 .instrument(tracing::debug_span!("request session properties"))
459 .await
460 .inspect_err(|e| {
461 tracing::debug!(%e, "Failed to get session properties");
462 })
463 .unwrap_or_default();
464
465 if let Err(e) = self
466 .to_configuration_proxy
467 .send(ConfigurationProxySignal::CheckInNow(session_properties, tx))
468 .instrument(tracing::debug_span!("request immediate check-in"))
469 .await
470 {
471 tracing::error!(error = ?e, "Failed to enqueue CheckInNow message");
472 }
473
474 let (config, feats) = match rx
475 .instrument(tracing::debug_span!("receive feature facts"))
476 .await
477 {
478 Ok((config, feats)) => (config, feats),
479 Err(e) => {
480 tracing::error!(error = ?e, "Failed to refresh the configuration");
481
482 return;
483 }
484 };
485
486 if let Err(e) = self
487 .outgoing
488 .send(RawSignal::UpdateFeatureConfiguration(config, feats))
489 .instrument(tracing::debug_span!("forward feature facts"))
490 .await
491 {
492 tracing::error!(%e, "Failed to forward updated feature facts");
493 }
494 }
495}
496
497#[derive(thiserror::Error, Debug)]
498pub enum FullDuplexError {
499 #[error("Failed to request session properties")]
500 SendError,
501
502 #[error("Error waiting for a reply: {0}")]
503 Recv(#[from] tokio::sync::oneshot::error::RecvError),
504}