1use tokio::sync::mpsc::Sender;
2use tokio::sync::oneshot::channel as oneshot;
3use tracing::Instrument;
4
5use crate::checkin::Feature;
6use crate::collator::FeatureFacts;
7use crate::configuration_proxy::ConfigurationProxySignal;
8use crate::identity::DistinctId;
9use crate::Map;
10
11#[derive(Debug)]
12pub(crate) enum RawSignal {
13 Fact {
14 key: String,
15 value: serde_json::Value,
16 },
17 UpdateFeatureFacts(FeatureFacts),
18 Event {
19 event_name: String,
20 properties: Option<Map>,
21 },
22 GetSessionProperties {
23 tx: tokio::sync::oneshot::Sender<Map>,
24 },
25 FlushNow,
26 Identify(DistinctId),
27 Alias(String),
28 Reset,
29}
30
31#[derive(Clone)]
32pub struct Recorder {
33 outgoing: Sender<RawSignal>,
34 to_configuration_proxy: Sender<ConfigurationProxySignal>,
35}
36
37impl std::fmt::Debug for Recorder {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 f.debug_struct("Recorder").finish()
40 }
41}
42
43impl Recorder {
44 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip_all))]
45 pub(crate) fn new(
46 snapshotter_tx: Sender<RawSignal>,
47 to_configuration_proxy: Sender<ConfigurationProxySignal>,
48 ) -> Self {
49 Self {
50 outgoing: snapshotter_tx,
51 to_configuration_proxy,
52 }
53 }
54
55 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
56 pub async fn get_feature_variant<T: serde::de::DeserializeOwned + std::fmt::Debug + Send>(
57 &self,
58 key: impl Into<String> + std::fmt::Debug,
59 ) -> Option<T> {
60 serde_json::from_value(self.get_feature::<T>(key).await?.variant)
61 .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
62 .ok()
63 }
64
65 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
66 pub async fn get_feature_ptr_variant<
67 T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
68 >(
69 &self,
70 key: impl Into<String> + std::fmt::Debug,
71 ) -> Option<T> {
72 serde_json::from_value(self.get_feature_ptr::<T>(key).await?.variant)
73 .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
74 .ok()
75 }
76
77 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
78 pub async fn get_feature_payload<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
79 &self,
80 key: impl Into<String> + std::fmt::Debug,
81 ) -> Option<T> {
82 self.get_feature::<T>(key).await?.payload
83 }
84
85 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
86 pub async fn get_feature_ptr_payload<
87 T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
88 >(
89 &self,
90 key: impl Into<String> + std::fmt::Debug,
91 ) -> Option<T> {
92 self.get_feature_ptr::<T>(key).await?.payload
93 }
94
95 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
96 pub async fn get_feature_ptr<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
97 &self,
98 key: impl Into<String> + std::fmt::Debug,
99 ) -> Option<Feature<T>> {
100 let ptr = self.get_feature_payload::<String>(key).await?;
101 self.get_feature::<T>(ptr).await
102 }
103
104 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
105 pub async fn get_feature<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
106 &self,
107 key: impl Into<String> + std::fmt::Debug,
108 ) -> Option<Feature<T>> {
109 let key: String = key.into();
110 let (tx, rx) = oneshot();
111
112 self.to_configuration_proxy
113 .send(ConfigurationProxySignal::GetFeature(key.clone(), tx))
114 .instrument(tracing::trace_span!(
115 "requesting feature from the configuration proxy"
116 ))
117 .await
118 .inspect_err(|e| tracing::trace!(%e, "Error sending the feature flag request"))
119 .ok()?;
120
121 let feature = rx
122 .instrument(tracing::trace_span!("waiting for the feature"))
123 .await
124 .inspect_err(|e| tracing::trace!(%e, "Error requesting the feature flag"))
125 .ok()
126 .flatten()?;
127
128 self.record(
129 "$feature_flag_called",
130 Some(Map::from_iter([
131 ("$feature_flag".into(), key.into()),
132 ("$feature_flag_response".into(), feature.variant.clone()),
133 ])),
134 )
135 .await;
136
137 let variant = feature.variant.clone();
138 let payload = if let Some(ref p) = feature.payload {
139 let ret = serde_json::from_value(p.clone()).ok()?;
140 Some(ret)
141 } else {
142 None
143 };
144
145 Some(Feature { variant, payload })
146 }
147
148 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
149 pub async fn add_fact(&self, key: &str, value: serde_json::Value) {
150 if let Err(e) = self
151 .outgoing
152 .send(RawSignal::Fact {
153 key: key.to_string(),
154 value,
155 })
156 .await
157 {
158 tracing::error!(error = ?e, "Failed to enqueue a fact");
159 }
160 }
161
162 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
163 pub async fn record(&self, event: &str, properties: Option<Map>) {
164 if let Err(e) = self
165 .outgoing
166 .send(RawSignal::Event {
167 event_name: event.to_string(),
168 properties,
169 })
170 .instrument(tracing::trace_span!("recording the event"))
171 .await
172 {
173 tracing::error!(error = ?e, "Failed to enqueue an event message");
174 }
175 }
176
177 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
178 pub async fn identify(&self, new: DistinctId) {
179 if let Err(e) = self
180 .outgoing
181 .send(RawSignal::Identify(new))
182 .instrument(tracing::trace_span!("sending the Identify message"))
183 .await
184 {
185 tracing::error!(error = ?e, "Failed to enqueue swap_identity message");
186 }
187
188 self.trigger_configuration_refresh()
189 .instrument(tracing::trace_span!("triggering a configuration refresh"))
190 .await;
191 }
192
193 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
194 pub async fn alias(&self, alias: String) {
195 if let Err(e) = self
196 .outgoing
197 .send(RawSignal::Alias(alias))
198 .instrument(tracing::trace_span!("sending the Alias message"))
199 .await
200 {
201 tracing::error!(error = ?e, "Failed to enqueue Alias message");
202 }
203
204 self.trigger_configuration_refresh()
205 .instrument(tracing::trace_span!("triggering a configuration refresh"))
206 .await;
207 }
208
209 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
210 pub async fn reset(&self) {
211 if let Err(e) = self
212 .outgoing
213 .send(RawSignal::Reset)
214 .instrument(tracing::trace_span!("sending the Reset message"))
215 .await
216 {
217 tracing::error!(error = ?e, "Failed to enqueue reset message");
218 }
219
220 self.trigger_configuration_refresh()
221 .instrument(tracing::trace_span!("triggering a configuration refresh"))
222 .await;
223 }
224
225 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self), ret(level = tracing::Level::TRACE)))]
226 async fn get_session_properties(&self) -> Result<Map, FullDuplexError> {
227 let (tx, rx) = tokio::sync::oneshot::channel();
228
229 self.outgoing
230 .send(RawSignal::GetSessionProperties { tx })
231 .instrument(tracing::trace_span!(
232 "sending the GetSessionProperties message"
233 ))
234 .await
235 .map_err(|_| FullDuplexError::SendError)?;
236
237 Ok(rx
238 .instrument(tracing::trace_span!("waiting for reply"))
239 .await?)
240 }
241
242 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
243 pub async fn flush_now(&self) {
244 if let Err(e) = self.outgoing.send(RawSignal::FlushNow).await {
245 tracing::error!(error = ?e, "Failed to enqueue a FlushNow message");
246 }
247 }
248
249 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
250 pub(crate) async fn trigger_configuration_refresh(&self) {
251 let (tx, rx) = oneshot();
252
253 let session_properties = self
254 .get_session_properties()
255 .instrument(tracing::debug_span!("request session properties"))
256 .await
257 .inspect_err(|e| {
258 tracing::debug!(%e, "Failed to get session properties");
259 })
260 .unwrap_or_default();
261
262 if let Err(e) = self
263 .to_configuration_proxy
264 .send(ConfigurationProxySignal::CheckInNow(session_properties, tx))
265 .instrument(tracing::debug_span!("request immediate check-in"))
266 .await
267 {
268 tracing::error!(error = ?e, "Failed to enqueue CheckInNow message");
269 }
270
271 let feats = match rx
272 .instrument(tracing::debug_span!("receive feature facts"))
273 .await
274 {
275 Ok(feats) => feats,
276 Err(e) => {
277 tracing::error!(error = ?e, "Failed to refresh the configuration");
278
279 return;
280 }
281 };
282
283 if let Err(e) = self
284 .outgoing
285 .send(RawSignal::UpdateFeatureFacts(feats))
286 .instrument(tracing::debug_span!("forward feature facts"))
287 .await
288 {
289 tracing::error!(%e, "Failed to forward updated feature facts");
290 }
291 }
292}
293
294#[derive(thiserror::Error, Debug)]
295pub enum FullDuplexError {
296 #[error("Failed to request session properties")]
297 SendError,
298
299 #[error("Error waiting for a reply: {0}")]
300 Recv(#[from] tokio::sync::oneshot::error::RecvError),
301}