// detsys_ids_client/recorder.rs

1use tokio::sync::mpsc::Sender;
2use tokio::sync::oneshot::channel as oneshot;
3use tracing::Instrument;
4
5use crate::Map;
6use crate::checkin::Feature;
7use crate::collator::FeatureFacts;
8use crate::configuration_proxy::ConfigurationProxySignal;
9use crate::identity::DistinctId;
10
11#[derive(Debug)]
12pub(crate) enum RawSignal {
13    Fact {
14        key: String,
15        value: serde_json::Value,
16    },
17    UpdateFeatureFacts(FeatureFacts),
18    Event {
19        event_name: String,
20        properties: Option<Map>,
21    },
22    GetSessionProperties {
23        tx: tokio::sync::oneshot::Sender<Map>,
24    },
25    FlushNow,
26    Identify(DistinctId),
27    Alias(String),
28    Reset,
29}
30
31#[derive(Clone)]
32pub struct Recorder {
33    outgoing: Sender<RawSignal>,
34    to_configuration_proxy: Sender<ConfigurationProxySignal>,
35}
36
37impl std::fmt::Debug for Recorder {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        f.debug_struct("Recorder").finish()
40    }
41}
42
43impl Recorder {
44    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip_all))]
45    pub(crate) fn new(
46        snapshotter_tx: Sender<RawSignal>,
47        to_configuration_proxy: Sender<ConfigurationProxySignal>,
48    ) -> Self {
49        Self {
50            outgoing: snapshotter_tx,
51            to_configuration_proxy,
52        }
53    }
54
55    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
56    pub async fn get_feature_variant<T: serde::de::DeserializeOwned + std::fmt::Debug + Send>(
57        &self,
58        key: impl Into<String> + std::fmt::Debug,
59    ) -> Option<T> {
60        serde_json::from_value(self.get_feature::<T>(key).await?.variant)
61            .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
62            .ok()
63    }
64
65    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
66    pub async fn get_feature_ptr_variant<
67        T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
68    >(
69        &self,
70        key: impl Into<String> + std::fmt::Debug,
71    ) -> Option<T> {
72        serde_json::from_value(self.get_feature_ptr::<T>(key).await?.variant)
73            .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
74            .ok()
75    }
76
77    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
78    pub async fn get_feature_payload<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
79        &self,
80        key: impl Into<String> + std::fmt::Debug,
81    ) -> Option<T> {
82        self.get_feature::<T>(key).await?.payload
83    }
84
85    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
86    pub async fn get_feature_ptr_payload<
87        T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
88    >(
89        &self,
90        key: impl Into<String> + std::fmt::Debug,
91    ) -> Option<T> {
92        self.get_feature_ptr::<T>(key).await?.payload
93    }
94
95    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
96    pub async fn get_feature_ptr<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
97        &self,
98        key: impl Into<String> + std::fmt::Debug,
99    ) -> Option<Feature<T>> {
100        let ptr = self.get_feature_payload::<String>(key).await?;
101        self.get_feature::<T>(ptr).await
102    }
103
104    #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
105    pub async fn get_feature<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
106        &self,
107        key: impl Into<String> + std::fmt::Debug,
108    ) -> Option<Feature<T>> {
109        let key: String = key.into();
110        let (tx, rx) = oneshot();
111
112        self.to_configuration_proxy
113            .send(ConfigurationProxySignal::GetFeature(key.clone(), tx))
114            .instrument(tracing::trace_span!(
115                "requesting feature from the configuration proxy"
116            ))
117            .await
118            .inspect_err(|e| tracing::trace!(%e, "Error sending the feature flag request"))
119            .ok()?;
120
121        let feature = rx
122            .instrument(tracing::trace_span!("waiting for the feature"))
123            .await
124            .inspect_err(|e| tracing::trace!(%e, "Error requesting the feature flag"))
125            .ok()
126            .flatten()?;
127
128        self.record(
129            "$feature_flag_called",
130            Some(Map::from_iter([
131                ("$feature_flag".into(), key.into()),
132                ("$feature_flag_response".into(), feature.variant.clone()),
133            ])),
134        )
135        .await;
136
137        let variant = feature.variant.clone();
138        let payload = if let Some(ref p) = feature.payload {
139            let ret = serde_json::from_value(p.clone()).ok()?;
140            Some(ret)
141        } else {
142            None
143        };
144
145        Some(Feature { variant, payload })
146    }
147
148    pub async fn subscribe_to_feature_changes(
149        &self,
150    ) -> Option<tokio::sync::broadcast::Receiver<()>> {
151        let (tx, rx) = oneshot();
152
153        self.to_configuration_proxy
154            .send(ConfigurationProxySignal::Subscribe(tx))
155            .instrument(tracing::debug_span!("subscribe to feature changes"))
156            .await
157            .inspect_err(|e| {
158                tracing::error!(error = ?e, "Failed to request subscription to feature changes");
159            })
160            .ok()?;
161
162        rx.await
163            .inspect_err(|e| {
164                tracing::error!(error = ?e, "No response when waiting a feature change subscriber");
165            })
166            .ok()
167    }
168
169    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
170    pub async fn set_fact(
171        &self,
172        key: impl Into<String> + std::fmt::Debug,
173        value: serde_json::Value,
174    ) {
175        if let Err(e) = self
176            .outgoing
177            .send(RawSignal::Fact {
178                key: key.into(),
179                value,
180            })
181            .await
182        {
183            tracing::error!(error = ?e, "Failed to enqueue a fact");
184        }
185    }
186
187    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
188    pub async fn record(
189        &self,
190        event: impl Into<String> + std::fmt::Debug,
191        properties: Option<Map>,
192    ) {
193        if let Err(e) = self
194            .outgoing
195            .send(RawSignal::Event {
196                event_name: event.into(),
197                properties,
198            })
199            .instrument(tracing::trace_span!("recording the event"))
200            .await
201        {
202            tracing::error!(error = ?e, "Failed to enqueue an event message");
203        }
204    }
205
206    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
207    pub async fn identify(&self, new: DistinctId) {
208        if let Err(e) = self
209            .outgoing
210            .send(RawSignal::Identify(new))
211            .instrument(tracing::trace_span!("sending the Identify message"))
212            .await
213        {
214            tracing::error!(error = ?e, "Failed to enqueue swap_identity message");
215        }
216
217        self.trigger_configuration_refresh()
218            .instrument(tracing::trace_span!("triggering a configuration refresh"))
219            .await;
220    }
221
222    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
223    pub async fn alias(&self, alias: impl Into<String> + std::fmt::Debug) {
224        if let Err(e) = self
225            .outgoing
226            .send(RawSignal::Alias(alias.into()))
227            .instrument(tracing::trace_span!("sending the Alias message"))
228            .await
229        {
230            tracing::error!(error = ?e, "Failed to enqueue Alias message");
231        }
232
233        self.trigger_configuration_refresh()
234            .instrument(tracing::trace_span!("triggering a configuration refresh"))
235            .await;
236    }
237
238    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
239    pub async fn reset(&self) {
240        if let Err(e) = self
241            .outgoing
242            .send(RawSignal::Reset)
243            .instrument(tracing::trace_span!("sending the Reset message"))
244            .await
245        {
246            tracing::error!(error = ?e, "Failed to enqueue reset message");
247        }
248
249        self.trigger_configuration_refresh()
250            .instrument(tracing::trace_span!("triggering a configuration refresh"))
251            .await;
252    }
253
254    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self), ret(level = tracing::Level::TRACE)))]
255    async fn get_session_properties(&self) -> Result<Map, FullDuplexError> {
256        let (tx, rx) = tokio::sync::oneshot::channel();
257
258        self.outgoing
259            .send(RawSignal::GetSessionProperties { tx })
260            .instrument(tracing::trace_span!(
261                "sending the GetSessionProperties message"
262            ))
263            .await
264            .map_err(|_| FullDuplexError::SendError)?;
265
266        Ok(rx
267            .instrument(tracing::trace_span!("waiting for reply"))
268            .await?)
269    }
270
271    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
272    pub async fn flush_now(&self) {
273        if let Err(e) = self.outgoing.send(RawSignal::FlushNow).await {
274            tracing::error!(error = ?e, "Failed to enqueue a FlushNow message");
275        }
276    }
277
278    #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
279    pub(crate) async fn trigger_configuration_refresh(&self) {
280        let (tx, rx) = oneshot();
281
282        let session_properties = self
283            .get_session_properties()
284            .instrument(tracing::debug_span!("request session properties"))
285            .await
286            .inspect_err(|e| {
287                tracing::debug!(%e, "Failed to get session properties");
288            })
289            .unwrap_or_default();
290
291        if let Err(e) = self
292            .to_configuration_proxy
293            .send(ConfigurationProxySignal::CheckInNow(session_properties, tx))
294            .instrument(tracing::debug_span!("request immediate check-in"))
295            .await
296        {
297            tracing::error!(error = ?e, "Failed to enqueue CheckInNow message");
298        }
299
300        let feats = match rx
301            .instrument(tracing::debug_span!("receive feature facts"))
302            .await
303        {
304            Ok(feats) => feats,
305            Err(e) => {
306                tracing::error!(error = ?e, "Failed to refresh the configuration");
307
308                return;
309            }
310        };
311
312        if let Err(e) = self
313            .outgoing
314            .send(RawSignal::UpdateFeatureFacts(feats))
315            .instrument(tracing::debug_span!("forward feature facts"))
316            .await
317        {
318            tracing::error!(%e, "Failed to forward updated feature facts");
319        }
320    }
321}
322
323#[derive(thiserror::Error, Debug)]
324pub enum FullDuplexError {
325    #[error("Failed to request session properties")]
326    SendError,
327
328    #[error("Error waiting for a reply: {0}")]
329    Recv(#[from] tokio::sync::oneshot::error::RecvError),
330}