Skip to main content

feature_probe_event/
recorder.rs

1use crate::event::{Access, CountValue, Event, PackedData, ToggleCounter, Variation};
2use headers::HeaderValue;
3use parking_lot::{Mutex, RwLock};
4#[cfg(feature = "use_tokio")]
5use reqwest::{header::AUTHORIZATION, Client, Method};
6use std::collections::{HashMap, VecDeque};
7use std::{sync::Arc, time::Duration};
8use tracing::error;
9use url::Url;
10
11#[derive(Debug, Clone)]
12pub struct EventRecorder {
13    inner: Arc<Inner>,
14}
15
16impl EventRecorder {
17    pub fn new(
18        events_url: Url,
19        auth: HeaderValue,
20        user_agent: String,
21        flush_interval: Duration,
22        capacity: usize,
23        should_stop: Arc<RwLock<bool>>,
24    ) -> Self {
25        let slf = Self {
26            inner: Arc::new(Inner {
27                auth,
28                user_agent,
29                events_url,
30                flush_interval,
31                capacity,
32                incoming_events: Default::default(),
33                packed_data: Default::default(),
34                should_stop,
35            }),
36        };
37
38        slf.start();
39        slf
40    }
41
42    pub fn flush(&self) {
43        #[cfg(feature = "use_std")]
44        self.inner.do_flush();
45        #[cfg(fature = "use_tokio")]
46        {
47            let (tx, rx) = std::sync::mpsc::sync_channel(1);
48            tokio::spawn(async move {
49                let client = reqwest::Client::new();
50                self.inner.do_async_flush(&client).await;
51                let _ = tx.send(());
52            });
53            let _ = rx.recv();
54        }
55    }
56
57    fn start(&self) {
58        #[cfg(feature = "use_tokio")]
59        self.tokio_start();
60
61        #[cfg(feature = "use_std")]
62        self.std_start();
63    }
64
65    #[cfg(feature = "use_tokio")]
66    fn tokio_start(&self) {
67        let inner = self.inner.clone();
68        let client = reqwest::Client::new();
69        // TODO: gracefull shutdown
70        tokio::spawn(async move {
71            let mut interval = tokio::time::interval(inner.flush_interval);
72            loop {
73                inner.do_async_flush(&client).await;
74                interval.tick().await;
75                if *inner.should_stop.read() {
76                    break;
77                }
78            }
79        });
80    }
81
82    #[cfg(feature = "use_std")]
83    fn std_start(&self) {
84        let inner = self.inner.clone();
85        std::thread::spawn(move || loop {
86            inner.do_flush();
87            std::thread::sleep(inner.flush_interval);
88            if *inner.should_stop.read() {
89                break;
90            }
91        });
92    }
93
94    // TODO: performance
95    pub fn record_event(&self, event: Event) {
96        let mut guard = self.inner.incoming_events.lock();
97        let mut events = guard.take();
98
99        match events {
100            None => events = Some(vec![event]),
101            Some(ref mut v) => v.push(event),
102        };
103        *guard = events;
104    }
105}
106
107#[derive(Debug)]
108struct Inner {
109    pub auth: HeaderValue,
110    pub user_agent: String,
111    pub events_url: Url,
112    pub flush_interval: Duration,
113    pub capacity: usize,
114    pub incoming_events: Mutex<Option<Vec<Event>>>,
115    pub packed_data: Mutex<Option<VecDeque<PackedData>>>,
116    pub should_stop: Arc<RwLock<bool>>,
117}
118
119impl Inner {
120    #[cfg(feature = "use_tokio")]
121    async fn do_async_flush(&self, client: &Client) {
122        use reqwest::header::USER_AGENT;
123        use tracing::debug;
124
125        let events = match self.take_events() {
126            Some(v) if !v.is_empty() => v,
127            _ => return,
128        };
129
130        let packed_data = self.build_packed_data(events);
131        let request = client
132            .request(Method::POST, self.events_url.clone())
133            .header(AUTHORIZATION, &self.auth)
134            .header(USER_AGENT, &self.user_agent)
135            .timeout(self.flush_interval)
136            .json(&packed_data);
137
138        //TODO: report failure
139        debug!("flush req: {:?}", request);
140        match request.send().await {
141            Err(e) => {
142                error!("event post error: {}", e);
143                self.set_packed_data(packed_data); // put back
144            }
145            Ok(r) => debug!("flush resp: {:?}", r),
146        }
147    }
148
149    #[cfg(feature = "use_std")]
150    fn do_flush(&self) {
151        let events = match self.take_events() {
152            Some(v) if !v.is_empty() => v,
153            _ => return,
154        };
155
156        let packed_data = self.build_packed_data(events);
157        let body = match serde_json::to_string(&packed_data) {
158            Err(e) => {
159                error!("{:?}", e);
160                return;
161            }
162            Ok(s) => s,
163        };
164
165        //TODO: report failure
166        if let Err(e) = ureq::post(self.events_url.as_str())
167            .set(
168                "authorization",
169                self.auth.to_str().expect("already valid header value"),
170            )
171            .set("user-agent", &self.user_agent)
172            .timeout(self.flush_interval)
173            .set("Content-Type", "application/json")
174            .send_string(&body)
175        {
176            error!("event post error: {}", e);
177            self.set_packed_data(packed_data); // put back
178        }
179    }
180
181    fn take_events(&self) -> Option<Vec<Event>> {
182        let mut guard = self.incoming_events.lock();
183        guard.take()
184    }
185
186    fn take_packed_data(&self) -> Option<VecDeque<PackedData>> {
187        let mut guard = self.packed_data.lock();
188        guard.take()
189    }
190
191    fn set_packed_data(&self, packed_data: Option<VecDeque<PackedData>>) {
192        let mut guard = self.packed_data.lock();
193        *guard = packed_data
194    }
195
196    fn build_events(&self, events: &Vec<Event>) -> Vec<Event> {
197        let mut res: Vec<Event> = Vec::new();
198        for e in events {
199            match e {
200                Event::AccessEvent(access_event) => {
201                    if access_event.track_access_events {
202                        res.push(Event::AccessEvent(access_event.clone()));
203                    }
204                }
205                _ => res.push(e.clone()),
206            }
207        }
208        res
209    }
210
211    fn build_access(&self, events: &Vec<Event>) -> Access {
212        let mut start_time = u128::MAX;
213        let mut end_time = 0;
214        let mut counters: HashMap<Variation, CountValue> = HashMap::new();
215
216        for e in events {
217            if let Event::AccessEvent(access_event) = e {
218                if access_event.time < start_time {
219                    start_time = access_event.time;
220                }
221                if access_event.time > end_time {
222                    end_time = access_event.time
223                }
224                let variation = Variation {
225                    key: access_event.key.clone(),
226                    version: access_event.version,
227                    index: access_event.variation_index,
228                };
229
230                let count_value = counters.entry(variation).or_insert(CountValue {
231                    count: 0,
232                    value: access_event.value.clone(),
233                });
234                count_value.count += 1;
235            }
236        }
237
238        let mut access = Access {
239            start_time,
240            end_time,
241            counters: Default::default(),
242        };
243
244        for (k, v) in counters {
245            let counter = ToggleCounter {
246                index: k.index,
247                version: k.version,
248                value: v.value,
249                count: v.count,
250            };
251            let vec = access.counters.entry(k.key).or_insert(Vec::new());
252            vec.push(counter);
253        }
254
255        access
256    }
257
258    fn build_packed_data(&self, events: Vec<Event>) -> Option<VecDeque<PackedData>> {
259        let access = self.build_access(&events);
260        let events = self.build_events(&events);
261        let packed_data = PackedData { events, access };
262        let mut packed_data_vec = self.take_packed_data();
263        match packed_data_vec {
264            None => {
265                packed_data_vec = {
266                    let mut vecdeque = VecDeque::new();
267                    vecdeque.push_back(packed_data);
268                    Some(vecdeque)
269                }
270            }
271            Some(ref mut v) => {
272                if v.len() > self.capacity {
273                    let _ = v.pop_front();
274                }
275                v.push_back(packed_data)
276            }
277        };
278        packed_data_vec
279    }
280}
281
282pub fn unix_timestamp() -> u128 {
283    std::time::SystemTime::now()
284        .duration_since(std::time::UNIX_EPOCH)
285        .expect("Time went backwards!")
286        .as_millis()
287}