1use tokio::sync::mpsc::Sender;
2use tokio::sync::oneshot::channel as oneshot;
3use tracing::Instrument;
4
5use crate::Map;
6use crate::checkin::Feature;
7use crate::collator::FeatureFacts;
8use crate::configuration_proxy::ConfigurationProxySignal;
9use crate::identity::DistinctId;
10
11#[derive(Debug)]
12pub(crate) enum RawSignal {
13 Fact {
14 key: String,
15 value: serde_json::Value,
16 },
17 UpdateFeatureFacts(FeatureFacts),
18 Event {
19 event_name: String,
20 properties: Option<Map>,
21 },
22 GetSessionProperties {
23 tx: tokio::sync::oneshot::Sender<Map>,
24 },
25 FlushNow,
26 Identify(DistinctId),
27 Alias(String),
28 Reset,
29}
30
31#[derive(Clone)]
32pub struct Recorder {
33 outgoing: Sender<RawSignal>,
34 to_configuration_proxy: Sender<ConfigurationProxySignal>,
35}
36
37impl std::fmt::Debug for Recorder {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 f.debug_struct("Recorder").finish()
40 }
41}
42
43impl Recorder {
44 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip_all))]
45 pub(crate) fn new(
46 snapshotter_tx: Sender<RawSignal>,
47 to_configuration_proxy: Sender<ConfigurationProxySignal>,
48 ) -> Self {
49 Self {
50 outgoing: snapshotter_tx,
51 to_configuration_proxy,
52 }
53 }
54
55 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
56 pub async fn get_feature_variant<T: serde::de::DeserializeOwned + std::fmt::Debug + Send>(
57 &self,
58 key: impl Into<String> + std::fmt::Debug,
59 ) -> Option<T> {
60 serde_json::from_value(self.get_feature::<T>(key).await?.variant)
61 .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
62 .ok()
63 }
64
65 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
66 pub async fn get_feature_ptr_variant<
67 T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
68 >(
69 &self,
70 key: impl Into<String> + std::fmt::Debug,
71 ) -> Option<T> {
72 serde_json::from_value(self.get_feature_ptr::<T>(key).await?.variant)
73 .inspect_err(|e| tracing::debug!(%e, "Deserializing feature variant failed"))
74 .ok()
75 }
76
77 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
78 pub async fn get_feature_payload<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
79 &self,
80 key: impl Into<String> + std::fmt::Debug,
81 ) -> Option<T> {
82 self.get_feature::<T>(key).await?.payload
83 }
84
85 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
86 pub async fn get_feature_ptr_payload<
87 T: serde::de::DeserializeOwned + Send + std::fmt::Debug,
88 >(
89 &self,
90 key: impl Into<String> + std::fmt::Debug,
91 ) -> Option<T> {
92 self.get_feature_ptr::<T>(key).await?.payload
93 }
94
95 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
96 pub async fn get_feature_ptr<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
97 &self,
98 key: impl Into<String> + std::fmt::Debug,
99 ) -> Option<Feature<T>> {
100 let ptr = self.get_feature_payload::<String>(key).await?;
101 self.get_feature::<T>(ptr).await
102 }
103
104 #[tracing::instrument(skip(self), ret(level = tracing::Level::TRACE))]
105 pub async fn get_feature<T: serde::de::DeserializeOwned + Send + std::fmt::Debug>(
106 &self,
107 key: impl Into<String> + std::fmt::Debug,
108 ) -> Option<Feature<T>> {
109 let key: String = key.into();
110 let (tx, rx) = oneshot();
111
112 self.to_configuration_proxy
113 .send(ConfigurationProxySignal::GetFeature(key.clone(), tx))
114 .instrument(tracing::trace_span!(
115 "requesting feature from the configuration proxy"
116 ))
117 .await
118 .inspect_err(|e| tracing::trace!(%e, "Error sending the feature flag request"))
119 .ok()?;
120
121 let feature = rx
122 .instrument(tracing::trace_span!("waiting for the feature"))
123 .await
124 .inspect_err(|e| tracing::trace!(%e, "Error requesting the feature flag"))
125 .ok()
126 .flatten()?;
127
128 self.record(
129 "$feature_flag_called",
130 Some(Map::from_iter([
131 ("$feature_flag".into(), key.into()),
132 ("$feature_flag_response".into(), feature.variant.clone()),
133 ])),
134 )
135 .await;
136
137 let variant = feature.variant.clone();
138 let payload = if let Some(ref p) = feature.payload {
139 let ret = serde_json::from_value(p.clone()).ok()?;
140 Some(ret)
141 } else {
142 None
143 };
144
145 Some(Feature { variant, payload })
146 }
147
148 pub async fn subscribe_to_feature_changes(
149 &self,
150 ) -> Option<tokio::sync::broadcast::Receiver<()>> {
151 let (tx, rx) = oneshot();
152
153 self.to_configuration_proxy
154 .send(ConfigurationProxySignal::Subscribe(tx))
155 .instrument(tracing::debug_span!("subscribe to feature changes"))
156 .await
157 .inspect_err(|e| {
158 tracing::error!(error = ?e, "Failed to request subscription to feature changes");
159 })
160 .ok()?;
161
162 rx.await
163 .inspect_err(|e| {
164 tracing::error!(error = ?e, "No response when waiting a feature change subscriber");
165 })
166 .ok()
167 }
168
169 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
170 pub async fn set_fact(
171 &self,
172 key: impl Into<String> + std::fmt::Debug,
173 value: serde_json::Value,
174 ) {
175 if let Err(e) = self
176 .outgoing
177 .send(RawSignal::Fact {
178 key: key.into(),
179 value,
180 })
181 .await
182 {
183 tracing::error!(error = ?e, "Failed to enqueue a fact");
184 }
185 }
186
187 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
188 pub async fn record(
189 &self,
190 event: impl Into<String> + std::fmt::Debug,
191 properties: Option<Map>,
192 ) {
193 if let Err(e) = self
194 .outgoing
195 .send(RawSignal::Event {
196 event_name: event.into(),
197 properties,
198 })
199 .instrument(tracing::trace_span!("recording the event"))
200 .await
201 {
202 tracing::error!(error = ?e, "Failed to enqueue an event message");
203 }
204 }
205
206 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
207 pub async fn identify(&self, new: DistinctId) {
208 if let Err(e) = self
209 .outgoing
210 .send(RawSignal::Identify(new))
211 .instrument(tracing::trace_span!("sending the Identify message"))
212 .await
213 {
214 tracing::error!(error = ?e, "Failed to enqueue swap_identity message");
215 }
216
217 self.trigger_configuration_refresh()
218 .instrument(tracing::trace_span!("triggering a configuration refresh"))
219 .await;
220 }
221
222 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
223 pub async fn alias(&self, alias: impl Into<String> + std::fmt::Debug) {
224 if let Err(e) = self
225 .outgoing
226 .send(RawSignal::Alias(alias.into()))
227 .instrument(tracing::trace_span!("sending the Alias message"))
228 .await
229 {
230 tracing::error!(error = ?e, "Failed to enqueue Alias message");
231 }
232
233 self.trigger_configuration_refresh()
234 .instrument(tracing::trace_span!("triggering a configuration refresh"))
235 .await;
236 }
237
238 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
239 pub async fn reset(&self) {
240 if let Err(e) = self
241 .outgoing
242 .send(RawSignal::Reset)
243 .instrument(tracing::trace_span!("sending the Reset message"))
244 .await
245 {
246 tracing::error!(error = ?e, "Failed to enqueue reset message");
247 }
248
249 self.trigger_configuration_refresh()
250 .instrument(tracing::trace_span!("triggering a configuration refresh"))
251 .await;
252 }
253
254 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self), ret(level = tracing::Level::TRACE)))]
255 async fn get_session_properties(&self) -> Result<Map, FullDuplexError> {
256 let (tx, rx) = tokio::sync::oneshot::channel();
257
258 self.outgoing
259 .send(RawSignal::GetSessionProperties { tx })
260 .instrument(tracing::trace_span!(
261 "sending the GetSessionProperties message"
262 ))
263 .await
264 .map_err(|_| FullDuplexError::SendError)?;
265
266 Ok(rx
267 .instrument(tracing::trace_span!("waiting for reply"))
268 .await?)
269 }
270
271 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
272 pub async fn flush_now(&self) {
273 if let Err(e) = self.outgoing.send(RawSignal::FlushNow).await {
274 tracing::error!(error = ?e, "Failed to enqueue a FlushNow message");
275 }
276 }
277
278 #[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip(self)))]
279 pub(crate) async fn trigger_configuration_refresh(&self) {
280 let (tx, rx) = oneshot();
281
282 let session_properties = self
283 .get_session_properties()
284 .instrument(tracing::debug_span!("request session properties"))
285 .await
286 .inspect_err(|e| {
287 tracing::debug!(%e, "Failed to get session properties");
288 })
289 .unwrap_or_default();
290
291 if let Err(e) = self
292 .to_configuration_proxy
293 .send(ConfigurationProxySignal::CheckInNow(session_properties, tx))
294 .instrument(tracing::debug_span!("request immediate check-in"))
295 .await
296 {
297 tracing::error!(error = ?e, "Failed to enqueue CheckInNow message");
298 }
299
300 let feats = match rx
301 .instrument(tracing::debug_span!("receive feature facts"))
302 .await
303 {
304 Ok(feats) => feats,
305 Err(e) => {
306 tracing::error!(error = ?e, "Failed to refresh the configuration");
307
308 return;
309 }
310 };
311
312 if let Err(e) = self
313 .outgoing
314 .send(RawSignal::UpdateFeatureFacts(feats))
315 .instrument(tracing::debug_span!("forward feature facts"))
316 .await
317 {
318 tracing::error!(%e, "Failed to forward updated feature facts");
319 }
320 }
321}
322
323#[derive(thiserror::Error, Debug)]
324pub enum FullDuplexError {
325 #[error("Failed to request session properties")]
326 SendError,
327
328 #[error("Error waiting for a reply: {0}")]
329 Recv(#[from] tokio::sync::oneshot::error::RecvError),
330}