detsys_ids_client/
recorder.rs

1use tokio::sync::mpsc::Sender;
2use tokio::sync::oneshot::channel as oneshot;
3use tracing::Instrument;
4
5use crate::checkin::Feature;
6use crate::collator::FeatureFacts;
7use crate::configuration_proxy::ConfigurationProxySignal;
8use crate::identity::DistinctId;
9use crate::Map;
10
11#[derive(Debug)]
12pub(crate) enum RawSignal {
13    Fact {
14        key: String,
15        value: serde_json::Value,
16    },
17    UpdateFeatureFacts(FeatureFacts),
18    Event {
19        event_name: String,
20        properties: Option<Map>,
21    },
22    GetSessionProperties {
23        tx: tokio::sync::oneshot::Sender<Map>,
24    },
25    FlushNow,
26    Identify(DistinctId),
27    Alias(String),
28    Reset,
29}
30
31#[derive(Clone)]
32pub struct Recorder {
33    outgoing: Sender<RawSignal>,
34    to_configuration_proxy: Sender<ConfigurationProxySignal>,
35}
36
37impl std::fmt::Debug for Recorder {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        f.debug_struct("Recorder").finish()
40    }
41}
42
43impl Recorder {
44    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip_all))]
45    pub(crate) fn new(
46        snapshotter_tx: Sender<RawSignal>,
47        to_configuration_proxy: Sender<ConfigurationProxySignal>,
48    ) -> Self {
49        Self {
50            outgoing: snapshotter_tx,
51            to_configuration_proxy,
52        }
53    }
54
55    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
56    pub async fn get_feature_variant<T: serde::de::DeserializeOwned + std::fmt::Debug + Send>(
57        &self,
58        key: impl Into<String> + std::fmt::Debug,
59    ) -> Option<T> {
60        serde_json::from_value(self.get_feature::<T>(key).await?.variant)
61            .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
62            .ok()
63    }
64
65    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
66    pub async fn get_feature_ptr_variant<
67        T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
68    >(
69        &self,
70        key: impl Into<String> + std::fmt::Debug,
71    ) -> Option<T> {
72        serde_json::from_value(self.get_feature_ptr::<T>(key).await?.variant)
73            .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
74            .ok()
75    }
76
77    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
78    pub async fn get_feature_payload<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
79        &self,
80        key: impl Into<String> + std::fmt::Debug,
81    ) -> Option<T> {
82        self.get_feature::<T>(key).await?.payload
83    }
84
85    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
86    pub async fn get_feature_ptr_payload<
87        T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
88    >(
89        &self,
90        key: impl Into<String> + std::fmt::Debug,
91    ) -> Option<T> {
92        self.get_feature_ptr::<T>(key).await?.payload
93    }
94
95    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
96    pub async fn get_feature_ptr<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
97        &self,
98        key: impl Into<String> + std::fmt::Debug,
99    ) -> Option<Feature<T>> {
100        let ptr = self.get_feature_payload::<String>(key).await?;
101        self.get_feature::<T>(ptr).await
102    }
103
104    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
105    pub async fn get_feature<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
106        &self,
107        key: impl Into<String> + std::fmt::Debug,
108    ) -> Option<Feature<T>> {
109        let key: String = key.into();
110        let (tx, rx) = oneshot();
111
112        self.to_configuration_proxy
113            .send(ConfigurationProxySignal::GetFeature(key.clone(), tx))
114            .instrument(tracing::trace_span!(
115                "requesting feature from the configuration proxy"
116            ))
117            .await
118            .inspect_err(|e| tracing::trace!(%e, "Error sending the feature flag request"))
119            .ok()?;
120
121        let feature = rx
122            .instrument(tracing::trace_span!("waiting for the feature"))
123            .await
124            .inspect_err(|e| tracing::trace!(%e, "Error requesting the feature flag"))
125            .ok()
126            .flatten()?;
127
128        self.record(
129            "$feature_flag_called",
130            Some(Map::from_iter([
131                ("$feature_flag".into(), key.into()),
132                ("$feature_flag_response".into(), feature.variant.clone()),
133            ])),
134        )
135        .await;
136
137        let variant = feature.variant.clone();
138        let payload = if let Some(ref p) = feature.payload {
139            let ret = serde_json::from_value(p.clone()).ok()?;
140            Some(ret)
141        } else {
142            None
143        };
144
145        Some(Feature { variant, payload })
146    }
147
148    pub async fn subscribe_to_feature_changes(
149        &self,
150    ) -> Option<tokio::sync::broadcast::Receiver<()>> {
151        let (tx, rx) = oneshot();
152
153        self.to_configuration_proxy
154            .send(ConfigurationProxySignal::Subscribe(tx))
155            .instrument(tracing::debug_span!("subscribe to feature changes"))
156            .await
157            .inspect_err(|e| {
158                tracing::error!(error = ?e, "Failed to request subscription to feature changes");
159            })
160            .ok()?;
161
162        rx.await
163            .inspect_err(|e| {
164                tracing::error!(error = ?e, "No response when waiting a feature change subscriber");
165            })
166            .ok()
167    }
168
169    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
170    pub async fn add_fact(&self, key: &str, value: serde_json::Value) {
171        if let Err(e) = self
172            .outgoing
173            .send(RawSignal::Fact {
174                key: key.to_string(),
175                value,
176            })
177            .await
178        {
179            tracing::error!(error = ?e, "Failed to enqueue a fact");
180        }
181    }
182
183    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
184    pub async fn record(&self, event: &str, properties: Option<Map>) {
185        if let Err(e) = self
186            .outgoing
187            .send(RawSignal::Event {
188                event_name: event.to_string(),
189                properties,
190            })
191            .instrument(tracing::trace_span!("recording the event"))
192            .await
193        {
194            tracing::error!(error = ?e, "Failed to enqueue an event message");
195        }
196    }
197
198    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
199    pub async fn identify(&self, new: DistinctId) {
200        if let Err(e) = self
201            .outgoing
202            .send(RawSignal::Identify(new))
203            .instrument(tracing::trace_span!("sending the Identify message"))
204            .await
205        {
206            tracing::error!(error = ?e, "Failed to enqueue swap_identity message");
207        }
208
209        self.trigger_configuration_refresh()
210            .instrument(tracing::trace_span!("triggering a configuration refresh"))
211            .await;
212    }
213
214    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
215    pub async fn alias(&self, alias: String) {
216        if let Err(e) = self
217            .outgoing
218            .send(RawSignal::Alias(alias))
219            .instrument(tracing::trace_span!("sending the Alias message"))
220            .await
221        {
222            tracing::error!(error = ?e, "Failed to enqueue Alias message");
223        }
224
225        self.trigger_configuration_refresh()
226            .instrument(tracing::trace_span!("triggering a configuration refresh"))
227            .await;
228    }
229
230    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
231    pub async fn reset(&self) {
232        if let Err(e) = self
233            .outgoing
234            .send(RawSignal::Reset)
235            .instrument(tracing::trace_span!("sending the Reset message"))
236            .await
237        {
238            tracing::error!(error = ?e, "Failed to enqueue reset message");
239        }
240
241        self.trigger_configuration_refresh()
242            .instrument(tracing::trace_span!("triggering a configuration refresh"))
243            .await;
244    }
245
246    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self), ret(level = tracing::Level::TRACE)))]
247    async fn get_session_properties(&self) -> Result<Map, FullDuplexError> {
248        let (tx, rx) = tokio::sync::oneshot::channel();
249
250        self.outgoing
251            .send(RawSignal::GetSessionProperties { tx })
252            .instrument(tracing::trace_span!(
253                "sending the GetSessionProperties message"
254            ))
255            .await
256            .map_err(|_| FullDuplexError::SendError)?;
257
258        Ok(rx
259            .instrument(tracing::trace_span!("waiting for reply"))
260            .await?)
261    }
262
263    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
264    pub async fn flush_now(&self) {
265        if let Err(e) = self.outgoing.send(RawSignal::FlushNow).await {
266            tracing::error!(error = ?e, "Failed to enqueue a FlushNow message");
267        }
268    }
269
270    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
271    pub(crate) async fn trigger_configuration_refresh(&self) {
272        let (tx, rx) = oneshot();
273
274        let session_properties = self
275            .get_session_properties()
276            .instrument(tracing::debug_span!("request session properties"))
277            .await
278            .inspect_err(|e| {
279                tracing::debug!(%e, "Failed to get session properties");
280            })
281            .unwrap_or_default();
282
283        if let Err(e) = self
284            .to_configuration_proxy
285            .send(ConfigurationProxySignal::CheckInNow(session_properties, tx))
286            .instrument(tracing::debug_span!("request immediate check-in"))
287            .await
288        {
289            tracing::error!(error = ?e, "Failed to enqueue CheckInNow message");
290        }
291
292        let feats = match rx
293            .instrument(tracing::debug_span!("receive feature facts"))
294            .await
295        {
296            Ok(feats) => feats,
297            Err(e) => {
298                tracing::error!(error = ?e, "Failed to refresh the configuration");
299
300                return;
301            }
302        };
303
304        if let Err(e) = self
305            .outgoing
306            .send(RawSignal::UpdateFeatureFacts(feats))
307            .instrument(tracing::debug_span!("forward feature facts"))
308            .await
309        {
310            tracing::error!(%e, "Failed to forward updated feature facts");
311        }
312    }
313}
314
315#[derive(thiserror::Error, Debug)]
316pub enum FullDuplexError {
317    #[error("Failed to request session properties")]
318    SendError,
319
320    #[error("Error waiting for a reply: {0}")]
321    Recv(#[from] tokio::sync::oneshot::error::RecvError),
322}