// glean_core/event_database/mod.rs

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5use std::cmp::Ordering;
6use std::collections::HashMap;
7use std::fs;
8use std::fs::{create_dir_all, File, OpenOptions};
9use std::io::BufRead;
10use std::io::BufReader;
11use std::io::Write;
12use std::path::{Path, PathBuf};
13use std::sync::{Mutex, RwLock};
14
15use chrono::{DateTime, FixedOffset, Utc};
16
17use serde::{Deserialize, Serialize};
18use serde_json::{json, Value as JsonValue};
19
20use crate::common_metric_data::CommonMetricDataInternal;
21use crate::coverage::record_coverage;
22use crate::error_recording::{record_error, ErrorType};
23use crate::metrics::{DatetimeMetric, TimeUnit};
24use crate::storage::INTERNAL_STORAGE;
25use crate::util::get_iso_time_string;
26use crate::Glean;
27use crate::Result;
28use crate::{CommonMetricData, CounterMetric, Lifetime};
29
30/// Represents the recorded data for a single event.
31#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
32#[cfg_attr(test, derive(Default))]
33pub struct RecordedEvent {
34    /// The timestamp of when the event was recorded.
35    ///
36    /// This allows to order events from a single process run.
37    pub timestamp: u64,
38
39    /// The event's category.
40    ///
41    /// This is defined by users in the metrics file.
42    pub category: String,
43
44    /// The event's name.
45    ///
46    /// This is defined by users in the metrics file.
47    pub name: String,
48
49    /// A map of all extra data values.
50    ///
51    /// The set of allowed extra keys is defined by users in the metrics file.
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub extra: Option<HashMap<String, String>>,
54}
55
56/// Represents the stored data for a single event.
57#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
58struct StoredEvent {
59    #[serde(flatten)]
60    event: RecordedEvent,
61
62    /// The monotonically-increasing execution counter.
63    ///
64    /// Included to allow sending of events across Glean restarts (bug 1716725).
65    /// Is i32 because it is stored in a CounterMetric.
66    #[serde(default)]
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub execution_counter: Option<i32>,
69}
70
71/// This struct handles the in-memory and on-disk storage logic for events.
72///
73/// So that the data survives shutting down of the application, events are stored
74/// in an append-only file on disk, in addition to the store in memory. Each line
75/// of this file records a single event in JSON, exactly as it will be sent in the
76/// ping. There is one file per store.
77///
78/// When restarting the application, these on-disk files are checked, and if any are
79/// found, they are loaded, and a `glean.restarted` event is added before any
80/// further events are collected. This is because the timestamps for these events
81/// may have come from a previous boot of the device, and therefore will not be
82/// compatible with any newly-collected events.
83///
84/// Normalizing all these timestamps happens on serialization for submission (see
85/// `serialize_as_json`) where the client time between restarts is calculated using
86/// data stored in the `glean.startup.date` extra of the `glean.restarted` event, plus
87/// the `execution_counter` stored in events on disk.
88///
89/// Neither `execution_counter` nor `glean.startup.date` is submitted in pings.
90/// The `glean.restarted` event is, though.
91/// (See [bug 1716725](https://bugzilla.mozilla.org/show_bug.cgi?id=1716725).)
92#[derive(Debug)]
93pub struct EventDatabase {
94    /// Path to directory of on-disk event files
95    pub path: PathBuf,
96    /// The in-memory list of events
97    event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
98    /// A lock to be held when doing operations on the filesystem
99    file_lock: Mutex<()>,
100}
101
102impl EventDatabase {
103    /// Creates a new event database.
104    ///
105    /// # Arguments
106    ///
107    /// * `data_path` - The directory to store events in. A new directory
108    /// * `events` - will be created inside of this directory.
109    pub fn new(data_path: &Path) -> Result<Self> {
110        let path = data_path.join("events");
111        create_dir_all(&path)?;
112
113        Ok(Self {
114            path,
115            event_stores: RwLock::new(HashMap::new()),
116            file_lock: Mutex::new(()),
117        })
118    }
119
120    /// Initializes events storage after Glean is fully initialized and ready to send pings.
121    ///
122    /// This must be called once on application startup, e.g. from
123    /// [Glean.initialize], but after we are ready to send pings, since this
124    /// could potentially collect and send the "events" ping.
125    ///
126    /// If there are any events queued on disk, it loads them into memory so
127    /// that the memory and disk representations are in sync.
128    ///
129    /// If event records for the "events" ping are present, they are assembled into
130    /// an "events" ping which is submitted immediately with reason "startup".
131    ///
132    /// If event records for custom pings are present, we increment the custom pings'
133    /// stores' `execution_counter` and record a `glean.restarted`
134    /// event with the current client clock in its `glean.startup.date` extra.
135    ///
136    /// # Arguments
137    ///
138    /// * `glean` - The Glean instance.
139    /// * `trim_data_to_registered_pings` - Whether we should trim the event storage of
140    ///   any events not belonging to pings previously registered via `register_ping_type`.
141    ///
142    /// # Returns
143    ///
144    /// Whether the "events" ping was submitted.
145    pub fn flush_pending_events_on_startup(
146        &self,
147        glean: &Glean,
148        trim_data_to_registered_pings: bool,
149    ) -> bool {
150        match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
151            Ok(_) => {
152                let stores_with_events: Vec<String> = {
153                    self.event_stores
154                        .read()
155                        .unwrap()
156                        .keys()
157                        .map(|x| x.to_owned())
158                        .collect() // safe unwrap, only error case is poisoning
159                };
160                // We do not want to be holding the event stores lock when
161                // submitting a ping or recording new events.
162                let has_events_events = stores_with_events.contains(&"events".to_owned());
163                let glean_restarted_stores = if has_events_events {
164                    stores_with_events
165                        .into_iter()
166                        .filter(|store| store != "events")
167                        .collect()
168                } else {
169                    stores_with_events
170                };
171                if !glean_restarted_stores.is_empty() {
172                    for store_name in glean_restarted_stores.iter() {
173                        CounterMetric::new(CommonMetricData {
174                            name: "execution_counter".into(),
175                            category: store_name.into(),
176                            send_in_pings: vec![INTERNAL_STORAGE.into()],
177                            lifetime: Lifetime::Ping,
178                            ..Default::default()
179                        })
180                        .add_sync(glean, 1);
181                    }
182                    let glean_restarted = CommonMetricData {
183                        name: "restarted".into(),
184                        category: "glean".into(),
185                        send_in_pings: glean_restarted_stores,
186                        lifetime: Lifetime::Ping,
187                        ..Default::default()
188                    };
189                    let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
190                    let mut extra: HashMap<String, String> =
191                        [("glean.startup.date".into(), startup)].into();
192                    if glean.with_timestamps() {
193                        let now = Utc::now();
194                        let precise_timestamp = now.timestamp_millis() as u64;
195                        extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
196                    }
197                    self.record(
198                        glean,
199                        &glean_restarted.into(),
200                        crate::get_timestamp_ms(),
201                        Some(extra),
202                    );
203                }
204                has_events_events && glean.submit_ping_by_name("events", Some("startup"))
205            }
206            Err(err) => {
207                log::warn!("Error loading events from disk: {}", err);
208                false
209            }
210        }
211    }
212
213    fn load_events_from_disk(
214        &self,
215        glean: &Glean,
216        trim_data_to_registered_pings: bool,
217    ) -> Result<()> {
218        // NOTE: The order of locks here is important.
219        // In other code parts we might acquire the `file_lock` when we already have acquired
220        // a lock on `event_stores`.
221        // This is a potential lock-order-inversion.
222        let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
223        let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
224
225        for entry in fs::read_dir(&self.path)? {
226            let entry = entry?;
227            if entry.file_type()?.is_file() {
228                let store_name = entry.file_name().into_string()?;
229                log::info!("Loading events for {}", store_name);
230                if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
231                    log::warn!("Trimming {}'s events", store_name);
232                    if let Err(err) = fs::remove_file(entry.path()) {
233                        match err.kind() {
234                            std::io::ErrorKind::NotFound => {
235                                // silently drop this error, the file was already non-existing
236                            }
237                            _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
238                        }
239                    }
240                    continue;
241                }
242                let file = BufReader::new(File::open(entry.path())?);
243                db.insert(
244                    store_name,
245                    file.lines()
246                        .map_while(Result::ok)
247                        .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
248                        .collect(),
249                );
250            }
251        }
252        Ok(())
253    }
254
255    /// Records an event in the desired stores.
256    ///
257    /// # Arguments
258    ///
259    /// * `glean` - The Glean instance.
260    /// * `meta` - The metadata about the event metric. Used to get the category,
261    ///   name and stores for the metric.
262    /// * `timestamp` - The timestamp of the event, in milliseconds. Must use a
263    ///   monotonically increasing timer (this value is obtained on the
264    ///   platform-specific side).
265    /// * `extra` - Extra data values, mapping strings to strings.
266    ///
267    /// ## Returns
268    ///
269    /// `true` if a ping was submitted and should be uploaded.
270    /// `false` otherwise.
271    pub fn record(
272        &self,
273        glean: &Glean,
274        meta: &CommonMetricDataInternal,
275        timestamp: u64,
276        extra: Option<HashMap<String, String>>,
277    ) -> bool {
278        // If upload is disabled we don't want to record.
279        if !glean.is_upload_enabled() {
280            return false;
281        }
282
283        let mut submit_max_capacity_event_ping = false;
284        {
285            let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
286            for store_name in meta.inner.send_in_pings.iter() {
287                if !glean.is_ping_enabled(store_name) {
288                    continue;
289                }
290
291                let store = db.entry(store_name.to_string()).or_default();
292                let execution_counter = CounterMetric::new(CommonMetricData {
293                    name: "execution_counter".into(),
294                    category: store_name.into(),
295                    send_in_pings: vec![INTERNAL_STORAGE.into()],
296                    lifetime: Lifetime::Ping,
297                    ..Default::default()
298                })
299                .get_value(glean, INTERNAL_STORAGE);
300                // Create StoredEvent object, and its JSON form for serialization on disk.
301                let event = StoredEvent {
302                    event: RecordedEvent {
303                        timestamp,
304                        category: meta.inner.category.to_string(),
305                        name: meta.inner.name.to_string(),
306                        extra: extra.clone(),
307                    },
308                    execution_counter,
309                };
310                let event_json = serde_json::to_string(&event).unwrap(); // safe unwrap, event can always be serialized
311                store.push(event);
312                self.write_event_to_disk(store_name, &event_json);
313                if store_name == "events" && store.len() == glean.get_max_events() {
314                    submit_max_capacity_event_ping = true;
315                }
316            }
317        }
318        if submit_max_capacity_event_ping {
319            glean.submit_ping_by_name("events", Some("max_capacity"));
320            true
321        } else {
322            false
323        }
324    }
325
326    /// Writes an event to a single store on disk.
327    ///
328    /// # Arguments
329    ///
330    /// * `store_name` - The name of the store.
331    /// * `event_json` - The event content, as a single-line JSON-encoded string.
332    fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
333        let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
334        if let Err(err) = OpenOptions::new()
335            .create(true)
336            .append(true)
337            .open(self.path.join(store_name))
338            .and_then(|mut file| writeln!(file, "{}", event_json))
339        {
340            log::warn!("IO error writing event to store '{}': {}", store_name, err);
341        }
342    }
343
344    /// Normalizes the store in-place.
345    ///
346    /// A store may be in any order and contain any number of `glean.restarted` events,
347    /// whose values must be taken into account, along with `execution_counter` values,
348    /// to come up with the correct events with correct `timestamp` values,
349    /// on which we then sort.
350    ///
351    /// 1. Sort by `execution_counter` and `timestamp`,
352    ///    breaking ties so that `glean.restarted` comes first.
353    /// 2. Remove all initial and final `glean.restarted` events
354    /// 3. For each group of events that share a `execution_counter`,
355    ///    i. calculate the initial `glean.restarted` event's `timestamp`s to be
356    ///       clamp(glean.startup.date - ping_info.start_time, biggest_timestamp_of_previous_group + 1)
357    ///    ii. normalize each non-`glean-restarted` event's `timestamp`
358    ///        relative to the `glean.restarted` event's uncalculated `timestamp`
359    /// 4. Remove `execution_counter` and `glean.startup.date` extra keys
360    /// 5. Sort by `timestamp`
361    ///
362    /// In the event that something goes awry, this will record an invalid_state on
363    /// glean.restarted if it is due to internal inconsistencies, or invalid_value
364    /// on client clock weirdness.
365    ///
366    /// # Arguments
367    ///
368    /// * `glean` - Used to report errors
369    /// * `store_name` - The name of the store we're normalizing.
370    /// * `store` - The store we're to normalize.
371    /// * `glean_start_time` - Used if the glean.startup.date or ping_info.start_time aren't available. Passed as a parameter to ease unit-testing.
372    fn normalize_store(
373        &self,
374        glean: &Glean,
375        store_name: &str,
376        store: &mut Vec<StoredEvent>,
377        glean_start_time: DateTime<FixedOffset>,
378    ) {
379        let is_glean_restarted =
380            |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
381        let glean_restarted_meta = |store_name: &str| CommonMetricData {
382            name: "restarted".into(),
383            category: "glean".into(),
384            send_in_pings: vec![store_name.into()],
385            lifetime: Lifetime::Ping,
386            ..Default::default()
387        };
388        // Step 1
389        store.sort_by(|a, b| {
390            a.execution_counter
391                .cmp(&b.execution_counter)
392                .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
393                .then_with(|| {
394                    if is_glean_restarted(&a.event) {
395                        Ordering::Less
396                    } else {
397                        Ordering::Greater
398                    }
399                })
400        });
401        // Step 2
402        // Find the index of the first and final non-`glean.restarted` events.
403        // Remove events before the first and after the final.
404        let final_event = match store
405            .iter()
406            .rposition(|event| !is_glean_restarted(&event.event))
407        {
408            Some(idx) => idx + 1,
409            _ => 0,
410        };
411        store.drain(final_event..);
412        let first_event = store
413            .iter()
414            .position(|event| !is_glean_restarted(&event.event))
415            .unwrap_or(store.len());
416        store.drain(..first_event);
417        if store.is_empty() {
418            // There was nothing but `glean.restarted` events. Job's done!
419            return;
420        }
421        // Step 3
422        // It is allowed that there might not be any `glean.restarted` event, nor
423        // `execution_counter` extra values. (This should always be the case for the
424        // "events" ping, for instance).
425        // Other inconsistencies are evidence of errors, and so are logged.
426        let mut cur_ec = 0;
427        // The offset within a group of events with the same `execution_counter`.
428        let mut intra_group_offset = store[0].event.timestamp;
429        // The offset between this group and ping_info.start_date.
430        let mut inter_group_offset = 0;
431        let mut highest_ts = 0;
432        for event in store.iter_mut() {
433            let execution_counter = event.execution_counter.take().unwrap_or(0);
434            if is_glean_restarted(&event.event) {
435                // We've entered the next "event group".
436                // We need a new epoch based on glean.startup.date - ping_info.start_date
437                cur_ec = execution_counter;
438                let glean_startup_date = event
439                    .event
440                    .extra
441                    .as_mut()
442                    .and_then(|extra| {
443                        extra.remove("glean.startup.date").and_then(|date_str| {
444                            DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
445                                .map_err(|_| {
446                                    record_error(
447                                        glean,
448                                        &glean_restarted_meta(store_name).into(),
449                                        ErrorType::InvalidState,
450                                        format!("Unparseable glean.startup.date '{}'", date_str),
451                                        None,
452                                    );
453                                })
454                                .ok()
455                        })
456                    })
457                    .unwrap_or(glean_start_time);
458                if event
459                    .event
460                    .extra
461                    .as_ref()
462                    .is_some_and(|extra| extra.is_empty())
463                {
464                    // Small optimization to save us sending empty dicts.
465                    event.event.extra = None;
466                }
467                let ping_start = DatetimeMetric::new(
468                    CommonMetricData {
469                        name: format!("{}#start", store_name),
470                        category: "".into(),
471                        send_in_pings: vec![INTERNAL_STORAGE.into()],
472                        lifetime: Lifetime::User,
473                        ..Default::default()
474                    },
475                    TimeUnit::Minute,
476                );
477                let ping_start = ping_start
478                    .get_value(glean, INTERNAL_STORAGE)
479                    .unwrap_or(glean_start_time);
480                let time_from_ping_start_to_glean_restarted =
481                    (glean_startup_date - ping_start).num_milliseconds();
482                intra_group_offset = event.event.timestamp;
483                inter_group_offset =
484                    u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
485                if inter_group_offset < highest_ts {
486                    record_error(
487                        glean,
488                        &glean_restarted_meta(store_name).into(),
489                        ErrorType::InvalidValue,
490                        format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
491                        None,
492                    );
493                    // The client's clock went backwards enough that this event group's
494                    // glean.restarted looks like it happened _before_ the final event of the previous group.
495                    // Or, it went ahead enough to overflow u64.
496                    // Adjust things so this group starts 1ms after the previous one.
497                    inter_group_offset = highest_ts + 1;
498                }
499            } else if cur_ec == 0 {
500                // bug 1811872 - cur_ec might need initialization.
501                cur_ec = execution_counter;
502            }
503            event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
504            if execution_counter != cur_ec {
505                record_error(
506                    glean,
507                    &glean_restarted_meta(store_name).into(),
508                    ErrorType::InvalidState,
509                    format!(
510                        "Inconsistent execution counter {} (expected {})",
511                        execution_counter, cur_ec
512                    ),
513                    None,
514                );
515                // Let's fix cur_ec up and hope this isn't a sign something big is broken.
516                cur_ec = execution_counter;
517            }
518            if highest_ts > event.event.timestamp {
519                // Even though we sorted everything, something in the
520                // execution_counter or glean.startup.date math went awry.
521                record_error(
522                    glean,
523                    &glean_restarted_meta(store_name).into(),
524                    ErrorType::InvalidState,
525                    format!(
526                        "Inconsistent previous highest timestamp {} (expected <= {})",
527                        highest_ts, event.event.timestamp
528                    ),
529                    None,
530                );
531                // Let the highest_ts regress to event.timestamp to hope this minimizes weirdness.
532            }
533            highest_ts = event.event.timestamp
534        }
535    }
536
537    /// Gets a snapshot of the stored event data as a JsonValue.
538    ///
539    /// # Arguments
540    ///
541    /// * `glean` - the Glean instance.
542    /// * `store_name` - The name of the desired store.
543    /// * `clear_store` - Whether to clear the store after snapshotting.
544    ///
545    /// # Returns
546    ///
547    /// A array of events, JSON encoded, if any. Otherwise `None`.
548    pub fn snapshot_as_json(
549        &self,
550        glean: &Glean,
551        store_name: &str,
552        clear_store: bool,
553    ) -> Option<JsonValue> {
554        let result = {
555            let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
556            db.get_mut(&store_name.to_string()).and_then(|store| {
557                if !store.is_empty() {
558                    // Normalization happens in-place, so if we're not clearing,
559                    // operate on a clone.
560                    let mut clone;
561                    let store = if clear_store {
562                        store
563                    } else {
564                        clone = store.clone();
565                        &mut clone
566                    };
567                    // We may need to normalize event timestamps across multiple restarts.
568                    self.normalize_store(glean, store_name, store, glean.start_time());
569                    Some(json!(store))
570                } else {
571                    log::warn!("Unexpectly got empty event store for '{}'", store_name);
572                    None
573                }
574            })
575        };
576
577        if clear_store {
578            self.event_stores
579                .write()
580                .unwrap() // safe unwrap, only error case is poisoning
581                .remove(&store_name.to_string());
582
583            let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
584            if let Err(err) = fs::remove_file(self.path.join(store_name)) {
585                match err.kind() {
586                    std::io::ErrorKind::NotFound => {
587                        // silently drop this error, the file was already non-existing
588                    }
589                    _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
590                }
591            }
592        }
593
594        result
595    }
596
597    /// Clears all stored events, both in memory and on-disk.
598    pub fn clear_all(&self) -> Result<()> {
599        // safe unwrap, only error case is poisoning
600        self.event_stores.write().unwrap().clear();
601
602        // safe unwrap, only error case is poisoning
603        let _lock = self.file_lock.lock().unwrap();
604        std::fs::remove_dir_all(&self.path)?;
605        create_dir_all(&self.path)?;
606
607        Ok(())
608    }
609
610    /// **Test-only API (exported for FFI purposes).**
611    ///
612    /// Gets the vector of currently stored events for the given event metric in
613    /// the given store.
614    ///
615    /// This doesn't clear the stored value.
616    pub fn test_get_value<'a>(
617        &'a self,
618        meta: &'a CommonMetricDataInternal,
619        store_name: &str,
620    ) -> Option<Vec<RecordedEvent>> {
621        record_coverage(&meta.base_identifier());
622
623        let value: Vec<RecordedEvent> = self
624            .event_stores
625            .read()
626            .unwrap() // safe unwrap, only error case is poisoning
627            .get(&store_name.to_string())
628            .into_iter()
629            .flatten()
630            .map(|stored_event| stored_event.event.clone())
631            .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
632            .collect();
633        if !value.is_empty() {
634            Some(value)
635        } else {
636            None
637        }
638    }
639}
640
641#[cfg(test)]
642mod test {
643    use super::*;
644    use crate::test_get_num_recorded_errors;
645    use crate::tests::new_glean;
646    use chrono::{TimeZone, Timelike};
647
648    #[test]
649    fn handle_truncated_events_on_disk() {
650        let (glean, t) = new_glean(None);
651
652        {
653            let db = EventDatabase::new(t.path()).unwrap();
654            db.write_event_to_disk("events", "{\"timestamp\": 500");
655            db.write_event_to_disk("events", "{\"timestamp\"");
656            db.write_event_to_disk(
657                "events",
658                "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
659            );
660        }
661
662        {
663            let db = EventDatabase::new(t.path()).unwrap();
664            db.load_events_from_disk(&glean, false).unwrap();
665            let events = &db.event_stores.read().unwrap()["events"];
666            assert_eq!(1, events.len());
667        }
668    }
669
670    #[test]
671    fn stable_serialization() {
672        let event_empty = RecordedEvent {
673            timestamp: 2,
674            category: "cat".to_string(),
675            name: "name".to_string(),
676            extra: None,
677        };
678
679        let mut data = HashMap::new();
680        data.insert("a key".to_string(), "a value".to_string());
681        let event_data = RecordedEvent {
682            timestamp: 2,
683            category: "cat".to_string(),
684            name: "name".to_string(),
685            extra: Some(data),
686        };
687
688        let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap();
689        let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap();
690
691        assert_eq!(
692            StoredEvent {
693                event: event_empty,
694                execution_counter: None
695            },
696            serde_json::from_str(&event_empty_json).unwrap()
697        );
698        assert_eq!(
699            StoredEvent {
700                event: event_data,
701                execution_counter: None
702            },
703            serde_json::from_str(&event_data_json).unwrap()
704        );
705    }
706
707    #[test]
708    fn deserialize_existing_data() {
709        let event_empty_json = r#"
710{
711  "timestamp": 2,
712  "category": "cat",
713  "name": "name"
714}
715            "#;
716
717        let event_data_json = r#"
718{
719  "timestamp": 2,
720  "category": "cat",
721  "name": "name",
722  "extra": {
723    "a key": "a value"
724  }
725}
726        "#;
727
728        let event_empty = RecordedEvent {
729            timestamp: 2,
730            category: "cat".to_string(),
731            name: "name".to_string(),
732            extra: None,
733        };
734
735        let mut data = HashMap::new();
736        data.insert("a key".to_string(), "a value".to_string());
737        let event_data = RecordedEvent {
738            timestamp: 2,
739            category: "cat".to_string(),
740            name: "name".to_string(),
741            extra: Some(data),
742        };
743
744        assert_eq!(
745            StoredEvent {
746                event: event_empty,
747                execution_counter: None
748            },
749            serde_json::from_str(event_empty_json).unwrap()
750        );
751        assert_eq!(
752            StoredEvent {
753                event: event_data,
754                execution_counter: None
755            },
756            serde_json::from_str(event_data_json).unwrap()
757        );
758    }
759
760    #[test]
761    fn doesnt_record_when_upload_is_disabled() {
762        let (mut glean, dir) = new_glean(None);
763        let db = EventDatabase::new(dir.path()).unwrap();
764
765        let test_storage = "store1";
766        let test_category = "category";
767        let test_name = "name";
768        let test_timestamp = 2;
769        let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
770        let event_data = RecordedEvent {
771            timestamp: test_timestamp,
772            category: test_category.to_string(),
773            name: test_name.to_string(),
774            extra: None,
775        };
776
777        // Upload is not yet disabled,
778        // so let's check that everything is getting recorded as expected.
779        db.record(&glean, &test_meta, 2, None);
780        {
781            let event_stores = db.event_stores.read().unwrap();
782            assert_eq!(
783                &StoredEvent {
784                    event: event_data,
785                    execution_counter: None
786                },
787                &event_stores.get(test_storage).unwrap()[0]
788            );
789            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
790        }
791
792        glean.set_upload_enabled(false);
793
794        // Now that upload is disabled, let's check nothing is recorded.
795        db.record(&glean, &test_meta, 2, None);
796        {
797            let event_stores = db.event_stores.read().unwrap();
798            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
799        }
800    }
801
802    #[test]
803    fn normalize_store_of_glean_restarted() {
804        // Make sure stores empty of anything but glean.restarted events normalize without issue.
805        let (glean, _dir) = new_glean(None);
806
807        let store_name = "store-name";
808        let glean_restarted = StoredEvent {
809            event: RecordedEvent {
810                timestamp: 2,
811                category: "glean".into(),
812                name: "restarted".into(),
813                extra: None,
814            },
815            execution_counter: None,
816        };
817        let mut store = vec![glean_restarted.clone()];
818        let glean_start_time = glean.start_time();
819
820        glean
821            .event_storage()
822            .normalize_store(&glean, store_name, &mut store, glean_start_time);
823        assert!(store.is_empty());
824
825        let mut store = vec![glean_restarted.clone(), glean_restarted.clone()];
826        glean
827            .event_storage()
828            .normalize_store(&glean, store_name, &mut store, glean_start_time);
829        assert!(store.is_empty());
830
831        let mut store = vec![
832            glean_restarted.clone(),
833            glean_restarted.clone(),
834            glean_restarted,
835        ];
836        glean
837            .event_storage()
838            .normalize_store(&glean, store_name, &mut store, glean_start_time);
839        assert!(store.is_empty());
840    }
841
842    #[test]
843    fn normalize_store_of_glean_restarted_on_both_ends() {
844        // Make sure stores with non-glean.restarted events don't get drained too far.
845        let (glean, _dir) = new_glean(None);
846
847        let store_name = "store-name";
848        let glean_restarted = StoredEvent {
849            event: RecordedEvent {
850                timestamp: 2,
851                category: "glean".into(),
852                name: "restarted".into(),
853                extra: None,
854            },
855            execution_counter: None,
856        };
857        let not_glean_restarted = StoredEvent {
858            event: RecordedEvent {
859                timestamp: 20,
860                category: "category".into(),
861                name: "name".into(),
862                extra: None,
863            },
864            execution_counter: None,
865        };
866        let mut store = vec![
867            glean_restarted.clone(),
868            not_glean_restarted.clone(),
869            glean_restarted,
870        ];
871        let glean_start_time = glean.start_time();
872
873        glean
874            .event_storage()
875            .normalize_store(&glean, store_name, &mut store, glean_start_time);
876        assert_eq!(1, store.len());
877        assert_eq!(
878            StoredEvent {
879                event: RecordedEvent {
880                    timestamp: 0,
881                    ..not_glean_restarted.event
882                },
883                execution_counter: None
884            },
885            store[0]
886        );
887    }
888
889    #[test]
890    fn normalize_store_single_run_timestamp_math() {
891        // With a single run of events (no non-initial or non-terminal `glean.restarted`),
892        // ensure the timestamp math works.
893        // (( works = Initial event gets to be 0, subsequent events get normalized to that 0 ))
894        let (glean, _dir) = new_glean(None);
895
896        let store_name = "store-name";
897        let glean_restarted = StoredEvent {
898            event: RecordedEvent {
899                timestamp: 2,
900                category: "glean".into(),
901                name: "restarted".into(),
902                extra: None,
903            },
904            execution_counter: None,
905        };
906        let timestamps = [20, 40, 200];
907        let not_glean_restarted = StoredEvent {
908            event: RecordedEvent {
909                timestamp: timestamps[0],
910                category: "category".into(),
911                name: "name".into(),
912                extra: None,
913            },
914            execution_counter: None,
915        };
916        let mut store = vec![
917            glean_restarted.clone(),
918            not_glean_restarted.clone(),
919            StoredEvent {
920                event: RecordedEvent {
921                    timestamp: timestamps[1],
922                    ..not_glean_restarted.event.clone()
923                },
924                execution_counter: None,
925            },
926            StoredEvent {
927                event: RecordedEvent {
928                    timestamp: timestamps[2],
929                    ..not_glean_restarted.event.clone()
930                },
931                execution_counter: None,
932            },
933            glean_restarted,
934        ];
935
936        glean
937            .event_storage()
938            .normalize_store(&glean, store_name, &mut store, glean.start_time());
939        assert_eq!(3, store.len());
940        for (timestamp, event) in timestamps.iter().zip(store.iter()) {
941            assert_eq!(
942                &StoredEvent {
943                    event: RecordedEvent {
944                        timestamp: timestamp - timestamps[0],
945                        ..not_glean_restarted.clone().event
946                    },
947                    execution_counter: None
948                },
949                event
950            );
951        }
952    }
953
954    #[test]
955    fn normalize_store_multi_run_timestamp_math() {
956        // With multiple runs of events (separated by `glean.restarted`),
957        // ensure the timestamp math works.
958        // (( works = Initial event gets to be 0, subsequent events get normalized to that 0.
959        //            Subsequent runs figure it out via glean.restarted.date and ping_info.start_time ))
960        let (glean, _dir) = new_glean(None);
961
962        let store_name = "store-name";
963        let glean_restarted = StoredEvent {
964            event: RecordedEvent {
965                category: "glean".into(),
966                name: "restarted".into(),
967                ..Default::default()
968            },
969            execution_counter: None,
970        };
971        let not_glean_restarted = StoredEvent {
972            event: RecordedEvent {
973                category: "category".into(),
974                name: "name".into(),
975                ..Default::default()
976            },
977            execution_counter: None,
978        };
979
980        // This scenario represents a run of three events followed by an hour between runs,
981        // followed by one final event.
982        let timestamps = [20, 40, 200, 12];
983        let ecs = [0, 1];
984        let some_hour = 16;
985        let startup_date = FixedOffset::east(0)
986            .ymd(2022, 11, 24)
987            .and_hms(some_hour, 29, 0); // TimeUnit::Minute -- don't put seconds
988        let glean_start_time = startup_date.with_hour(some_hour - 1);
989        let restarted_ts = 2;
990        let mut store = vec![
991            StoredEvent {
992                event: RecordedEvent {
993                    timestamp: timestamps[0],
994                    ..not_glean_restarted.event.clone()
995                },
996                execution_counter: Some(ecs[0]),
997            },
998            StoredEvent {
999                event: RecordedEvent {
1000                    timestamp: timestamps[1],
1001                    ..not_glean_restarted.event.clone()
1002                },
1003                execution_counter: Some(ecs[0]),
1004            },
1005            StoredEvent {
1006                event: RecordedEvent {
1007                    timestamp: timestamps[2],
1008                    ..not_glean_restarted.event.clone()
1009                },
1010                execution_counter: Some(ecs[0]),
1011            },
1012            StoredEvent {
1013                event: RecordedEvent {
1014                    extra: Some(
1015                        [(
1016                            "glean.startup.date".into(),
1017                            get_iso_time_string(startup_date, TimeUnit::Minute),
1018                        )]
1019                        .into(),
1020                    ),
1021                    timestamp: restarted_ts,
1022                    ..glean_restarted.event.clone()
1023                },
1024                execution_counter: Some(ecs[1]),
1025            },
1026            StoredEvent {
1027                event: RecordedEvent {
1028                    timestamp: timestamps[3],
1029                    ..not_glean_restarted.event.clone()
1030                },
1031                execution_counter: Some(ecs[1]),
1032            },
1033        ];
1034
1035        glean.event_storage().normalize_store(
1036            &glean,
1037            store_name,
1038            &mut store,
1039            glean_start_time.unwrap(),
1040        );
1041        assert_eq!(5, store.len()); // 4 "real" events plus 1 `glean.restarted`
1042
1043        // Let's check the first three.
1044        for (timestamp, event) in timestamps[..timestamps.len() - 1].iter().zip(store.clone()) {
1045            assert_eq!(
1046                StoredEvent {
1047                    event: RecordedEvent {
1048                        timestamp: timestamp - timestamps[0],
1049                        ..not_glean_restarted.event.clone()
1050                    },
1051                    execution_counter: None,
1052                },
1053                event
1054            );
1055        }
1056        // The fourth should be a glean.restarted and have a realtime-based timestamp.
1057        let hour_in_millis = 3600000;
1058        assert_eq!(
1059            store[3],
1060            StoredEvent {
1061                event: RecordedEvent {
1062                    timestamp: hour_in_millis,
1063                    ..glean_restarted.event
1064                },
1065                execution_counter: None,
1066            }
1067        );
1068        // The fifth should have a timestamp based on the new origin.
1069        assert_eq!(
1070            store[4],
1071            StoredEvent {
1072                event: RecordedEvent {
1073                    timestamp: hour_in_millis + timestamps[3] - restarted_ts,
1074                    ..not_glean_restarted.event
1075                },
1076                execution_counter: None,
1077            }
1078        );
1079    }
1080
1081    #[test]
1082    fn normalize_store_multi_run_client_clocks() {
1083        // With multiple runs of events (separated by `glean.restarted`),
1084        // ensure the timestamp math works. Even when the client clock goes backwards.
1085        let (glean, _dir) = new_glean(None);
1086
1087        let store_name = "store-name";
1088        let glean_restarted = StoredEvent {
1089            event: RecordedEvent {
1090                category: "glean".into(),
1091                name: "restarted".into(),
1092                ..Default::default()
1093            },
1094            execution_counter: None,
1095        };
1096        let not_glean_restarted = StoredEvent {
1097            event: RecordedEvent {
1098                category: "category".into(),
1099                name: "name".into(),
1100                ..Default::default()
1101            },
1102            execution_counter: None,
1103        };
1104
1105        // This scenario represents a run of two events followed by negative one hours between runs,
1106        // followed by two more events.
1107        let timestamps = [20, 40, 12, 200];
1108        let ecs = [0, 1];
1109        let some_hour = 10;
1110        let startup_date = FixedOffset::east(0)
1111            .ymd(2022, 11, 25)
1112            .and_hms(some_hour, 37, 0); // TimeUnit::Minute -- don't put seconds
1113        let glean_start_time = startup_date.with_hour(some_hour + 1);
1114        let restarted_ts = 2;
1115        let mut store = vec![
1116            StoredEvent {
1117                event: RecordedEvent {
1118                    timestamp: timestamps[0],
1119                    ..not_glean_restarted.event.clone()
1120                },
1121                execution_counter: Some(ecs[0]),
1122            },
1123            StoredEvent {
1124                event: RecordedEvent {
1125                    timestamp: timestamps[1],
1126                    ..not_glean_restarted.event.clone()
1127                },
1128                execution_counter: Some(ecs[0]),
1129            },
1130            StoredEvent {
1131                event: RecordedEvent {
1132                    extra: Some(
1133                        [(
1134                            "glean.startup.date".into(),
1135                            get_iso_time_string(startup_date, TimeUnit::Minute),
1136                        )]
1137                        .into(),
1138                    ),
1139                    timestamp: restarted_ts,
1140                    ..glean_restarted.event.clone()
1141                },
1142                execution_counter: Some(ecs[1]),
1143            },
1144            StoredEvent {
1145                event: RecordedEvent {
1146                    timestamp: timestamps[2],
1147                    ..not_glean_restarted.event.clone()
1148                },
1149                execution_counter: Some(ecs[1]),
1150            },
1151            StoredEvent {
1152                event: RecordedEvent {
1153                    timestamp: timestamps[3],
1154                    ..not_glean_restarted.event.clone()
1155                },
1156                execution_counter: Some(ecs[1]),
1157            },
1158        ];
1159
1160        glean.event_storage().normalize_store(
1161            &glean,
1162            store_name,
1163            &mut store,
1164            glean_start_time.unwrap(),
1165        );
1166        assert_eq!(5, store.len()); // 4 "real" events plus 1 `glean.restarted`
1167
1168        // Let's check the first two.
1169        for (timestamp, event) in timestamps[..timestamps.len() - 2].iter().zip(store.clone()) {
1170            assert_eq!(
1171                StoredEvent {
1172                    event: RecordedEvent {
1173                        timestamp: timestamp - timestamps[0],
1174                        ..not_glean_restarted.event.clone()
1175                    },
1176                    execution_counter: None,
1177                },
1178                event
1179            );
1180        }
1181        // The third should be a glean.restarted. Its timestamp should be
1182        // one larger than the largest timestamp seen so far (because that's
1183        // how we ensure monotonic timestamps when client clocks go backwards).
1184        assert_eq!(
1185            store[2],
1186            StoredEvent {
1187                event: RecordedEvent {
1188                    timestamp: store[1].event.timestamp + 1,
1189                    ..glean_restarted.event
1190                },
1191                execution_counter: None,
1192            }
1193        );
1194        // The fifth should have a timestamp based on the new origin.
1195        assert_eq!(
1196            store[3],
1197            StoredEvent {
1198                event: RecordedEvent {
1199                    timestamp: timestamps[2] - restarted_ts + store[2].event.timestamp,
1200                    ..not_glean_restarted.event
1201                },
1202                execution_counter: None,
1203            }
1204        );
1205        // And we should have an InvalidValue on glean.restarted to show for it.
1206        assert_eq!(
1207            Ok(1),
1208            test_get_num_recorded_errors(
1209                &glean,
1210                &CommonMetricData {
1211                    name: "restarted".into(),
1212                    category: "glean".into(),
1213                    send_in_pings: vec![store_name.into()],
1214                    lifetime: Lifetime::Ping,
1215                    ..Default::default()
1216                }
1217                .into(),
1218                ErrorType::InvalidValue
1219            )
1220        );
1221    }
1222
1223    #[test]
1224    fn normalize_store_non_zero_ec() {
1225        // After the first run, execution_counter will likely be non-zero.
1226        // Ensure normalizing a store that begins with non-zero ec works.
1227        let (glean, _dir) = new_glean(None);
1228
1229        let store_name = "store-name";
1230        let glean_restarted = StoredEvent {
1231            event: RecordedEvent {
1232                timestamp: 2,
1233                category: "glean".into(),
1234                name: "restarted".into(),
1235                extra: None,
1236            },
1237            execution_counter: Some(2),
1238        };
1239        let not_glean_restarted = StoredEvent {
1240            event: RecordedEvent {
1241                timestamp: 20,
1242                category: "category".into(),
1243                name: "name".into(),
1244                extra: None,
1245            },
1246            execution_counter: Some(2),
1247        };
1248        let glean_restarted_2 = StoredEvent {
1249            event: RecordedEvent {
1250                timestamp: 2,
1251                category: "glean".into(),
1252                name: "restarted".into(),
1253                extra: None,
1254            },
1255            execution_counter: Some(3),
1256        };
1257        let mut store = vec![
1258            glean_restarted,
1259            not_glean_restarted.clone(),
1260            glean_restarted_2,
1261        ];
1262        let glean_start_time = glean.start_time();
1263
1264        glean
1265            .event_storage()
1266            .normalize_store(&glean, store_name, &mut store, glean_start_time);
1267
1268        assert_eq!(1, store.len());
1269        assert_eq!(
1270            StoredEvent {
1271                event: RecordedEvent {
1272                    timestamp: 0,
1273                    ..not_glean_restarted.event
1274                },
1275                execution_counter: None
1276            },
1277            store[0]
1278        );
1279        // And we should have no InvalidState errors on glean.restarted.
1280        assert!(test_get_num_recorded_errors(
1281            &glean,
1282            &CommonMetricData {
1283                name: "restarted".into(),
1284                category: "glean".into(),
1285                send_in_pings: vec![store_name.into()],
1286                lifetime: Lifetime::Ping,
1287                ..Default::default()
1288            }
1289            .into(),
1290            ErrorType::InvalidState
1291        )
1292        .is_err());
1293        // (and, just because we're here, double-check there are no InvalidValue either).
1294        assert!(test_get_num_recorded_errors(
1295            &glean,
1296            &CommonMetricData {
1297                name: "restarted".into(),
1298                category: "glean".into(),
1299                send_in_pings: vec![store_name.into()],
1300                lifetime: Lifetime::Ping,
1301                ..Default::default()
1302            }
1303            .into(),
1304            ErrorType::InvalidValue
1305        )
1306        .is_err());
1307    }
1308}