detsys_ids_client/
recorder.rs

1use tokio::sync::mpsc::Sender;
2use tokio::sync::oneshot::channel as oneshot;
3use tracing::Instrument;
4
5use crate::Map;
6use crate::checkin::Feature;
7use crate::collator::FeatureFacts;
8use crate::configuration_proxy::ConfigurationProxySignal;
9use crate::identity::DistinctId;
10
11#[derive(Debug)]
12pub(crate) enum RawSignal {
13    Fact {
14        key: String,
15        value: serde_json::Value,
16    },
17    UpdateFeatureFacts(FeatureFacts),
18    Event {
19        event_name: String,
20        properties: Option<Map>,
21    },
22    GetSessionProperties {
23        tx: tokio::sync::oneshot::Sender<Map>,
24    },
25    FlushNow,
26    Identify(DistinctId),
27    AddGroup {
28        group_name: String,
29        group_member_id: String,
30    },
31    Alias(String),
32    Reset,
33}
34
35#[derive(Clone)]
36pub struct Recorder {
37    outgoing: Sender<RawSignal>,
38    to_configuration_proxy: Sender<ConfigurationProxySignal>,
39}
40
41impl std::fmt::Debug for Recorder {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        f.debug_struct("Recorder").finish()
44    }
45}
46
47impl Recorder {
48    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip_all))]
49    pub(crate) fn new(
50        snapshotter_tx: Sender<RawSignal>,
51        to_configuration_proxy: Sender<ConfigurationProxySignal>,
52    ) -> Self {
53        Self {
54            outgoing: snapshotter_tx,
55            to_configuration_proxy,
56        }
57    }
58
59    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
60    pub async fn get_feature_variant<T: serde::de::DeserializeOwned + std::fmt::Debug + Send>(
61        &self,
62        key: impl Into<String> + std::fmt::Debug,
63    ) -> Option<T> {
64        serde_json::from_value(self.get_feature::<T>(key).await?.variant)
65            .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
66            .ok()
67    }
68
69    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
70    pub async fn get_feature_ptr_variant<
71        T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
72    >(
73        &self,
74        key: impl Into<String> + std::fmt::Debug,
75    ) -> Option<T> {
76        serde_json::from_value(self.get_feature_ptr::<T>(key).await?.variant)
77            .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
78            .ok()
79    }
80
81    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
82    pub async fn get_feature_payload<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
83        &self,
84        key: impl Into<String> + std::fmt::Debug,
85    ) -> Option<T> {
86        self.get_feature::<T>(key).await?.payload
87    }
88
89    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
90    pub async fn get_feature_ptr_payload<
91        T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
92    >(
93        &self,
94        key: impl Into<String> + std::fmt::Debug,
95    ) -> Option<T> {
96        self.get_feature_ptr::<T>(key).await?.payload
97    }
98
99    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
100    pub async fn get_feature_ptr<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
101        &self,
102        key: impl Into<String> + std::fmt::Debug,
103    ) -> Option<Feature<T>> {
104        let ptr = self.get_feature_payload::<String>(key).await?;
105        self.get_feature::<T>(ptr).await
106    }
107
108    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
109    pub async fn get_feature<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
110        &self,
111        key: impl Into<String> + std::fmt::Debug,
112    ) -> Option<Feature<T>> {
113        let key: String = key.into();
114        let (tx, rx) = oneshot();
115
116        self.to_configuration_proxy
117            .send(ConfigurationProxySignal::GetFeature(key.clone(), tx))
118            .instrument(tracing::trace_span!(
119                "requesting feature from the configuration proxy"
120            ))
121            .await
122            .inspect_err(|e| tracing::trace!(%e, "Error sending the feature flag request"))
123            .ok()?;
124
125        let feature = rx
126            .instrument(tracing::trace_span!("waiting for the feature"))
127            .await
128            .inspect_err(|e| tracing::trace!(%e, "Error requesting the feature flag"))
129            .ok()
130            .flatten()?;
131
132        self.record(
133            "$feature_flag_called",
134            Some(Map::from_iter([
135                ("$feature_flag".into(), key.into()),
136                ("$feature_flag_response".into(), feature.variant.clone()),
137            ])),
138        )
139        .await;
140
141        let variant = feature.variant.clone();
142        let payload = if let Some(ref p) = feature.payload {
143            let ret = serde_json::from_value(p.clone()).ok()?;
144            Some(ret)
145        } else {
146            None
147        };
148
149        Some(Feature { variant, payload })
150    }
151
152    pub async fn subscribe_to_feature_changes(
153        &self,
154    ) -> Option<tokio::sync::broadcast::Receiver<()>> {
155        let (tx, rx) = oneshot();
156
157        self.to_configuration_proxy
158            .send(ConfigurationProxySignal::Subscribe(tx))
159            .instrument(tracing::debug_span!("subscribe to feature changes"))
160            .await
161            .inspect_err(|e| {
162                tracing::error!(error = ?e, "Failed to request subscription to feature changes");
163            })
164            .ok()?;
165
166        rx.await
167            .inspect_err(|e| {
168                tracing::error!(error = ?e, "No response when waiting a feature change subscriber");
169            })
170            .ok()
171    }
172
173    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
174    pub async fn set_fact(
175        &self,
176        key: impl Into<String> + std::fmt::Debug,
177        value: serde_json::Value,
178    ) {
179        if let Err(e) = self
180            .outgoing
181            .send(RawSignal::Fact {
182                key: key.into(),
183                value,
184            })
185            .await
186        {
187            tracing::error!(error = ?e, "Failed to enqueue a fact");
188        }
189    }
190
191    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
192    pub async fn record(
193        &self,
194        event: impl Into<String> + std::fmt::Debug,
195        properties: Option<Map>,
196    ) {
197        if let Err(e) = self
198            .outgoing
199            .send(RawSignal::Event {
200                event_name: event.into(),
201                properties,
202            })
203            .instrument(tracing::trace_span!("recording the event"))
204            .await
205        {
206            tracing::error!(error = ?e, "Failed to enqueue an event message");
207        }
208    }
209
210    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
211    pub async fn identify(&self, new: DistinctId) {
212        if let Err(e) = self
213            .outgoing
214            .send(RawSignal::Identify(new))
215            .instrument(tracing::trace_span!("sending the Identify message"))
216            .await
217        {
218            tracing::error!(error = ?e, "Failed to enqueue swap_identity message");
219        }
220
221        self.trigger_configuration_refresh()
222            .instrument(tracing::trace_span!("triggering a configuration refresh"))
223            .await;
224    }
225
226    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
227    pub async fn add_group(
228        &self,
229        group_name: impl Into<String> + std::fmt::Debug,
230        group_member_id: impl Into<String> + std::fmt::Debug,
231    ) {
232        if let Err(e) = self
233            .outgoing
234            .send(RawSignal::AddGroup {
235                group_name: group_name.into(),
236                group_member_id: group_member_id.into(),
237            })
238            .instrument(tracing::trace_span!("sending the AddGroup message"))
239            .await
240        {
241            tracing::error!(error = ?e, "Failed to enqueue AddGroup message");
242        }
243
244        self.trigger_configuration_refresh()
245            .instrument(tracing::trace_span!("triggering a configuration refresh"))
246            .await;
247    }
248
249    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
250    pub async fn alias(&self, alias: impl Into<String> + std::fmt::Debug) {
251        if let Err(e) = self
252            .outgoing
253            .send(RawSignal::Alias(alias.into()))
254            .instrument(tracing::trace_span!("sending the Alias message"))
255            .await
256        {
257            tracing::error!(error = ?e, "Failed to enqueue Alias message");
258        }
259
260        self.trigger_configuration_refresh()
261            .instrument(tracing::trace_span!("triggering a configuration refresh"))
262            .await;
263    }
264
265    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
266    pub async fn reset(&self) {
267        if let Err(e) = self
268            .outgoing
269            .send(RawSignal::Reset)
270            .instrument(tracing::trace_span!("sending the Reset message"))
271            .await
272        {
273            tracing::error!(error = ?e, "Failed to enqueue reset message");
274        }
275
276        self.trigger_configuration_refresh()
277            .instrument(tracing::trace_span!("triggering a configuration refresh"))
278            .await;
279    }
280
281    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self), ret(level = tracing::Level::TRACE)))]
282    async fn get_session_properties(&self) -> Result<Map, FullDuplexError> {
283        let (tx, rx) = tokio::sync::oneshot::channel();
284
285        self.outgoing
286            .send(RawSignal::GetSessionProperties { tx })
287            .instrument(tracing::trace_span!(
288                "sending the GetSessionProperties message"
289            ))
290            .await
291            .map_err(|_| FullDuplexError::SendError)?;
292
293        Ok(rx
294            .instrument(tracing::trace_span!("waiting for reply"))
295            .await?)
296    }
297
298    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
299    pub async fn flush_now(&self) {
300        if let Err(e) = self.outgoing.send(RawSignal::FlushNow).await {
301            tracing::error!(error = ?e, "Failed to enqueue a FlushNow message");
302        }
303    }
304
305    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
306    pub(crate) async fn trigger_configuration_refresh(&self) {
307        let (tx, rx) = oneshot();
308
309        let session_properties = self
310            .get_session_properties()
311            .instrument(tracing::debug_span!("request session properties"))
312            .await
313            .inspect_err(|e| {
314                tracing::debug!(%e, "Failed to get session properties");
315            })
316            .unwrap_or_default();
317
318        if let Err(e) = self
319            .to_configuration_proxy
320            .send(ConfigurationProxySignal::CheckInNow(session_properties, tx))
321            .instrument(tracing::debug_span!("request immediate check-in"))
322            .await
323        {
324            tracing::error!(error = ?e, "Failed to enqueue CheckInNow message");
325        }
326
327        let feats = match rx
328            .instrument(tracing::debug_span!("receive feature facts"))
329            .await
330        {
331            Ok(feats) => feats,
332            Err(e) => {
333                tracing::error!(error = ?e, "Failed to refresh the configuration");
334
335                return;
336            }
337        };
338
339        if let Err(e) = self
340            .outgoing
341            .send(RawSignal::UpdateFeatureFacts(feats))
342            .instrument(tracing::debug_span!("forward feature facts"))
343            .await
344        {
345            tracing::error!(%e, "Failed to forward updated feature facts");
346        }
347    }
348}
349
350#[derive(thiserror::Error, Debug)]
351pub enum FullDuplexError {
352    #[error("Failed to request session properties")]
353    SendError,
354
355    #[error("Error waiting for a reply: {0}")]
356    Recv(#[from] tokio::sync::oneshot::error::RecvError),
357}