1use tokio::sync::mpsc::Sender;
2use tokio::sync::oneshot::channel as oneshot;
3use tracing::Instrument;
4
5use crate::Map;
6use crate::checkin::Feature;
7use crate::collator::FeatureFacts;
8use crate::configuration_proxy::ConfigurationProxySignal;
9use crate::identity::DistinctId;
10
11#[derive(Debug)]
12pub(crate) enum RawSignal {
13 Fact {
14 key: String,
15 value: serde_json::Value,
16 },
17 UpdateFeatureFacts(FeatureFacts),
18 Event {
19 event_name: String,
20 properties: Option<Map>,
21 },
22 GetSessionProperties {
23 tx: tokio::sync::oneshot::Sender<Map>,
24 },
25 FlushNow,
26 Identify(DistinctId),
27 AddGroup {
28 group_name: String,
29 group_member_id: String,
30 },
31 Alias(String),
32 Reset,
33}
34
35#[derive(Clone)]
36pub struct Recorder {
37 outgoing: Sender<RawSignal>,
38 to_configuration_proxy: Sender<ConfigurationProxySignal>,
39}
40
41impl std::fmt::Debug for Recorder {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 f.debug_struct("Recorder").finish()
44 }
45}
46
47impl Recorder {
48 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip_all))]
49 pub(crate) fn new(
50 snapshotter_tx: Sender<RawSignal>,
51 to_configuration_proxy: Sender<ConfigurationProxySignal>,
52 ) -> Self {
53 Self {
54 outgoing: snapshotter_tx,
55 to_configuration_proxy,
56 }
57 }
58
59 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
60 pub async fn get_feature_variant<T: serde::de::DeserializeOwned + std::fmt::Debug + Send>(
61 &self,
62 key: impl Into<String> + std::fmt::Debug,
63 ) -> Option<T> {
64 serde_json::from_value(self.get_feature::<T>(key).await?.variant)
65 .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
66 .ok()
67 }
68
69 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
70 pub async fn get_feature_ptr_variant<
71 T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
72 >(
73 &self,
74 key: impl Into<String> + std::fmt::Debug,
75 ) -> Option<T> {
76 serde_json::from_value(self.get_feature_ptr::<T>(key).await?.variant)
77 .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
78 .ok()
79 }
80
81 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
82 pub async fn get_feature_payload<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
83 &self,
84 key: impl Into<String> + std::fmt::Debug,
85 ) -> Option<T> {
86 self.get_feature::<T>(key).await?.payload
87 }
88
89 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
90 pub async fn get_feature_ptr_payload<
91 T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
92 >(
93 &self,
94 key: impl Into<String> + std::fmt::Debug,
95 ) -> Option<T> {
96 self.get_feature_ptr::<T>(key).await?.payload
97 }
98
99 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
100 pub async fn get_feature_ptr<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
101 &self,
102 key: impl Into<String> + std::fmt::Debug,
103 ) -> Option<Feature<T>> {
104 let ptr = self.get_feature_payload::<String>(key).await?;
105 self.get_feature::<T>(ptr).await
106 }
107
108 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
109 pub async fn get_feature<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
110 &self,
111 key: impl Into<String> + std::fmt::Debug,
112 ) -> Option<Feature<T>> {
113 let key: String = key.into();
114 let (tx, rx) = oneshot();
115
116 self.to_configuration_proxy
117 .send(ConfigurationProxySignal::GetFeature(key.clone(), tx))
118 .instrument(tracing::trace_span!(
119 "requesting feature from the configuration proxy"
120 ))
121 .await
122 .inspect_err(|e| tracing::trace!(%e, "Error sending the feature flag request"))
123 .ok()?;
124
125 let feature = rx
126 .instrument(tracing::trace_span!("waiting for the feature"))
127 .await
128 .inspect_err(|e| tracing::trace!(%e, "Error requesting the feature flag"))
129 .ok()
130 .flatten()?;
131
132 self.record(
133 "$feature_flag_called",
134 Some(Map::from_iter([
135 ("$feature_flag".into(), key.into()),
136 ("$feature_flag_response".into(), feature.variant.clone()),
137 ])),
138 )
139 .await;
140
141 let variant = feature.variant.clone();
142 let payload = if let Some(ref p) = feature.payload {
143 let ret = serde_json::from_value(p.clone()).ok()?;
144 Some(ret)
145 } else {
146 None
147 };
148
149 Some(Feature { variant, payload })
150 }
151
152 pub async fn subscribe_to_feature_changes(
153 &self,
154 ) -> Option<tokio::sync::broadcast::Receiver<()>> {
155 let (tx, rx) = oneshot();
156
157 self.to_configuration_proxy
158 .send(ConfigurationProxySignal::Subscribe(tx))
159 .instrument(tracing::debug_span!("subscribe to feature changes"))
160 .await
161 .inspect_err(|e| {
162 tracing::error!(error = ?e, "Failed to request subscription to feature changes");
163 })
164 .ok()?;
165
166 rx.await
167 .inspect_err(|e| {
168 tracing::error!(error = ?e, "No response when waiting a feature change subscriber");
169 })
170 .ok()
171 }
172
173 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
174 pub async fn set_fact(
175 &self,
176 key: impl Into<String> + std::fmt::Debug,
177 value: serde_json::Value,
178 ) {
179 if let Err(e) = self
180 .outgoing
181 .send(RawSignal::Fact {
182 key: key.into(),
183 value,
184 })
185 .await
186 {
187 tracing::error!(error = ?e, "Failed to enqueue a fact");
188 }
189 }
190
191 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
192 pub async fn record(
193 &self,
194 event: impl Into<String> + std::fmt::Debug,
195 properties: Option<Map>,
196 ) {
197 if let Err(e) = self
198 .outgoing
199 .send(RawSignal::Event {
200 event_name: event.into(),
201 properties,
202 })
203 .instrument(tracing::trace_span!("recording the event"))
204 .await
205 {
206 tracing::error!(error = ?e, "Failed to enqueue an event message");
207 }
208 }
209
210 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
211 pub async fn identify(&self, new: DistinctId) {
212 if let Err(e) = self
213 .outgoing
214 .send(RawSignal::Identify(new))
215 .instrument(tracing::trace_span!("sending the Identify message"))
216 .await
217 {
218 tracing::error!(error = ?e, "Failed to enqueue swap_identity message");
219 }
220
221 self.trigger_configuration_refresh()
222 .instrument(tracing::trace_span!("triggering a configuration refresh"))
223 .await;
224 }
225
226 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
227 pub async fn add_group(
228 &self,
229 group_name: impl Into<String> + std::fmt::Debug,
230 group_member_id: impl Into<String> + std::fmt::Debug,
231 ) {
232 if let Err(e) = self
233 .outgoing
234 .send(RawSignal::AddGroup {
235 group_name: group_name.into(),
236 group_member_id: group_member_id.into(),
237 })
238 .instrument(tracing::trace_span!("sending the AddGroup message"))
239 .await
240 {
241 tracing::error!(error = ?e, "Failed to enqueue AddGroup message");
242 }
243
244 self.trigger_configuration_refresh()
245 .instrument(tracing::trace_span!("triggering a configuration refresh"))
246 .await;
247 }
248
249 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
250 pub async fn alias(&self, alias: impl Into<String> + std::fmt::Debug) {
251 if let Err(e) = self
252 .outgoing
253 .send(RawSignal::Alias(alias.into()))
254 .instrument(tracing::trace_span!("sending the Alias message"))
255 .await
256 {
257 tracing::error!(error = ?e, "Failed to enqueue Alias message");
258 }
259
260 self.trigger_configuration_refresh()
261 .instrument(tracing::trace_span!("triggering a configuration refresh"))
262 .await;
263 }
264
265 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
266 pub async fn reset(&self) {
267 if let Err(e) = self
268 .outgoing
269 .send(RawSignal::Reset)
270 .instrument(tracing::trace_span!("sending the Reset message"))
271 .await
272 {
273 tracing::error!(error = ?e, "Failed to enqueue reset message");
274 }
275
276 self.trigger_configuration_refresh()
277 .instrument(tracing::trace_span!("triggering a configuration refresh"))
278 .await;
279 }
280
281 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self), ret(level = tracing::Level::TRACE)))]
282 async fn get_session_properties(&self) -> Result<Map, FullDuplexError> {
283 let (tx, rx) = tokio::sync::oneshot::channel();
284
285 self.outgoing
286 .send(RawSignal::GetSessionProperties { tx })
287 .instrument(tracing::trace_span!(
288 "sending the GetSessionProperties message"
289 ))
290 .await
291 .map_err(|_| FullDuplexError::SendError)?;
292
293 Ok(rx
294 .instrument(tracing::trace_span!("waiting for reply"))
295 .await?)
296 }
297
298 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
299 pub async fn flush_now(&self) {
300 if let Err(e) = self.outgoing.send(RawSignal::FlushNow).await {
301 tracing::error!(error = ?e, "Failed to enqueue a FlushNow message");
302 }
303 }
304
305 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
306 pub(crate) async fn trigger_configuration_refresh(&self) {
307 let (tx, rx) = oneshot();
308
309 let session_properties = self
310 .get_session_properties()
311 .instrument(tracing::debug_span!("request session properties"))
312 .await
313 .inspect_err(|e| {
314 tracing::debug!(%e, "Failed to get session properties");
315 })
316 .unwrap_or_default();
317
318 if let Err(e) = self
319 .to_configuration_proxy
320 .send(ConfigurationProxySignal::CheckInNow(session_properties, tx))
321 .instrument(tracing::debug_span!("request immediate check-in"))
322 .await
323 {
324 tracing::error!(error = ?e, "Failed to enqueue CheckInNow message");
325 }
326
327 let feats = match rx
328 .instrument(tracing::debug_span!("receive feature facts"))
329 .await
330 {
331 Ok(feats) => feats,
332 Err(e) => {
333 tracing::error!(error = ?e, "Failed to refresh the configuration");
334
335 return;
336 }
337 };
338
339 if let Err(e) = self
340 .outgoing
341 .send(RawSignal::UpdateFeatureFacts(feats))
342 .instrument(tracing::debug_span!("forward feature facts"))
343 .await
344 {
345 tracing::error!(%e, "Failed to forward updated feature facts");
346 }
347 }
348}
349
350#[derive(thiserror::Error, Debug)]
351pub enum FullDuplexError {
352 #[error("Failed to request session properties")]
353 SendError,
354
355 #[error("Error waiting for a reply: {0}")]
356 Recv(#[from] tokio::sync::oneshot::error::RecvError),
357}