// detsys_ids_client/recorder.rs

1use tokio::sync::mpsc::Sender;
2use tokio::sync::oneshot::channel as oneshot;
3use tracing::Instrument;
4
5use crate::checkin::{Checkin, Feature};
6use crate::collator::FeatureFacts;
7use crate::configuration_proxy::{CheckinStatus, ConfigurationProxySignal};
8use crate::identity::DistinctId;
9use crate::{Map, PersonProperties};
10
11#[derive(Debug)]
12pub(crate) enum RawSignal {
13    Fact {
14        key: String,
15        value: serde_json::Value,
16    },
17    UpdateFeatureConfiguration(Option<Checkin>, FeatureFacts),
18    Event {
19        event_name: String,
20        properties: Option<Map>,
21    },
22    GetSessionProperties {
23        tx: tokio::sync::oneshot::Sender<Map>,
24    },
25    FlushNow,
26    Identify(DistinctId, IdentifyProperties),
27    SetPersonProperties(IdentifyProperties),
28    AddGroup {
29        group_name: String,
30        group_member_id: String,
31    },
32    Alias(String),
33    Reset,
34}
35
36#[derive(Default, Debug, serde::Serialize)]
37pub struct IdentifyProperties {
38    #[serde(rename = "$set")]
39    pub set: PersonProperties,
40    #[serde(rename = "$set_once")]
41    pub set_once: PersonProperties,
42}
43
44impl IdentifyProperties {
45    pub(crate) fn as_map(&self) -> Map {
46        let val = serde_json::to_value(self)
47            .inspect_err(|e| {
48                tracing::error!(
49                    self = ?&self,
50                    error = ?e,
51                    "IdentifyProperties cannot convert to a Map"
52                );
53            })
54            .unwrap_or_default();
55
56        let serde_json::Value::Object(map) = val else {
57            tracing::error!(
58                self = ?&self,
59                "IdentifyProperties did not serialize to an Object"
60            );
61            return Map::default();
62        };
63        map
64    }
65}
66
67#[derive(thiserror::Error, Debug)]
68pub enum RecorderError {
69    #[error("Timed out waiting for configuration to complete: {0:?}")]
70    WaitForConfiguration(#[from] tokio::time::error::Elapsed),
71
72    #[error("Failed to subscribe to the ConfigurationProxy for configuration changes")]
73    SubscribeFailed,
74
75    #[error(transparent)]
76    Subscription(#[from] tokio::sync::broadcast::error::RecvError),
77
78    #[error("Failed to signal the configuration proxy: '{0}'")]
79    SendToConfigurationProxy(String),
80
81    #[error(transparent)]
82    Response(#[from] tokio::sync::oneshot::error::RecvError),
83}
84
85pub struct Recorder {
86    outgoing: Sender<RawSignal>,
87    auto_refresh_config: bool,
88    to_configuration_proxy: Sender<ConfigurationProxySignal>,
89}
90
91impl Clone for Recorder {
92    fn clone(&self) -> Self {
93        Self {
94            outgoing: self.outgoing.clone(),
95            auto_refresh_config: true,
96            to_configuration_proxy: self.to_configuration_proxy.clone(),
97        }
98    }
99}
100
101impl std::fmt::Debug for Recorder {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        f.debug_struct("Recorder").finish()
104    }
105}
106
107impl Recorder {
108    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip_all))]
109    pub(crate) fn new(
110        snapshotter_tx: Sender<RawSignal>,
111        to_configuration_proxy: Sender<ConfigurationProxySignal>,
112    ) -> Self {
113        Self {
114            outgoing: snapshotter_tx,
115            to_configuration_proxy,
116            auto_refresh_config: true,
117        }
118    }
119
120    // Execute a series of operations without triggering multiple configuration refreshes.
121    // Note: there are no atomic semantics, and configuration is refreshed at the end no matter what your function does.
122    pub async fn in_configuration_txn<F, T>(&self, f: F) -> T
123    where
124        F: AsyncFnOnce(&Recorder) -> T,
125    {
126        let mut rec = self.clone();
127
128        rec.auto_refresh_config = false;
129
130        let ret = f(self).await;
131
132        rec.auto_refresh_config = true;
133        rec.trigger_configuration_refresh().await;
134
135        ret
136    }
137
138    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
139    pub async fn get_feature_variant<
140        T: serde::ser::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send,
141    >(
142        &self,
143        key: impl Into<String> + std::fmt::Debug,
144    ) -> Option<T> {
145        serde_json::from_value(self.get_feature::<T>(key).await?.variant)
146            .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
147            .ok()
148    }
149
150    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
151    pub async fn get_feature_ptr_variant<
152        T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
153    >(
154        &self,
155        key: impl Into<String> + std::fmt::Debug,
156    ) -> Option<T> {
157        serde_json::from_value(self.get_feature_ptr::<T>(key).await?.variant)
158            .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
159            .ok()
160    }
161
162    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
163    pub async fn get_feature_payload<
164        T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
165    >(
166        &self,
167        key: impl Into<String> + std::fmt::Debug,
168    ) -> Option<T> {
169        self.get_feature::<T>(key).await?.payload
170    }
171
172    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
173    pub async fn get_feature_ptr_payload<
174        T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
175    >(
176        &self,
177        key: impl Into<String> + std::fmt::Debug,
178    ) -> Option<T> {
179        self.get_feature_ptr::<T>(key).await?.payload
180    }
181
182    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
183    pub async fn get_feature_ptr<
184        T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
185    >(
186        &self,
187        key: impl Into<String> + std::fmt::Debug,
188    ) -> Option<Feature<T>> {
189        let ptr = self.get_feature_payload::<String>(key).await?;
190        self.get_feature::<T>(ptr).await
191    }
192
193    pub async fn wait_for_checkin(
194        &self,
195        duration: Option<std::time::Duration>,
196    ) -> Result<(), RecorderError> {
197        let (tx, rx) = oneshot();
198
199        let subscription = self.subscribe_to_feature_changes().await;
200
201        self.to_configuration_proxy
202            .send(ConfigurationProxySignal::QueryIfCheckedIn(tx))
203            .instrument(tracing::trace_span!(
204                "requesting check in status from the configuration proxy"
205            ))
206            .await
207            .map_err(|e| RecorderError::SendToConfigurationProxy(format!("{e:?}")))?;
208
209        if rx.await? == CheckinStatus::CheckedIn {
210            tracing::debug!("Already checked in!");
211            return Ok(());
212        }
213
214        let Some(mut subscription) = subscription else {
215            return Err(RecorderError::SubscribeFailed);
216        };
217
218        if let Some(duration) = duration {
219            Ok(tokio::time::timeout(duration, subscription.recv()).await??)
220        } else {
221            Ok(subscription.recv().await?)
222        }
223    }
224
225    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
226    pub async fn get_feature<
227        T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
228    >(
229        &self,
230        key: impl Into<String> + std::fmt::Debug,
231    ) -> Option<Feature<T>> {
232        let key: String = key.into();
233        let (tx, rx) = oneshot();
234
235        self.to_configuration_proxy
236            .send(ConfigurationProxySignal::GetFeature(key.clone(), tx))
237            .instrument(tracing::trace_span!(
238                "requesting feature from the configuration proxy"
239            ))
240            .await
241            .inspect_err(|e| tracing::trace!(%e, "Error sending the feature flag request"))
242            .ok()?;
243
244        let feature = rx
245            .instrument(tracing::trace_span!("waiting for the feature"))
246            .await
247            .inspect_err(|e| tracing::trace!(%e, "Error requesting the feature flag"))
248            .ok()
249            .flatten()?;
250
251        self.record(
252            "$feature_flag_called",
253            Some(Map::from_iter([
254                ("$feature_flag".into(), key.into()),
255                ("$feature_flag_response".into(), feature.variant.clone()),
256            ])),
257        )
258        .await;
259
260        let variant = feature.variant.clone();
261        let payload = if let Some(ref p) = feature.payload {
262            let ret = serde_json::from_value(p.clone()).ok()?;
263            Some(ret)
264        } else {
265            None
266        };
267
268        Some(Feature { variant, payload })
269    }
270
271    pub async fn subscribe_to_feature_changes(
272        &self,
273    ) -> Option<tokio::sync::broadcast::Receiver<()>> {
274        let (tx, rx) = oneshot();
275
276        self.to_configuration_proxy
277            .send(ConfigurationProxySignal::Subscribe(tx))
278            .instrument(tracing::debug_span!("subscribe to feature changes"))
279            .await
280            .inspect_err(|e| {
281                tracing::error!(error = ?e, "Failed to request subscription to feature changes");
282            })
283            .ok()?;
284
285        rx.await
286            .inspect_err(|e| {
287                tracing::error!(error = ?e, "No response when waiting a feature change subscriber");
288            })
289            .ok()
290    }
291
292    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
293    pub async fn set_fact(
294        &self,
295        key: impl Into<String> + std::fmt::Debug,
296        value: serde_json::Value,
297    ) {
298        if let Err(e) = self
299            .outgoing
300            .send(RawSignal::Fact {
301                key: key.into(),
302                value,
303            })
304            .await
305        {
306            tracing::error!(error = ?e, "Failed to enqueue a fact");
307        }
308    }
309
310    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
311    pub async fn record(
312        &self,
313        event: impl Into<String> + std::fmt::Debug,
314        properties: Option<Map>,
315    ) {
316        if let Err(e) = self
317            .outgoing
318            .send(RawSignal::Event {
319                event_name: event.into(),
320                properties,
321            })
322            .instrument(tracing::trace_span!("recording the event"))
323            .await
324        {
325            tracing::error!(error = ?e, "Failed to enqueue an event message");
326        }
327    }
328
329    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
330    pub async fn identify(&self, new: DistinctId) {
331        self.identify_with_properties(new, IdentifyProperties::default())
332            .await;
333    }
334
335    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
336    pub async fn identify_with_properties(&self, new: DistinctId, properties: IdentifyProperties) {
337        if let Err(e) = self
338            .outgoing
339            .send(RawSignal::Identify(new, properties))
340            .instrument(tracing::trace_span!("sending the Identify message"))
341            .await
342        {
343            tracing::error!(error = ?e, "Failed to enqueue swap_identity message");
344        }
345
346        self.trigger_configuration_refresh()
347            .instrument(tracing::trace_span!("triggering a configuration refresh"))
348            .await;
349    }
350
351    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
352    pub async fn set_person_properties(&self, properties: IdentifyProperties) {
353        if let Err(e) = self
354            .outgoing
355            .send(RawSignal::SetPersonProperties(properties))
356            .instrument(tracing::trace_span!(
357                "sending the SetPersonProperties message"
358            ))
359            .await
360        {
361            tracing::error!(error = ?e, "Failed to enqueue set_person_properties message");
362        }
363
364        self.trigger_configuration_refresh()
365            .instrument(tracing::trace_span!("triggering a configuration refresh"))
366            .await;
367    }
368
369    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
370    pub async fn add_group(
371        &self,
372        group_name: impl Into<String> + std::fmt::Debug,
373        group_member_id: impl Into<String> + std::fmt::Debug,
374    ) {
375        if let Err(e) = self
376            .outgoing
377            .send(RawSignal::AddGroup {
378                group_name: group_name.into(),
379                group_member_id: group_member_id.into(),
380            })
381            .instrument(tracing::trace_span!("sending the AddGroup message"))
382            .await
383        {
384            tracing::error!(error = ?e, "Failed to enqueue AddGroup message");
385        }
386
387        self.trigger_configuration_refresh()
388            .instrument(tracing::trace_span!("triggering a configuration refresh"))
389            .await;
390    }
391
392    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
393    pub async fn alias(&self, alias: impl Into<String> + std::fmt::Debug) {
394        if let Err(e) = self
395            .outgoing
396            .send(RawSignal::Alias(alias.into()))
397            .instrument(tracing::trace_span!("sending the Alias message"))
398            .await
399        {
400            tracing::error!(error = ?e, "Failed to enqueue Alias message");
401        }
402
403        self.trigger_configuration_refresh()
404            .instrument(tracing::trace_span!("triggering a configuration refresh"))
405            .await;
406    }
407
408    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
409    pub async fn reset(&self) {
410        if let Err(e) = self
411            .outgoing
412            .send(RawSignal::Reset)
413            .instrument(tracing::trace_span!("sending the Reset message"))
414            .await
415        {
416            tracing::error!(error = ?e, "Failed to enqueue reset message");
417        }
418
419        self.trigger_configuration_refresh()
420            .instrument(tracing::trace_span!("triggering a configuration refresh"))
421            .await;
422    }
423
424    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self), ret(level = tracing::Level::TRACE)))]
425    async fn get_session_properties(&self) -> Result<Map, FullDuplexError> {
426        let (tx, rx) = tokio::sync::oneshot::channel();
427
428        self.outgoing
429            .send(RawSignal::GetSessionProperties { tx })
430            .instrument(tracing::trace_span!(
431                "sending the GetSessionProperties message"
432            ))
433            .await
434            .map_err(|_| FullDuplexError::SendError)?;
435
436        Ok(rx
437            .instrument(tracing::trace_span!("waiting for reply"))
438            .await?)
439    }
440
441    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
442    pub async fn flush_now(&self) {
443        if let Err(e) = self.outgoing.send(RawSignal::FlushNow).await {
444            tracing::error!(error = ?e, "Failed to enqueue a FlushNow message");
445        }
446    }
447
448    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
449    pub(crate) async fn trigger_configuration_refresh(&self) {
450        if !self.auto_refresh_config {
451            tracing::trace!("Not refreshing configuration because it is paused");
452            return;
453        }
454
455        let (tx, rx) = oneshot();
456
457        let session_properties = self
458            .get_session_properties()
459            .instrument(tracing::debug_span!("request session properties"))
460            .await
461            .inspect_err(|e| {
462                tracing::debug!(%e, "Failed to get session properties");
463            })
464            .unwrap_or_default();
465
466        if let Err(e) = self
467            .to_configuration_proxy
468            .send(ConfigurationProxySignal::CheckInNow(session_properties, tx))
469            .instrument(tracing::debug_span!("request immediate check-in"))
470            .await
471        {
472            tracing::error!(error = ?e, "Failed to enqueue CheckInNow message");
473        }
474
475        let (config, feats) = match rx
476            .instrument(tracing::debug_span!("receive feature facts"))
477            .await
478        {
479            Ok((config, feats)) => (config, feats),
480            Err(e) => {
481                tracing::error!(error = ?e, "Failed to refresh the configuration");
482
483                return;
484            }
485        };
486
487        if let Err(e) = self
488            .outgoing
489            .send(RawSignal::UpdateFeatureConfiguration(config, feats))
490            .instrument(tracing::debug_span!("forward feature facts"))
491            .await
492        {
493            tracing::error!(%e, "Failed to forward updated feature facts");
494        }
495    }
496}
497
498#[derive(thiserror::Error, Debug)]
499pub enum FullDuplexError {
500    #[error("Failed to request session properties")]
501    SendError,
502
503    #[error("Error waiting for a reply: {0}")]
504    Recv(#[from] tokio::sync::oneshot::error::RecvError),
505}