detsys_ids_client/
recorder.rs

1use tokio::sync::mpsc::Sender;
2use tokio::sync::oneshot::channel as oneshot;
3use tracing::Instrument;
4
5use crate::checkin::Feature;
6use crate::collator::FeatureFacts;
7use crate::configuration_proxy::ConfigurationProxySignal;
8use crate::identity::DistinctId;
9use crate::Map;
10
/// Messages sent over the `outgoing` channel held by `Recorder`.
///
/// Each variant is constructed by one of the `Recorder` methods in this file;
/// how the receiving side handles them is not visible here.
#[derive(Debug)]
pub(crate) enum RawSignal {
    /// A single key/value fact, sent by `Recorder::add_fact`.
    Fact {
        key: String,
        value: serde_json::Value,
    },
    /// Feature facts received from an immediate check-in, forwarded by
    /// `Recorder::trigger_configuration_refresh`.
    UpdateFeatureFacts(FeatureFacts),
    /// A named event with optional properties, sent by `Recorder::record`.
    Event {
        event_name: String,
        properties: Option<Map>,
    },
    /// Request for the session properties; the reply is expected on `tx`
    /// (see `Recorder::get_session_properties`).
    GetSessionProperties {
        tx: tokio::sync::oneshot::Sender<Map>,
    },
    /// Sent by `Recorder::flush_now`.
    FlushNow,
    /// Sent by `Recorder::identify` with the new distinct ID.
    Identify(DistinctId),
    /// Sent by `Recorder::alias` with the alias string.
    Alias(String),
    /// Sent by `Recorder::reset`.
    Reset,
}
30
/// Cloneable handle for recording facts and events, changing identity, and
/// looking up feature flags.
///
/// It holds only the sending halves of two channels; the receivers live
/// outside this file.
#[derive(Clone)]
pub struct Recorder {
    /// Channel carrying `RawSignal` messages (passed to `Recorder::new` as `snapshotter_tx`).
    outgoing: Sender<RawSignal>,
    /// Channel carrying feature lookups (`GetFeature`) and check-in requests (`CheckInNow`).
    to_configuration_proxy: Sender<ConfigurationProxySignal>,
}
36
37impl std::fmt::Debug for Recorder {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        f.debug_struct("Recorder").finish()
40    }
41}
42
43impl Recorder {
44    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip_all))]
45    pub(crate) fn new(
46        snapshotter_tx: Sender<RawSignal>,
47        to_configuration_proxy: Sender<ConfigurationProxySignal>,
48    ) -> Self {
49        Self {
50            outgoing: snapshotter_tx,
51            to_configuration_proxy,
52        }
53    }
54
55    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
56    pub async fn get_feature_variant<T: serde::de::DeserializeOwned + std::fmt::Debug + Send>(
57        &self,
58        key: impl Into<String> + std::fmt::Debug,
59    ) -> Option<T> {
60        serde_json::from_value(self.get_feature::<T>(key).await?.variant)
61            .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
62            .ok()
63    }
64
65    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
66    pub async fn get_feature_ptr_variant<
67        T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
68    >(
69        &self,
70        key: impl Into<String> + std::fmt::Debug,
71    ) -> Option<T> {
72        serde_json::from_value(self.get_feature_ptr::<T>(key).await?.variant)
73            .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
74            .ok()
75    }
76
77    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
78    pub async fn get_feature_payload<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
79        &self,
80        key: impl Into<String> + std::fmt::Debug,
81    ) -> Option<T> {
82        self.get_feature::<T>(key).await?.payload
83    }
84
85    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
86    pub async fn get_feature_ptr_payload<
87        T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
88    >(
89        &self,
90        key: impl Into<String> + std::fmt::Debug,
91    ) -> Option<T> {
92        self.get_feature_ptr::<T>(key).await?.payload
93    }
94
95    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
96    pub async fn get_feature_ptr<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
97        &self,
98        key: impl Into<String> + std::fmt::Debug,
99    ) -> Option<Feature<T>> {
100        let ptr = self.get_feature_payload::<String>(key).await?;
101        self.get_feature::<T>(ptr).await
102    }
103
104    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
105    pub async fn get_feature<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
106        &self,
107        key: impl Into<String> + std::fmt::Debug,
108    ) -> Option<Feature<T>> {
109        let key: String = key.into();
110        let (tx, rx) = oneshot();
111
112        self.to_configuration_proxy
113            .send(ConfigurationProxySignal::GetFeature(key.clone(), tx))
114            .instrument(tracing::trace_span!(
115                "requesting feature from the configuration proxy"
116            ))
117            .await
118            .inspect_err(|e| tracing::trace!(%e, "Error sending the feature flag request"))
119            .ok()?;
120
121        let feature = rx
122            .instrument(tracing::trace_span!("waiting for the feature"))
123            .await
124            .inspect_err(|e| tracing::trace!(%e, "Error requesting the feature flag"))
125            .ok()
126            .flatten()?;
127
128        self.record(
129            "$feature_flag_called",
130            Some(Map::from_iter([
131                ("$feature_flag".into(), key.into()),
132                ("$feature_flag_response".into(), feature.variant.clone()),
133            ])),
134        )
135        .await;
136
137        let variant = feature.variant.clone();
138        let payload = if let Some(ref p) = feature.payload {
139            let ret = serde_json::from_value(p.clone()).ok()?;
140            Some(ret)
141        } else {
142            None
143        };
144
145        Some(Feature { variant, payload })
146    }
147
148    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
149    pub async fn add_fact(&self, key: &str, value: serde_json::Value) {
150        if let Err(e) = self
151            .outgoing
152            .send(RawSignal::Fact {
153                key: key.to_string(),
154                value,
155            })
156            .await
157        {
158            tracing::error!(error = ?e, "Failed to enqueue a fact");
159        }
160    }
161
162    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
163    pub async fn record(&self, event: &str, properties: Option<Map>) {
164        if let Err(e) = self
165            .outgoing
166            .send(RawSignal::Event {
167                event_name: event.to_string(),
168                properties,
169            })
170            .instrument(tracing::trace_span!("recording the event"))
171            .await
172        {
173            tracing::error!(error = ?e, "Failed to enqueue an event message");
174        }
175    }
176
177    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
178    pub async fn identify(&self, new: DistinctId) {
179        if let Err(e) = self
180            .outgoing
181            .send(RawSignal::Identify(new))
182            .instrument(tracing::trace_span!("sending the Identify message"))
183            .await
184        {
185            tracing::error!(error = ?e, "Failed to enqueue swap_identity message");
186        }
187
188        self.trigger_configuration_refresh()
189            .instrument(tracing::trace_span!("triggering a configuration refresh"))
190            .await;
191    }
192
193    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
194    pub async fn alias(&self, alias: String) {
195        if let Err(e) = self
196            .outgoing
197            .send(RawSignal::Alias(alias))
198            .instrument(tracing::trace_span!("sending the Alias message"))
199            .await
200        {
201            tracing::error!(error = ?e, "Failed to enqueue Alias message");
202        }
203
204        self.trigger_configuration_refresh()
205            .instrument(tracing::trace_span!("triggering a configuration refresh"))
206            .await;
207    }
208
209    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
210    pub async fn reset(&self) {
211        if let Err(e) = self
212            .outgoing
213            .send(RawSignal::Reset)
214            .instrument(tracing::trace_span!("sending the Reset message"))
215            .await
216        {
217            tracing::error!(error = ?e, "Failed to enqueue reset message");
218        }
219
220        self.trigger_configuration_refresh()
221            .instrument(tracing::trace_span!("triggering a configuration refresh"))
222            .await;
223    }
224
225    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self), ret(level = tracing::Level::TRACE)))]
226    async fn get_session_properties(&self) -> Result<Map, FullDuplexError> {
227        let (tx, rx) = tokio::sync::oneshot::channel();
228
229        self.outgoing
230            .send(RawSignal::GetSessionProperties { tx })
231            .instrument(tracing::trace_span!(
232                "sending the GetSessionProperties message"
233            ))
234            .await
235            .map_err(|_| FullDuplexError::SendError)?;
236
237        Ok(rx
238            .instrument(tracing::trace_span!("waiting for reply"))
239            .await?)
240    }
241
242    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
243    pub async fn flush_now(&self) {
244        if let Err(e) = self.outgoing.send(RawSignal::FlushNow).await {
245            tracing::error!(error = ?e, "Failed to enqueue a FlushNow message");
246        }
247    }
248
249    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
250    pub(crate) async fn trigger_configuration_refresh(&self) {
251        let (tx, rx) = oneshot();
252
253        let session_properties = self
254            .get_session_properties()
255            .instrument(tracing::debug_span!("request session properties"))
256            .await
257            .inspect_err(|e| {
258                tracing::debug!(%e, "Failed to get session properties");
259            })
260            .unwrap_or_default();
261
262        if let Err(e) = self
263            .to_configuration_proxy
264            .send(ConfigurationProxySignal::CheckInNow(session_properties, tx))
265            .instrument(tracing::debug_span!("request immediate check-in"))
266            .await
267        {
268            tracing::error!(error = ?e, "Failed to enqueue CheckInNow message");
269        }
270
271        let feats = match rx
272            .instrument(tracing::debug_span!("receive feature facts"))
273            .await
274        {
275            Ok(feats) => feats,
276            Err(e) => {
277                tracing::error!(error = ?e, "Failed to refresh the configuration");
278
279                return;
280            }
281        };
282
283        if let Err(e) = self
284            .outgoing
285            .send(RawSignal::UpdateFeatureFacts(feats))
286            .instrument(tracing::debug_span!("forward feature facts"))
287            .await
288        {
289            tracing::error!(%e, "Failed to forward updated feature facts");
290        }
291    }
292}
293
/// Errors from the request/reply exchange performed by
/// `Recorder::get_session_properties`.
#[derive(thiserror::Error, Debug)]
pub enum FullDuplexError {
    /// The request could not be sent on the `outgoing` channel.
    #[error("Failed to request session properties")]
    SendError,

    /// Waiting on the oneshot reply channel failed; wraps the underlying
    /// `RecvError` (converted automatically via `#[from]`).
    #[error("Error waiting for a reply: {0}")]
    Recv(#[from] tokio::sync::oneshot::error::RecvError),
}