detsys_ids_client/
recorder.rs

1use tokio::sync::mpsc::Sender;
2use tokio::sync::oneshot::channel as oneshot;
3use tracing::Instrument;
4
5use crate::checkin::{Checkin, Feature};
6use crate::collator::FeatureFacts;
7use crate::configuration_proxy::{CheckinStatus, ConfigurationProxySignal};
8use crate::identity::DistinctId;
9use crate::{Map, PersonProperties};
10
11#[derive(Debug)]
12pub(crate) enum RawSignal {
13    Fact {
14        key: String,
15        value: serde_json::Value,
16    },
17    UpdateFeatureConfiguration(Option<Checkin>, FeatureFacts),
18    Event {
19        event_name: String,
20        properties: Option<Map>,
21    },
22    GetSessionProperties {
23        tx: tokio::sync::oneshot::Sender<Map>,
24    },
25    FlushNow,
26    Identify(DistinctId, IdentifyProperties),
27    SetPersonProperties(IdentifyProperties),
28    AddGroup {
29        group_name: String,
30        group_member_id: String,
31    },
32    Alias(String),
33    Reset,
34}
35
36#[derive(Default, Debug, serde::Serialize)]
37pub struct IdentifyProperties {
38    #[serde(rename = "$set")]
39    pub set: PersonProperties,
40    #[serde(rename = "$set_once")]
41    pub set_once: PersonProperties,
42}
43
44impl IdentifyProperties {
45    pub(crate) fn as_map(&self) -> Map {
46        let val = serde_json::to_value(self)
47            .inspect_err(|e| {
48                tracing::error!(
49                    self = ?&self,
50                    error = ?e,
51                    "IdentifyProperties cannot convert to a Map"
52                );
53            })
54            .unwrap_or_default();
55
56        let serde_json::Value::Object(map) = val else {
57            tracing::error!(
58                self = ?&self,
59                "IdentifyProperties did not serialize to an Object"
60            );
61            return Map::default();
62        };
63        map
64    }
65}
66
67#[derive(thiserror::Error, Debug)]
68pub enum RecorderError {
69    #[error("Timed out waiting for configuration to complete: {0:?}")]
70    WaitForConfiguration(#[from] tokio::time::error::Elapsed),
71
72    #[error("Failed to subscribe to the ConfigurationProxy for configuration changes")]
73    SubscribeFailed,
74
75    #[error(transparent)]
76    Subscription(#[from] tokio::sync::broadcast::error::RecvError),
77
78    #[error("Failed to signal the configuration proxy: '{0}'")]
79    SendToConfigurationProxy(String),
80
81    #[error(transparent)]
82    Response(#[from] tokio::sync::oneshot::error::RecvError),
83}
84
85pub struct Recorder {
86    outgoing: Sender<RawSignal>,
87    auto_refresh_config: bool,
88    to_configuration_proxy: Sender<ConfigurationProxySignal>,
89}
90
91impl Clone for Recorder {
92    fn clone(&self) -> Self {
93        Self {
94            outgoing: self.outgoing.clone(),
95            auto_refresh_config: true,
96            to_configuration_proxy: self.to_configuration_proxy.clone(),
97        }
98    }
99}
100
101impl std::fmt::Debug for Recorder {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        f.debug_struct("Recorder").finish()
104    }
105}
106
107impl Recorder {
108    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip_all))]
109    pub(crate) fn new(
110        snapshotter_tx: Sender<RawSignal>,
111        to_configuration_proxy: Sender<ConfigurationProxySignal>,
112    ) -> Self {
113        Self {
114            outgoing: snapshotter_tx,
115            to_configuration_proxy,
116            auto_refresh_config: true,
117        }
118    }
119
120    // Execute a series of operations without triggering multiple configuration refreshes.
121    // Note: there are no atomic semantics, and configuration is refreshed at the end no matter what your function does.
122    pub async fn in_configuration_txn<F, T>(&self, f: F) -> T
123    where
124        F: AsyncFnOnce(&Recorder) -> T,
125    {
126        let mut rec = self.clone();
127
128        rec.auto_refresh_config = false;
129
130        let ret = f(self).await;
131
132        rec.auto_refresh_config = true;
133        rec.trigger_configuration_refresh().await;
134
135        ret
136    }
137
138    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
139    pub async fn get_feature_variant<
140        T: serde::ser::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send,
141    >(
142        &self,
143        key: impl Into<String> + std::fmt::Debug,
144    ) -> Option<T> {
145        serde_json::from_value(self.get_feature::<T>(key).await?.variant)
146            .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
147            .ok()
148    }
149
150    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
151    pub async fn get_feature_ptr_variant<
152        T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
153    >(
154        &self,
155        key: impl Into<String> + std::fmt::Debug,
156    ) -> Option<T> {
157        serde_json::from_value(self.get_feature_ptr::<T>(key).await?.variant)
158            .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
159            .ok()
160    }
161
162    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
163    pub async fn get_feature_payload<
164        T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
165    >(
166        &self,
167        key: impl Into<String> + std::fmt::Debug,
168    ) -> Option<T> {
169        self.get_feature::<T>(key).await?.payload
170    }
171
172    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
173    pub async fn get_feature_ptr_payload<
174        T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
175    >(
176        &self,
177        key: impl Into<String> + std::fmt::Debug,
178    ) -> Option<T> {
179        self.get_feature_ptr::<T>(key).await?.payload
180    }
181
182    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
183    pub async fn get_feature_ptr<
184        T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
185    >(
186        &self,
187        key: impl Into<String> + std::fmt::Debug,
188    ) -> Option<Feature<T>> {
189        let ptr = self.get_feature_payload::<String>(key).await?;
190        self.get_feature::<T>(ptr).await
191    }
192
193    pub async fn wait_for_checkin(
194        &self,
195        duration: Option<std::time::Duration>,
196    ) -> Result<(), RecorderError> {
197        let (tx, rx) = oneshot();
198
199        let subscription = self.subscribe_to_feature_changes().await;
200
201        self.to_configuration_proxy
202            .send(ConfigurationProxySignal::QueryIfCheckedIn(tx))
203            .instrument(tracing::trace_span!(
204                "requesting check in status from the configuration proxy"
205            ))
206            .await
207            .map_err(|e| RecorderError::SendToConfigurationProxy(format!("{e:?}")))?;
208
209        if rx.await? == CheckinStatus::CheckedIn {
210            return Ok(());
211        }
212
213        let Some(mut subscription) = subscription else {
214            return Err(RecorderError::SubscribeFailed);
215        };
216
217        if let Some(duration) = duration {
218            Ok(tokio::time::timeout(duration, subscription.recv()).await??)
219        } else {
220            Ok(subscription.recv().await?)
221        }
222    }
223
224    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
225    pub async fn get_feature<
226        T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
227    >(
228        &self,
229        key: impl Into<String> + std::fmt::Debug,
230    ) -> Option<Feature<T>> {
231        let key: String = key.into();
232        let (tx, rx) = oneshot();
233
234        self.to_configuration_proxy
235            .send(ConfigurationProxySignal::GetFeature(key.clone(), tx))
236            .instrument(tracing::trace_span!(
237                "requesting feature from the configuration proxy"
238            ))
239            .await
240            .inspect_err(|e| tracing::trace!(%e, "Error sending the feature flag request"))
241            .ok()?;
242
243        let feature = rx
244            .instrument(tracing::trace_span!("waiting for the feature"))
245            .await
246            .inspect_err(|e| tracing::trace!(%e, "Error requesting the feature flag"))
247            .ok()
248            .flatten()?;
249
250        self.record(
251            "$feature_flag_called",
252            Some(Map::from_iter([
253                ("$feature_flag".into(), key.into()),
254                ("$feature_flag_response".into(), feature.variant.clone()),
255            ])),
256        )
257        .await;
258
259        let variant = feature.variant.clone();
260        let payload = if let Some(ref p) = feature.payload {
261            let ret = serde_json::from_value(p.clone()).ok()?;
262            Some(ret)
263        } else {
264            None
265        };
266
267        Some(Feature { variant, payload })
268    }
269
270    pub async fn subscribe_to_feature_changes(
271        &self,
272    ) -> Option<tokio::sync::broadcast::Receiver<()>> {
273        let (tx, rx) = oneshot();
274
275        self.to_configuration_proxy
276            .send(ConfigurationProxySignal::Subscribe(tx))
277            .instrument(tracing::debug_span!("subscribe to feature changes"))
278            .await
279            .inspect_err(|e| {
280                tracing::error!(error = ?e, "Failed to request subscription to feature changes");
281            })
282            .ok()?;
283
284        rx.await
285            .inspect_err(|e| {
286                tracing::error!(error = ?e, "No response when waiting a feature change subscriber");
287            })
288            .ok()
289    }
290
291    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
292    pub async fn set_fact(
293        &self,
294        key: impl Into<String> + std::fmt::Debug,
295        value: serde_json::Value,
296    ) {
297        if let Err(e) = self
298            .outgoing
299            .send(RawSignal::Fact {
300                key: key.into(),
301                value,
302            })
303            .await
304        {
305            tracing::error!(error = ?e, "Failed to enqueue a fact");
306        }
307    }
308
309    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
310    pub async fn record(
311        &self,
312        event: impl Into<String> + std::fmt::Debug,
313        properties: Option<Map>,
314    ) {
315        if let Err(e) = self
316            .outgoing
317            .send(RawSignal::Event {
318                event_name: event.into(),
319                properties,
320            })
321            .instrument(tracing::trace_span!("recording the event"))
322            .await
323        {
324            tracing::error!(error = ?e, "Failed to enqueue an event message");
325        }
326    }
327
328    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
329    pub async fn identify(&self, new: DistinctId) {
330        self.identify_with_properties(new, IdentifyProperties::default())
331            .await;
332    }
333
334    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
335    pub async fn identify_with_properties(&self, new: DistinctId, properties: IdentifyProperties) {
336        if let Err(e) = self
337            .outgoing
338            .send(RawSignal::Identify(new, properties))
339            .instrument(tracing::trace_span!("sending the Identify message"))
340            .await
341        {
342            tracing::error!(error = ?e, "Failed to enqueue swap_identity message");
343        }
344
345        self.trigger_configuration_refresh()
346            .instrument(tracing::trace_span!("triggering a configuration refresh"))
347            .await;
348    }
349
350    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
351    pub async fn set_person_properties(&self, properties: IdentifyProperties) {
352        if let Err(e) = self
353            .outgoing
354            .send(RawSignal::SetPersonProperties(properties))
355            .instrument(tracing::trace_span!(
356                "sending the SetPersonProperties message"
357            ))
358            .await
359        {
360            tracing::error!(error = ?e, "Failed to enqueue set_person_properties message");
361        }
362
363        self.trigger_configuration_refresh()
364            .instrument(tracing::trace_span!("triggering a configuration refresh"))
365            .await;
366    }
367
368    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
369    pub async fn add_group(
370        &self,
371        group_name: impl Into<String> + std::fmt::Debug,
372        group_member_id: impl Into<String> + std::fmt::Debug,
373    ) {
374        if let Err(e) = self
375            .outgoing
376            .send(RawSignal::AddGroup {
377                group_name: group_name.into(),
378                group_member_id: group_member_id.into(),
379            })
380            .instrument(tracing::trace_span!("sending the AddGroup message"))
381            .await
382        {
383            tracing::error!(error = ?e, "Failed to enqueue AddGroup message");
384        }
385
386        self.trigger_configuration_refresh()
387            .instrument(tracing::trace_span!("triggering a configuration refresh"))
388            .await;
389    }
390
391    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
392    pub async fn alias(&self, alias: impl Into<String> + std::fmt::Debug) {
393        if let Err(e) = self
394            .outgoing
395            .send(RawSignal::Alias(alias.into()))
396            .instrument(tracing::trace_span!("sending the Alias message"))
397            .await
398        {
399            tracing::error!(error = ?e, "Failed to enqueue Alias message");
400        }
401
402        self.trigger_configuration_refresh()
403            .instrument(tracing::trace_span!("triggering a configuration refresh"))
404            .await;
405    }
406
407    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
408    pub async fn reset(&self) {
409        if let Err(e) = self
410            .outgoing
411            .send(RawSignal::Reset)
412            .instrument(tracing::trace_span!("sending the Reset message"))
413            .await
414        {
415            tracing::error!(error = ?e, "Failed to enqueue reset message");
416        }
417
418        self.trigger_configuration_refresh()
419            .instrument(tracing::trace_span!("triggering a configuration refresh"))
420            .await;
421    }
422
423    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self), ret(level = tracing::Level::TRACE)))]
424    async fn get_session_properties(&self) -> Result<Map, FullDuplexError> {
425        let (tx, rx) = tokio::sync::oneshot::channel();
426
427        self.outgoing
428            .send(RawSignal::GetSessionProperties { tx })
429            .instrument(tracing::trace_span!(
430                "sending the GetSessionProperties message"
431            ))
432            .await
433            .map_err(|_| FullDuplexError::SendError)?;
434
435        Ok(rx
436            .instrument(tracing::trace_span!("waiting for reply"))
437            .await?)
438    }
439
440    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
441    pub async fn flush_now(&self) {
442        if let Err(e) = self.outgoing.send(RawSignal::FlushNow).await {
443            tracing::error!(error = ?e, "Failed to enqueue a FlushNow message");
444        }
445    }
446
447    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
448    pub(crate) async fn trigger_configuration_refresh(&self) {
449        if !self.auto_refresh_config {
450            tracing::trace!("Not refreshing configuration because it is paused");
451            return;
452        }
453
454        let (tx, rx) = oneshot();
455
456        let session_properties = self
457            .get_session_properties()
458            .instrument(tracing::debug_span!("request session properties"))
459            .await
460            .inspect_err(|e| {
461                tracing::debug!(%e, "Failed to get session properties");
462            })
463            .unwrap_or_default();
464
465        if let Err(e) = self
466            .to_configuration_proxy
467            .send(ConfigurationProxySignal::CheckInNow(session_properties, tx))
468            .instrument(tracing::debug_span!("request immediate check-in"))
469            .await
470        {
471            tracing::error!(error = ?e, "Failed to enqueue CheckInNow message");
472        }
473
474        let (config, feats) = match rx
475            .instrument(tracing::debug_span!("receive feature facts"))
476            .await
477        {
478            Ok((config, feats)) => (config, feats),
479            Err(e) => {
480                tracing::error!(error = ?e, "Failed to refresh the configuration");
481
482                return;
483            }
484        };
485
486        if let Err(e) = self
487            .outgoing
488            .send(RawSignal::UpdateFeatureConfiguration(config, feats))
489            .instrument(tracing::debug_span!("forward feature facts"))
490            .await
491        {
492            tracing::error!(%e, "Failed to forward updated feature facts");
493        }
494    }
495}
496
497#[derive(thiserror::Error, Debug)]
498pub enum FullDuplexError {
499    #[error("Failed to request session properties")]
500    SendError,
501
502    #[error("Error waiting for a reply: {0}")]
503    Recv(#[from] tokio::sync::oneshot::error::RecvError),
504}