1use tokio::sync::mpsc::Sender;
2use tokio::sync::oneshot::channel as oneshot;
3use tracing::Instrument;
4
5use crate::checkin::{Checkin, Feature};
6use crate::collator::FeatureFacts;
7use crate::configuration_proxy::{CheckinStatus, ConfigurationProxySignal};
8use crate::identity::DistinctId;
9use crate::{Map, PersonProperties};
10
11#[derive(Debug)]
12pub(crate) enum RawSignal {
13 Fact {
14 key: String,
15 value: serde_json::Value,
16 },
17 UpdateFeatureConfiguration(Option<Checkin>, FeatureFacts),
18 Event {
19 event_name: String,
20 properties: Option<Map>,
21 },
22 GetSessionProperties {
23 tx: tokio::sync::oneshot::Sender<Map>,
24 },
25 FlushNow,
26 Identify(DistinctId, IdentifyProperties),
27 SetPersonProperties(IdentifyProperties),
28 AddGroup {
29 group_name: String,
30 group_member_id: String,
31 },
32 Alias(String),
33 Reset,
34}
35
36#[derive(Default, Debug, serde::Serialize)]
37pub struct IdentifyProperties {
38 #[serde(rename = "$set")]
39 pub set: PersonProperties,
40 #[serde(rename = "$set_once")]
41 pub set_once: PersonProperties,
42}
43
44impl IdentifyProperties {
45 pub(crate) fn as_map(&self) -> Map {
46 let val = serde_json::to_value(self)
47 .inspect_err(|e| {
48 tracing::error!(
49 self = ?&self,
50 error = ?e,
51 "IdentifyProperties cannot convert to a Map"
52 );
53 })
54 .unwrap_or_default();
55
56 let serde_json::Value::Object(map) = val else {
57 tracing::error!(
58 self = ?&self,
59 "IdentifyProperties did not serialize to an Object"
60 );
61 return Map::default();
62 };
63 map
64 }
65}
66
67#[derive(thiserror::Error, Debug)]
68pub enum RecorderError {
69 #[error("Timed out waiting for configuration to complete: {0:?}")]
70 WaitForConfiguration(#[from] tokio::time::error::Elapsed),
71
72 #[error("Failed to subscribe to the ConfigurationProxy for configuration changes")]
73 SubscribeFailed,
74
75 #[error(transparent)]
76 Subscription(#[from] tokio::sync::broadcast::error::RecvError),
77
78 #[error("Failed to signal the configuration proxy: '{0}'")]
79 SendToConfigurationProxy(String),
80
81 #[error(transparent)]
82 Response(#[from] tokio::sync::oneshot::error::RecvError),
83}
84
85pub struct Recorder {
86 outgoing: Sender<RawSignal>,
87 auto_refresh_config: bool,
88 to_configuration_proxy: Sender<ConfigurationProxySignal>,
89}
90
91impl Clone for Recorder {
92 fn clone(&self) -> Self {
93 Self {
94 outgoing: self.outgoing.clone(),
95 auto_refresh_config: true,
96 to_configuration_proxy: self.to_configuration_proxy.clone(),
97 }
98 }
99}
100
101impl std::fmt::Debug for Recorder {
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 f.debug_struct("Recorder").finish()
104 }
105}
106
107impl Recorder {
108 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip_all))]
109 pub(crate) fn new(
110 snapshotter_tx: Sender<RawSignal>,
111 to_configuration_proxy: Sender<ConfigurationProxySignal>,
112 ) -> Self {
113 Self {
114 outgoing: snapshotter_tx,
115 to_configuration_proxy,
116 auto_refresh_config: true,
117 }
118 }
119
120 pub async fn in_configuration_txn<F, T>(&self, f: F) -> T
123 where
124 F: AsyncFnOnce(&Recorder) -> T,
125 {
126 let mut rec = self.clone();
127
128 rec.auto_refresh_config = false;
129
130 let ret = f(self).await;
131
132 rec.auto_refresh_config = true;
133 rec.trigger_configuration_refresh().await;
134
135 ret
136 }
137
138 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
139 pub async fn get_feature_variant<
140 T: serde::ser::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Send,
141 >(
142 &self,
143 key: impl Into<String> + std::fmt::Debug,
144 ) -> Option<T> {
145 serde_json::from_value(self.get_feature::<T>(key).await?.variant)
146 .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
147 .ok()
148 }
149
150 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
151 pub async fn get_feature_ptr_variant<
152 T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
153 >(
154 &self,
155 key: impl Into<String> + std::fmt::Debug,
156 ) -> Option<T> {
157 serde_json::from_value(self.get_feature_ptr::<T>(key).await?.variant)
158 .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
159 .ok()
160 }
161
162 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
163 pub async fn get_feature_payload<
164 T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
165 >(
166 &self,
167 key: impl Into<String> + std::fmt::Debug,
168 ) -> Option<T> {
169 self.get_feature::<T>(key).await?.payload
170 }
171
172 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
173 pub async fn get_feature_ptr_payload<
174 T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
175 >(
176 &self,
177 key: impl Into<String> + std::fmt::Debug,
178 ) -> Option<T> {
179 self.get_feature_ptr::<T>(key).await?.payload
180 }
181
182 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
183 pub async fn get_feature_ptr<
184 T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
185 >(
186 &self,
187 key: impl Into<String> + std::fmt::Debug,
188 ) -> Option<Feature<T>> {
189 let ptr = self.get_feature_payload::<String>(key).await?;
190 self.get_feature::<T>(ptr).await
191 }
192
193 pub async fn wait_for_checkin(
194 &self,
195 duration: Option<std::time::Duration>,
196 ) -> Result<(), RecorderError> {
197 let (tx, rx) = oneshot();
198
199 let subscription = self.subscribe_to_feature_changes().await;
200
201 self.to_configuration_proxy
202 .send(ConfigurationProxySignal::QueryIfCheckedIn(tx))
203 .instrument(tracing::trace_span!(
204 "requesting check in status from the configuration proxy"
205 ))
206 .await
207 .map_err(|e| RecorderError::SendToConfigurationProxy(format!("{e:?}")))?;
208
209 if rx.await? == CheckinStatus::CheckedIn {
210 tracing::debug!("Already checked in!");
211 return Ok(());
212 }
213
214 let Some(mut subscription) = subscription else {
215 return Err(RecorderError::SubscribeFailed);
216 };
217
218 if let Some(duration) = duration {
219 Ok(tokio::time::timeout(duration, subscription.recv()).await??)
220 } else {
221 Ok(subscription.recv().await?)
222 }
223 }
224
225 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
226 pub async fn get_feature<
227 T: serde::ser::Serialize + serde::de::DeserializeOwned + Send + std::fmt::Debug,
228 >(
229 &self,
230 key: impl Into<String> + std::fmt::Debug,
231 ) -> Option<Feature<T>> {
232 let key: String = key.into();
233 let (tx, rx) = oneshot();
234
235 self.to_configuration_proxy
236 .send(ConfigurationProxySignal::GetFeature(key.clone(), tx))
237 .instrument(tracing::trace_span!(
238 "requesting feature from the configuration proxy"
239 ))
240 .await
241 .inspect_err(|e| tracing::trace!(%e, "Error sending the feature flag request"))
242 .ok()?;
243
244 let feature = rx
245 .instrument(tracing::trace_span!("waiting for the feature"))
246 .await
247 .inspect_err(|e| tracing::trace!(%e, "Error requesting the feature flag"))
248 .ok()
249 .flatten()?;
250
251 self.record(
252 "$feature_flag_called",
253 Some(Map::from_iter([
254 ("$feature_flag".into(), key.into()),
255 ("$feature_flag_response".into(), feature.variant.clone()),
256 ])),
257 )
258 .await;
259
260 let variant = feature.variant.clone();
261 let payload = if let Some(ref p) = feature.payload {
262 let ret = serde_json::from_value(p.clone()).ok()?;
263 Some(ret)
264 } else {
265 None
266 };
267
268 Some(Feature { variant, payload })
269 }
270
271 pub async fn subscribe_to_feature_changes(
272 &self,
273 ) -> Option<tokio::sync::broadcast::Receiver<()>> {
274 let (tx, rx) = oneshot();
275
276 self.to_configuration_proxy
277 .send(ConfigurationProxySignal::Subscribe(tx))
278 .instrument(tracing::debug_span!("subscribe to feature changes"))
279 .await
280 .inspect_err(|e| {
281 tracing::error!(error = ?e, "Failed to request subscription to feature changes");
282 })
283 .ok()?;
284
285 rx.await
286 .inspect_err(|e| {
287 tracing::error!(error = ?e, "No response when waiting a feature change subscriber");
288 })
289 .ok()
290 }
291
292 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
293 pub async fn set_fact(
294 &self,
295 key: impl Into<String> + std::fmt::Debug,
296 value: serde_json::Value,
297 ) {
298 if let Err(e) = self
299 .outgoing
300 .send(RawSignal::Fact {
301 key: key.into(),
302 value,
303 })
304 .await
305 {
306 tracing::error!(error = ?e, "Failed to enqueue a fact");
307 }
308 }
309
310 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
311 pub async fn record(
312 &self,
313 event: impl Into<String> + std::fmt::Debug,
314 properties: Option<Map>,
315 ) {
316 if let Err(e) = self
317 .outgoing
318 .send(RawSignal::Event {
319 event_name: event.into(),
320 properties,
321 })
322 .instrument(tracing::trace_span!("recording the event"))
323 .await
324 {
325 tracing::error!(error = ?e, "Failed to enqueue an event message");
326 }
327 }
328
329 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
330 pub async fn identify(&self, new: DistinctId) {
331 self.identify_with_properties(new, IdentifyProperties::default())
332 .await;
333 }
334
335 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
336 pub async fn identify_with_properties(&self, new: DistinctId, properties: IdentifyProperties) {
337 if let Err(e) = self
338 .outgoing
339 .send(RawSignal::Identify(new, properties))
340 .instrument(tracing::trace_span!("sending the Identify message"))
341 .await
342 {
343 tracing::error!(error = ?e, "Failed to enqueue swap_identity message");
344 }
345
346 self.trigger_configuration_refresh()
347 .instrument(tracing::trace_span!("triggering a configuration refresh"))
348 .await;
349 }
350
351 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
352 pub async fn set_person_properties(&self, properties: IdentifyProperties) {
353 if let Err(e) = self
354 .outgoing
355 .send(RawSignal::SetPersonProperties(properties))
356 .instrument(tracing::trace_span!(
357 "sending the SetPersonProperties message"
358 ))
359 .await
360 {
361 tracing::error!(error = ?e, "Failed to enqueue set_person_properties message");
362 }
363
364 self.trigger_configuration_refresh()
365 .instrument(tracing::trace_span!("triggering a configuration refresh"))
366 .await;
367 }
368
369 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
370 pub async fn add_group(
371 &self,
372 group_name: impl Into<String> + std::fmt::Debug,
373 group_member_id: impl Into<String> + std::fmt::Debug,
374 ) {
375 if let Err(e) = self
376 .outgoing
377 .send(RawSignal::AddGroup {
378 group_name: group_name.into(),
379 group_member_id: group_member_id.into(),
380 })
381 .instrument(tracing::trace_span!("sending the AddGroup message"))
382 .await
383 {
384 tracing::error!(error = ?e, "Failed to enqueue AddGroup message");
385 }
386
387 self.trigger_configuration_refresh()
388 .instrument(tracing::trace_span!("triggering a configuration refresh"))
389 .await;
390 }
391
392 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
393 pub async fn alias(&self, alias: impl Into<String> + std::fmt::Debug) {
394 if let Err(e) = self
395 .outgoing
396 .send(RawSignal::Alias(alias.into()))
397 .instrument(tracing::trace_span!("sending the Alias message"))
398 .await
399 {
400 tracing::error!(error = ?e, "Failed to enqueue Alias message");
401 }
402
403 self.trigger_configuration_refresh()
404 .instrument(tracing::trace_span!("triggering a configuration refresh"))
405 .await;
406 }
407
408 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
409 pub async fn reset(&self) {
410 if let Err(e) = self
411 .outgoing
412 .send(RawSignal::Reset)
413 .instrument(tracing::trace_span!("sending the Reset message"))
414 .await
415 {
416 tracing::error!(error = ?e, "Failed to enqueue reset message");
417 }
418
419 self.trigger_configuration_refresh()
420 .instrument(tracing::trace_span!("triggering a configuration refresh"))
421 .await;
422 }
423
424 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self), ret(level = tracing::Level::TRACE)))]
425 async fn get_session_properties(&self) -> Result<Map, FullDuplexError> {
426 let (tx, rx) = tokio::sync::oneshot::channel();
427
428 self.outgoing
429 .send(RawSignal::GetSessionProperties { tx })
430 .instrument(tracing::trace_span!(
431 "sending the GetSessionProperties message"
432 ))
433 .await
434 .map_err(|_| FullDuplexError::SendError)?;
435
436 Ok(rx
437 .instrument(tracing::trace_span!("waiting for reply"))
438 .await?)
439 }
440
441 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
442 pub async fn flush_now(&self) {
443 if let Err(e) = self.outgoing.send(RawSignal::FlushNow).await {
444 tracing::error!(error = ?e, "Failed to enqueue a FlushNow message");
445 }
446 }
447
448 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
449 pub(crate) async fn trigger_configuration_refresh(&self) {
450 if !self.auto_refresh_config {
451 tracing::trace!("Not refreshing configuration because it is paused");
452 return;
453 }
454
455 let (tx, rx) = oneshot();
456
457 let session_properties = self
458 .get_session_properties()
459 .instrument(tracing::debug_span!("request session properties"))
460 .await
461 .inspect_err(|e| {
462 tracing::debug!(%e, "Failed to get session properties");
463 })
464 .unwrap_or_default();
465
466 if let Err(e) = self
467 .to_configuration_proxy
468 .send(ConfigurationProxySignal::CheckInNow(session_properties, tx))
469 .instrument(tracing::debug_span!("request immediate check-in"))
470 .await
471 {
472 tracing::error!(error = ?e, "Failed to enqueue CheckInNow message");
473 }
474
475 let (config, feats) = match rx
476 .instrument(tracing::debug_span!("receive feature facts"))
477 .await
478 {
479 Ok((config, feats)) => (config, feats),
480 Err(e) => {
481 tracing::error!(error = ?e, "Failed to refresh the configuration");
482
483 return;
484 }
485 };
486
487 if let Err(e) = self
488 .outgoing
489 .send(RawSignal::UpdateFeatureConfiguration(config, feats))
490 .instrument(tracing::debug_span!("forward feature facts"))
491 .await
492 {
493 tracing::error!(%e, "Failed to forward updated feature facts");
494 }
495 }
496}
497
498#[derive(thiserror::Error, Debug)]
499pub enum FullDuplexError {
500 #[error("Failed to request session properties")]
501 SendError,
502
503 #[error("Error waiting for a reply: {0}")]
504 Recv(#[from] tokio::sync::oneshot::error::RecvError),
505}