feature_probe_event/
recorder.rs1use crate::event::{Access, CountValue, Event, PackedData, ToggleCounter, Variation};
2use headers::HeaderValue;
3use parking_lot::{Mutex, RwLock};
4#[cfg(feature = "use_tokio")]
5use reqwest::{header::AUTHORIZATION, Client, Method};
6use std::collections::{HashMap, VecDeque};
7use std::{sync::Arc, time::Duration};
8use tracing::error;
9use url::Url;
10
11#[derive(Debug, Clone)]
12pub struct EventRecorder {
13 inner: Arc<Inner>,
14}
15
16impl EventRecorder {
17 pub fn new(
18 events_url: Url,
19 auth: HeaderValue,
20 user_agent: String,
21 flush_interval: Duration,
22 capacity: usize,
23 should_stop: Arc<RwLock<bool>>,
24 ) -> Self {
25 let slf = Self {
26 inner: Arc::new(Inner {
27 auth,
28 user_agent,
29 events_url,
30 flush_interval,
31 capacity,
32 incoming_events: Default::default(),
33 packed_data: Default::default(),
34 should_stop,
35 }),
36 };
37
38 slf.start();
39 slf
40 }
41
42 pub fn flush(&self) {
43 #[cfg(feature = "use_std")]
44 self.inner.do_flush();
45 #[cfg(fature = "use_tokio")]
46 {
47 let (tx, rx) = std::sync::mpsc::sync_channel(1);
48 tokio::spawn(async move {
49 let client = reqwest::Client::new();
50 self.inner.do_async_flush(&client).await;
51 let _ = tx.send(());
52 });
53 let _ = rx.recv();
54 }
55 }
56
57 fn start(&self) {
58 #[cfg(feature = "use_tokio")]
59 self.tokio_start();
60
61 #[cfg(feature = "use_std")]
62 self.std_start();
63 }
64
65 #[cfg(feature = "use_tokio")]
66 fn tokio_start(&self) {
67 let inner = self.inner.clone();
68 let client = reqwest::Client::new();
69 tokio::spawn(async move {
71 let mut interval = tokio::time::interval(inner.flush_interval);
72 loop {
73 inner.do_async_flush(&client).await;
74 interval.tick().await;
75 if *inner.should_stop.read() {
76 break;
77 }
78 }
79 });
80 }
81
82 #[cfg(feature = "use_std")]
83 fn std_start(&self) {
84 let inner = self.inner.clone();
85 std::thread::spawn(move || loop {
86 inner.do_flush();
87 std::thread::sleep(inner.flush_interval);
88 if *inner.should_stop.read() {
89 break;
90 }
91 });
92 }
93
94 pub fn record_event(&self, event: Event) {
96 let mut guard = self.inner.incoming_events.lock();
97 let mut events = guard.take();
98
99 match events {
100 None => events = Some(vec![event]),
101 Some(ref mut v) => v.push(event),
102 };
103 *guard = events;
104 }
105}
106
107#[derive(Debug)]
108struct Inner {
109 pub auth: HeaderValue,
110 pub user_agent: String,
111 pub events_url: Url,
112 pub flush_interval: Duration,
113 pub capacity: usize,
114 pub incoming_events: Mutex<Option<Vec<Event>>>,
115 pub packed_data: Mutex<Option<VecDeque<PackedData>>>,
116 pub should_stop: Arc<RwLock<bool>>,
117}
118
119impl Inner {
120 #[cfg(feature = "use_tokio")]
121 async fn do_async_flush(&self, client: &Client) {
122 use reqwest::header::USER_AGENT;
123 use tracing::debug;
124
125 let events = match self.take_events() {
126 Some(v) if !v.is_empty() => v,
127 _ => return,
128 };
129
130 let packed_data = self.build_packed_data(events);
131 let request = client
132 .request(Method::POST, self.events_url.clone())
133 .header(AUTHORIZATION, &self.auth)
134 .header(USER_AGENT, &self.user_agent)
135 .timeout(self.flush_interval)
136 .json(&packed_data);
137
138 debug!("flush req: {:?}", request);
140 match request.send().await {
141 Err(e) => {
142 error!("event post error: {}", e);
143 self.set_packed_data(packed_data); }
145 Ok(r) => debug!("flush resp: {:?}", r),
146 }
147 }
148
149 #[cfg(feature = "use_std")]
150 fn do_flush(&self) {
151 let events = match self.take_events() {
152 Some(v) if !v.is_empty() => v,
153 _ => return,
154 };
155
156 let packed_data = self.build_packed_data(events);
157 let body = match serde_json::to_string(&packed_data) {
158 Err(e) => {
159 error!("{:?}", e);
160 return;
161 }
162 Ok(s) => s,
163 };
164
165 if let Err(e) = ureq::post(self.events_url.as_str())
167 .set(
168 "authorization",
169 self.auth.to_str().expect("already valid header value"),
170 )
171 .set("user-agent", &self.user_agent)
172 .timeout(self.flush_interval)
173 .set("Content-Type", "application/json")
174 .send_string(&body)
175 {
176 error!("event post error: {}", e);
177 self.set_packed_data(packed_data); }
179 }
180
181 fn take_events(&self) -> Option<Vec<Event>> {
182 let mut guard = self.incoming_events.lock();
183 guard.take()
184 }
185
186 fn take_packed_data(&self) -> Option<VecDeque<PackedData>> {
187 let mut guard = self.packed_data.lock();
188 guard.take()
189 }
190
191 fn set_packed_data(&self, packed_data: Option<VecDeque<PackedData>>) {
192 let mut guard = self.packed_data.lock();
193 *guard = packed_data
194 }
195
196 fn build_events(&self, events: &Vec<Event>) -> Vec<Event> {
197 let mut res: Vec<Event> = Vec::new();
198 for e in events {
199 match e {
200 Event::AccessEvent(access_event) => {
201 if access_event.track_access_events {
202 res.push(Event::AccessEvent(access_event.clone()));
203 }
204 }
205 _ => res.push(e.clone()),
206 }
207 }
208 res
209 }
210
211 fn build_access(&self, events: &Vec<Event>) -> Access {
212 let mut start_time = u128::MAX;
213 let mut end_time = 0;
214 let mut counters: HashMap<Variation, CountValue> = HashMap::new();
215
216 for e in events {
217 if let Event::AccessEvent(access_event) = e {
218 if access_event.time < start_time {
219 start_time = access_event.time;
220 }
221 if access_event.time > end_time {
222 end_time = access_event.time
223 }
224 let variation = Variation {
225 key: access_event.key.clone(),
226 version: access_event.version,
227 index: access_event.variation_index,
228 };
229
230 let count_value = counters.entry(variation).or_insert(CountValue {
231 count: 0,
232 value: access_event.value.clone(),
233 });
234 count_value.count += 1;
235 }
236 }
237
238 let mut access = Access {
239 start_time,
240 end_time,
241 counters: Default::default(),
242 };
243
244 for (k, v) in counters {
245 let counter = ToggleCounter {
246 index: k.index,
247 version: k.version,
248 value: v.value,
249 count: v.count,
250 };
251 let vec = access.counters.entry(k.key).or_insert(Vec::new());
252 vec.push(counter);
253 }
254
255 access
256 }
257
258 fn build_packed_data(&self, events: Vec<Event>) -> Option<VecDeque<PackedData>> {
259 let access = self.build_access(&events);
260 let events = self.build_events(&events);
261 let packed_data = PackedData { events, access };
262 let mut packed_data_vec = self.take_packed_data();
263 match packed_data_vec {
264 None => {
265 packed_data_vec = {
266 let mut vecdeque = VecDeque::new();
267 vecdeque.push_back(packed_data);
268 Some(vecdeque)
269 }
270 }
271 Some(ref mut v) => {
272 if v.len() > self.capacity {
273 let _ = v.pop_front();
274 }
275 v.push_back(packed_data)
276 }
277 };
278 packed_data_vec
279 }
280}
281
282pub fn unix_timestamp() -> u128 {
283 std::time::SystemTime::now()
284 .duration_since(std::time::UNIX_EPOCH)
285 .expect("Time went backwards!")
286 .as_millis()
287}