glean_core/event_database/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5use std::cmp::Ordering;
6use std::collections::HashMap;
7use std::fs;
8use std::fs::{create_dir_all, File, OpenOptions};
9use std::io::BufRead;
10use std::io::BufReader;
11use std::io::Write;
12use std::path::{Path, PathBuf};
13use std::sync::{Mutex, RwLock};
14
15use chrono::{DateTime, FixedOffset, Utc};
16
17use malloc_size_of::MallocSizeOf;
18use malloc_size_of_derive::MallocSizeOf;
19use serde::{Deserialize, Serialize};
20use serde_json::{json, Value as JsonValue};
21
22use crate::common_metric_data::CommonMetricDataInternal;
23use crate::coverage::record_coverage;
24use crate::error_recording::{record_error, ErrorType};
25use crate::metrics::{DatetimeMetric, TimeUnit};
26use crate::storage::INTERNAL_STORAGE;
27use crate::util::get_iso_time_string;
28use crate::Glean;
29use crate::Result;
30use crate::{CommonMetricData, CounterMetric, Lifetime};
31
/// Represents the recorded data for a single event.
///
/// This is the payload embedded (flattened) into each `StoredEvent` and the
/// type handed back by `EventDatabase::test_get_value`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, MallocSizeOf)]
#[cfg_attr(test, derive(Default))]
pub struct RecordedEvent {
    /// The timestamp of when the event was recorded.
    ///
    /// This allows ordering events from a single process run.
    pub timestamp: u64,

    /// The event's category.
    ///
    /// This is defined by users in the metrics file.
    pub category: String,

    /// The event's name.
    ///
    /// This is defined by users in the metrics file.
    pub name: String,

    /// A map of all extra data values.
    ///
    /// The set of allowed extra keys is defined by users in the metrics file.
    /// The field is left out of the serialized form entirely when it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<HashMap<String, String>>,
}
57
/// Represents the stored data for a single event.
///
/// This is the shape of one line in an on-disk event file and of one entry in
/// the in-memory stores: the recorded event plus bookkeeping that is only
/// needed until the store is normalized.
#[derive(
    Debug, Clone, Deserialize, Serialize, PartialEq, Eq, malloc_size_of_derive::MallocSizeOf,
)]
struct StoredEvent {
    /// The event itself. Its fields are flattened into the same JSON object
    /// as `execution_counter`, so the serialized form has no nested `event` key.
    #[serde(flatten)]
    event: RecordedEvent,

    /// The monotonically-increasing execution counter.
    ///
    /// Included to allow sending of events across Glean restarts (bug 1716725).
    /// Is i32 because it is stored in a CounterMetric.
    ///
    /// Deserializes to `None` when the key is absent and is not serialized
    /// when it is `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub execution_counter: Option<i32>,
}
74
/// This struct handles the in-memory and on-disk storage logic for events.
///
/// So that the data survives shutting down of the application, events are stored
/// in an append-only file on disk, in addition to the store in memory. Each line
/// of this file records a single event in JSON, exactly as it will be sent in the
/// ping. There is one file per store.
///
/// When restarting the application, these on-disk files are checked, and if any are
/// found, they are loaded, and a `glean.restarted` event is added before any
/// further events are collected. This is because the timestamps for these events
/// may have come from a previous boot of the device, and therefore will not be
/// compatible with any newly-collected events.
///
/// Normalizing all these timestamps happens on serialization for submission (see
/// `snapshot_as_json`, which calls `normalize_store`) where the client time between
/// restarts is calculated using data stored in the `glean.startup.date` extra of
/// the `glean.restarted` event, plus the `execution_counter` stored in events on disk.
///
/// Neither `execution_counter` nor `glean.startup.date` is submitted in pings.
/// The `glean.restarted` event is, though.
/// (See [bug 1716725](https://bugzilla.mozilla.org/show_bug.cgi?id=1716725).)
#[derive(Debug)]
pub struct EventDatabase {
    /// Path to directory of on-disk event files
    pub path: PathBuf,
    /// The in-memory list of events, keyed by store name
    event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
    /// A lock to be held when doing operations on the filesystem
    file_lock: Mutex<()>,
}
105
106impl MallocSizeOf for EventDatabase {
107    fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
108        self.event_stores.read().unwrap().size_of(ops)
109    }
110}
111
112impl EventDatabase {
113    /// Creates a new event database.
114    ///
115    /// # Arguments
116    ///
117    /// * `data_path` - The directory to store events in. A new directory
118    /// * `events` - will be created inside of this directory.
119    pub fn new(data_path: &Path) -> Result<Self> {
120        let path = data_path.join("events");
121        create_dir_all(&path)?;
122
123        Ok(Self {
124            path,
125            event_stores: RwLock::new(HashMap::new()),
126            file_lock: Mutex::new(()),
127        })
128    }
129
130    /// Initializes events storage after Glean is fully initialized and ready to send pings.
131    ///
132    /// This must be called once on application startup, e.g. from
133    /// [Glean.initialize], but after we are ready to send pings, since this
134    /// could potentially collect and send the "events" ping.
135    ///
136    /// If there are any events queued on disk, it loads them into memory so
137    /// that the memory and disk representations are in sync.
138    ///
139    /// If event records for the "events" ping are present, they are assembled into
140    /// an "events" ping which is submitted immediately with reason "startup".
141    ///
142    /// If event records for custom pings are present, we increment the custom pings'
143    /// stores' `execution_counter` and record a `glean.restarted`
144    /// event with the current client clock in its `glean.startup.date` extra.
145    ///
146    /// # Arguments
147    ///
148    /// * `glean` - The Glean instance.
149    /// * `trim_data_to_registered_pings` - Whether we should trim the event storage of
150    ///   any events not belonging to pings previously registered via `register_ping_type`.
151    ///
152    /// # Returns
153    ///
154    /// Whether the "events" ping was submitted.
155    pub fn flush_pending_events_on_startup(
156        &self,
157        glean: &Glean,
158        trim_data_to_registered_pings: bool,
159    ) -> bool {
160        match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
161            Ok(_) => {
162                let stores_with_events: Vec<String> = {
163                    self.event_stores
164                        .read()
165                        .unwrap()
166                        .keys()
167                        .map(|x| x.to_owned())
168                        .collect() // safe unwrap, only error case is poisoning
169                };
170                // We do not want to be holding the event stores lock when
171                // submitting a ping or recording new events.
172                let has_events_events = stores_with_events.contains(&"events".to_owned());
173                let glean_restarted_stores = if has_events_events {
174                    stores_with_events
175                        .into_iter()
176                        .filter(|store| store != "events")
177                        .collect()
178                } else {
179                    stores_with_events
180                };
181                if !glean_restarted_stores.is_empty() {
182                    for store_name in glean_restarted_stores.iter() {
183                        CounterMetric::new(CommonMetricData {
184                            name: "execution_counter".into(),
185                            category: store_name.into(),
186                            send_in_pings: vec![INTERNAL_STORAGE.into()],
187                            lifetime: Lifetime::Ping,
188                            ..Default::default()
189                        })
190                        .add_sync(glean, 1);
191                    }
192                    let glean_restarted = CommonMetricData {
193                        name: "restarted".into(),
194                        category: "glean".into(),
195                        send_in_pings: glean_restarted_stores,
196                        lifetime: Lifetime::Ping,
197                        ..Default::default()
198                    };
199                    let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
200                    let mut extra: HashMap<String, String> =
201                        [("glean.startup.date".into(), startup)].into();
202                    if glean.with_timestamps() {
203                        let now = Utc::now();
204                        let precise_timestamp = now.timestamp_millis() as u64;
205                        extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
206                    }
207                    self.record(
208                        glean,
209                        &glean_restarted.into(),
210                        crate::get_timestamp_ms(),
211                        Some(extra),
212                    );
213                }
214                has_events_events && glean.submit_ping_by_name("events", Some("startup"))
215            }
216            Err(err) => {
217                log::warn!("Error loading events from disk: {}", err);
218                false
219            }
220        }
221    }
222
223    fn load_events_from_disk(
224        &self,
225        glean: &Glean,
226        trim_data_to_registered_pings: bool,
227    ) -> Result<()> {
228        // NOTE: The order of locks here is important.
229        // In other code parts we might acquire the `file_lock` when we already have acquired
230        // a lock on `event_stores`.
231        // This is a potential lock-order-inversion.
232        let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
233        let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
234
235        for entry in fs::read_dir(&self.path)? {
236            let entry = entry?;
237            if entry.file_type()?.is_file() {
238                let store_name = entry.file_name().into_string()?;
239                log::info!("Loading events for {}", store_name);
240                if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
241                    log::warn!("Trimming {}'s events", store_name);
242                    if let Err(err) = fs::remove_file(entry.path()) {
243                        match err.kind() {
244                            std::io::ErrorKind::NotFound => {
245                                // silently drop this error, the file was already non-existing
246                            }
247                            _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
248                        }
249                    }
250                    continue;
251                }
252                let file = BufReader::new(File::open(entry.path())?);
253                db.insert(
254                    store_name,
255                    file.lines()
256                        .map_while(Result::ok)
257                        .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
258                        .collect(),
259                );
260            }
261        }
262        Ok(())
263    }
264
265    /// Records an event in the desired stores.
266    ///
267    /// # Arguments
268    ///
269    /// * `glean` - The Glean instance.
270    /// * `meta` - The metadata about the event metric. Used to get the category,
271    ///   name and stores for the metric.
272    /// * `timestamp` - The timestamp of the event, in milliseconds. Must use a
273    ///   monotonically increasing timer (this value is obtained on the
274    ///   platform-specific side).
275    /// * `extra` - Extra data values, mapping strings to strings.
276    ///
277    /// ## Returns
278    ///
279    /// `true` if a ping was submitted and should be uploaded.
280    /// `false` otherwise.
281    pub fn record(
282        &self,
283        glean: &Glean,
284        meta: &CommonMetricDataInternal,
285        timestamp: u64,
286        extra: Option<HashMap<String, String>>,
287    ) -> bool {
288        // If upload is disabled we don't want to record.
289        if !glean.is_upload_enabled() {
290            return false;
291        }
292
293        let mut submit_max_capacity_event_ping = false;
294        {
295            let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
296            for store_name in meta.inner.send_in_pings.iter() {
297                if !glean.is_ping_enabled(store_name) {
298                    continue;
299                }
300
301                let store = db.entry(store_name.to_string()).or_default();
302                let execution_counter = CounterMetric::new(CommonMetricData {
303                    name: "execution_counter".into(),
304                    category: store_name.into(),
305                    send_in_pings: vec![INTERNAL_STORAGE.into()],
306                    lifetime: Lifetime::Ping,
307                    ..Default::default()
308                })
309                .get_value(glean, INTERNAL_STORAGE);
310                // Create StoredEvent object, and its JSON form for serialization on disk.
311                let event = StoredEvent {
312                    event: RecordedEvent {
313                        timestamp,
314                        category: meta.inner.category.to_string(),
315                        name: meta.inner.name.to_string(),
316                        extra: extra.clone(),
317                    },
318                    execution_counter,
319                };
320                let event_json = serde_json::to_string(&event).unwrap(); // safe unwrap, event can always be serialized
321                store.push(event);
322                self.write_event_to_disk(store_name, &event_json);
323                if store_name == "events" && store.len() == glean.get_max_events() {
324                    submit_max_capacity_event_ping = true;
325                }
326            }
327        }
328        if submit_max_capacity_event_ping {
329            glean.submit_ping_by_name("events", Some("max_capacity"));
330            true
331        } else {
332            false
333        }
334    }
335
336    /// Writes an event to a single store on disk.
337    ///
338    /// # Arguments
339    ///
340    /// * `store_name` - The name of the store.
341    /// * `event_json` - The event content, as a single-line JSON-encoded string.
342    fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
343        let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
344        if let Err(err) = OpenOptions::new()
345            .create(true)
346            .append(true)
347            .open(self.path.join(store_name))
348            .and_then(|mut file| writeln!(file, "{}", event_json))
349        {
350            log::warn!("IO error writing event to store '{}': {}", store_name, err);
351        }
352    }
353
354    /// Normalizes the store in-place.
355    ///
356    /// A store may be in any order and contain any number of `glean.restarted` events,
357    /// whose values must be taken into account, along with `execution_counter` values,
358    /// to come up with the correct events with correct `timestamp` values,
359    /// on which we then sort.
360    ///
361    /// 1. Sort by `execution_counter` and `timestamp`,
362    ///    breaking ties so that `glean.restarted` comes first.
363    /// 2. Remove all initial and final `glean.restarted` events
364    /// 3. For each group of events that share a `execution_counter`,
365    ///    i. calculate the initial `glean.restarted` event's `timestamp`s to be
366    ///       clamp(glean.startup.date - ping_info.start_time, biggest_timestamp_of_previous_group + 1)
367    ///    ii. normalize each non-`glean-restarted` event's `timestamp`
368    ///        relative to the `glean.restarted` event's uncalculated `timestamp`
369    /// 4. Remove `execution_counter` and `glean.startup.date` extra keys
370    /// 5. Sort by `timestamp`
371    ///
372    /// In the event that something goes awry, this will record an invalid_state on
373    /// glean.restarted if it is due to internal inconsistencies, or invalid_value
374    /// on client clock weirdness.
375    ///
376    /// # Arguments
377    ///
378    /// * `glean` - Used to report errors
379    /// * `store_name` - The name of the store we're normalizing.
380    /// * `store` - The store we're to normalize.
381    /// * `glean_start_time` - Used if the glean.startup.date or ping_info.start_time aren't available. Passed as a parameter to ease unit-testing.
382    fn normalize_store(
383        &self,
384        glean: &Glean,
385        store_name: &str,
386        store: &mut Vec<StoredEvent>,
387        glean_start_time: DateTime<FixedOffset>,
388    ) {
389        let is_glean_restarted =
390            |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
391        let glean_restarted_meta = |store_name: &str| CommonMetricData {
392            name: "restarted".into(),
393            category: "glean".into(),
394            send_in_pings: vec![store_name.into()],
395            lifetime: Lifetime::Ping,
396            ..Default::default()
397        };
398        // Step 1
399        store.sort_by(|a, b| {
400            a.execution_counter
401                .cmp(&b.execution_counter)
402                .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
403                .then_with(|| {
404                    if is_glean_restarted(&a.event) {
405                        Ordering::Less
406                    } else {
407                        Ordering::Greater
408                    }
409                })
410        });
411        // Step 2
412        // Find the index of the first and final non-`glean.restarted` events.
413        // Remove events before the first and after the final.
414        let final_event = match store
415            .iter()
416            .rposition(|event| !is_glean_restarted(&event.event))
417        {
418            Some(idx) => idx + 1,
419            _ => 0,
420        };
421        store.drain(final_event..);
422        let first_event = store
423            .iter()
424            .position(|event| !is_glean_restarted(&event.event))
425            .unwrap_or(store.len());
426        store.drain(..first_event);
427        if store.is_empty() {
428            // There was nothing but `glean.restarted` events. Job's done!
429            return;
430        }
431        // Step 3
432        // It is allowed that there might not be any `glean.restarted` event, nor
433        // `execution_counter` extra values. (This should always be the case for the
434        // "events" ping, for instance).
435        // Other inconsistencies are evidence of errors, and so are logged.
436        let mut cur_ec = 0;
437        // The offset within a group of events with the same `execution_counter`.
438        let mut intra_group_offset = store[0].event.timestamp;
439        // The offset between this group and ping_info.start_date.
440        let mut inter_group_offset = 0;
441        let mut highest_ts = 0;
442        for event in store.iter_mut() {
443            let execution_counter = event.execution_counter.take().unwrap_or(0);
444            if is_glean_restarted(&event.event) {
445                // We've entered the next "event group".
446                // We need a new epoch based on glean.startup.date - ping_info.start_date
447                cur_ec = execution_counter;
448                let glean_startup_date = event
449                    .event
450                    .extra
451                    .as_mut()
452                    .and_then(|extra| {
453                        extra.remove("glean.startup.date").and_then(|date_str| {
454                            DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
455                                .map_err(|_| {
456                                    record_error(
457                                        glean,
458                                        &glean_restarted_meta(store_name).into(),
459                                        ErrorType::InvalidState,
460                                        format!("Unparseable glean.startup.date '{}'", date_str),
461                                        None,
462                                    );
463                                })
464                                .ok()
465                        })
466                    })
467                    .unwrap_or(glean_start_time);
468                if event
469                    .event
470                    .extra
471                    .as_ref()
472                    .is_some_and(|extra| extra.is_empty())
473                {
474                    // Small optimization to save us sending empty dicts.
475                    event.event.extra = None;
476                }
477                let ping_start = DatetimeMetric::new(
478                    CommonMetricData {
479                        name: format!("{}#start", store_name),
480                        category: "".into(),
481                        send_in_pings: vec![INTERNAL_STORAGE.into()],
482                        lifetime: Lifetime::User,
483                        ..Default::default()
484                    },
485                    TimeUnit::Minute,
486                );
487                let ping_start = ping_start
488                    .get_value(glean, INTERNAL_STORAGE)
489                    .unwrap_or(glean_start_time);
490                let time_from_ping_start_to_glean_restarted =
491                    (glean_startup_date - ping_start).num_milliseconds();
492                intra_group_offset = event.event.timestamp;
493                inter_group_offset =
494                    u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
495                if inter_group_offset < highest_ts {
496                    record_error(
497                        glean,
498                        &glean_restarted_meta(store_name).into(),
499                        ErrorType::InvalidValue,
500                        format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
501                        None,
502                    );
503                    // The client's clock went backwards enough that this event group's
504                    // glean.restarted looks like it happened _before_ the final event of the previous group.
505                    // Or, it went ahead enough to overflow u64.
506                    // Adjust things so this group starts 1ms after the previous one.
507                    inter_group_offset = highest_ts + 1;
508                }
509            } else if cur_ec == 0 {
510                // bug 1811872 - cur_ec might need initialization.
511                cur_ec = execution_counter;
512            }
513            event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
514            if execution_counter != cur_ec {
515                record_error(
516                    glean,
517                    &glean_restarted_meta(store_name).into(),
518                    ErrorType::InvalidState,
519                    format!(
520                        "Inconsistent execution counter {} (expected {})",
521                        execution_counter, cur_ec
522                    ),
523                    None,
524                );
525                // Let's fix cur_ec up and hope this isn't a sign something big is broken.
526                cur_ec = execution_counter;
527            }
528            if highest_ts > event.event.timestamp {
529                // Even though we sorted everything, something in the
530                // execution_counter or glean.startup.date math went awry.
531                record_error(
532                    glean,
533                    &glean_restarted_meta(store_name).into(),
534                    ErrorType::InvalidState,
535                    format!(
536                        "Inconsistent previous highest timestamp {} (expected <= {})",
537                        highest_ts, event.event.timestamp
538                    ),
539                    None,
540                );
541                // Let the highest_ts regress to event.timestamp to hope this minimizes weirdness.
542            }
543            highest_ts = event.event.timestamp
544        }
545    }
546
547    /// Gets a snapshot of the stored event data as a JsonValue.
548    ///
549    /// # Arguments
550    ///
551    /// * `glean` - the Glean instance.
552    /// * `store_name` - The name of the desired store.
553    /// * `clear_store` - Whether to clear the store after snapshotting.
554    ///
555    /// # Returns
556    ///
557    /// A array of events, JSON encoded, if any. Otherwise `None`.
558    pub fn snapshot_as_json(
559        &self,
560        glean: &Glean,
561        store_name: &str,
562        clear_store: bool,
563    ) -> Option<JsonValue> {
564        let result = {
565            let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
566            db.get_mut(&store_name.to_string()).and_then(|store| {
567                if !store.is_empty() {
568                    // Normalization happens in-place, so if we're not clearing,
569                    // operate on a clone.
570                    let mut clone;
571                    let store = if clear_store {
572                        store
573                    } else {
574                        clone = store.clone();
575                        &mut clone
576                    };
577                    // We may need to normalize event timestamps across multiple restarts.
578                    self.normalize_store(glean, store_name, store, glean.start_time());
579                    Some(json!(store))
580                } else {
581                    log::warn!("Unexpectly got empty event store for '{}'", store_name);
582                    None
583                }
584            })
585        };
586
587        if clear_store {
588            self.event_stores
589                .write()
590                .unwrap() // safe unwrap, only error case is poisoning
591                .remove(&store_name.to_string());
592
593            let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
594            if let Err(err) = fs::remove_file(self.path.join(store_name)) {
595                match err.kind() {
596                    std::io::ErrorKind::NotFound => {
597                        // silently drop this error, the file was already non-existing
598                    }
599                    _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
600                }
601            }
602        }
603
604        result
605    }
606
607    /// Clears all stored events, both in memory and on-disk.
608    pub fn clear_all(&self) -> Result<()> {
609        // safe unwrap, only error case is poisoning
610        self.event_stores.write().unwrap().clear();
611
612        // safe unwrap, only error case is poisoning
613        let _lock = self.file_lock.lock().unwrap();
614        std::fs::remove_dir_all(&self.path)?;
615        create_dir_all(&self.path)?;
616
617        Ok(())
618    }
619
620    /// **Test-only API (exported for FFI purposes).**
621    ///
622    /// Gets the vector of currently stored events for the given event metric in
623    /// the given store.
624    ///
625    /// This doesn't clear the stored value.
626    pub fn test_get_value<'a>(
627        &'a self,
628        meta: &'a CommonMetricDataInternal,
629        store_name: &str,
630    ) -> Option<Vec<RecordedEvent>> {
631        record_coverage(&meta.base_identifier());
632
633        let value: Vec<RecordedEvent> = self
634            .event_stores
635            .read()
636            .unwrap() // safe unwrap, only error case is poisoning
637            .get(&store_name.to_string())
638            .into_iter()
639            .flatten()
640            .map(|stored_event| stored_event.event.clone())
641            .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
642            .collect();
643        if !value.is_empty() {
644            Some(value)
645        } else {
646            None
647        }
648    }
649}
650
651#[cfg(test)]
652mod test {
653    use super::*;
654    use crate::test_get_num_recorded_errors;
655    use crate::tests::new_glean;
656    use chrono::{TimeZone, Timelike};
657
658    #[test]
659    fn handle_truncated_events_on_disk() {
660        let (glean, t) = new_glean(None);
661
662        {
663            let db = EventDatabase::new(t.path()).unwrap();
664            db.write_event_to_disk("events", "{\"timestamp\": 500");
665            db.write_event_to_disk("events", "{\"timestamp\"");
666            db.write_event_to_disk(
667                "events",
668                "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
669            );
670        }
671
672        {
673            let db = EventDatabase::new(t.path()).unwrap();
674            db.load_events_from_disk(&glean, false).unwrap();
675            let events = &db.event_stores.read().unwrap()["events"];
676            assert_eq!(1, events.len());
677        }
678    }
679
680    #[test]
681    fn stable_serialization() {
682        let event_empty = RecordedEvent {
683            timestamp: 2,
684            category: "cat".to_string(),
685            name: "name".to_string(),
686            extra: None,
687        };
688
689        let mut data = HashMap::new();
690        data.insert("a key".to_string(), "a value".to_string());
691        let event_data = RecordedEvent {
692            timestamp: 2,
693            category: "cat".to_string(),
694            name: "name".to_string(),
695            extra: Some(data),
696        };
697
698        let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap();
699        let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap();
700
701        assert_eq!(
702            StoredEvent {
703                event: event_empty,
704                execution_counter: None
705            },
706            serde_json::from_str(&event_empty_json).unwrap()
707        );
708        assert_eq!(
709            StoredEvent {
710                event: event_data,
711                execution_counter: None
712            },
713            serde_json::from_str(&event_data_json).unwrap()
714        );
715    }
716
717    #[test]
718    fn deserialize_existing_data() {
719        let event_empty_json = r#"
720{
721  "timestamp": 2,
722  "category": "cat",
723  "name": "name"
724}
725            "#;
726
727        let event_data_json = r#"
728{
729  "timestamp": 2,
730  "category": "cat",
731  "name": "name",
732  "extra": {
733    "a key": "a value"
734  }
735}
736        "#;
737
738        let event_empty = RecordedEvent {
739            timestamp: 2,
740            category: "cat".to_string(),
741            name: "name".to_string(),
742            extra: None,
743        };
744
745        let mut data = HashMap::new();
746        data.insert("a key".to_string(), "a value".to_string());
747        let event_data = RecordedEvent {
748            timestamp: 2,
749            category: "cat".to_string(),
750            name: "name".to_string(),
751            extra: Some(data),
752        };
753
754        assert_eq!(
755            StoredEvent {
756                event: event_empty,
757                execution_counter: None
758            },
759            serde_json::from_str(event_empty_json).unwrap()
760        );
761        assert_eq!(
762            StoredEvent {
763                event: event_data,
764                execution_counter: None
765            },
766            serde_json::from_str(event_data_json).unwrap()
767        );
768    }
769
770    #[test]
771    fn doesnt_record_when_upload_is_disabled() {
772        let (mut glean, dir) = new_glean(None);
773        let db = EventDatabase::new(dir.path()).unwrap();
774
775        let test_storage = "store1";
776        let test_category = "category";
777        let test_name = "name";
778        let test_timestamp = 2;
779        let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
780        let event_data = RecordedEvent {
781            timestamp: test_timestamp,
782            category: test_category.to_string(),
783            name: test_name.to_string(),
784            extra: None,
785        };
786
787        // Upload is not yet disabled,
788        // so let's check that everything is getting recorded as expected.
789        db.record(&glean, &test_meta, 2, None);
790        {
791            let event_stores = db.event_stores.read().unwrap();
792            assert_eq!(
793                &StoredEvent {
794                    event: event_data,
795                    execution_counter: None
796                },
797                &event_stores.get(test_storage).unwrap()[0]
798            );
799            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
800        }
801
802        glean.set_upload_enabled(false);
803
804        // Now that upload is disabled, let's check nothing is recorded.
805        db.record(&glean, &test_meta, 2, None);
806        {
807            let event_stores = db.event_stores.read().unwrap();
808            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
809        }
810    }
811
812    #[test]
813    fn normalize_store_of_glean_restarted() {
814        // Make sure stores empty of anything but glean.restarted events normalize without issue.
815        let (glean, _dir) = new_glean(None);
816
817        let store_name = "store-name";
818        let glean_restarted = StoredEvent {
819            event: RecordedEvent {
820                timestamp: 2,
821                category: "glean".into(),
822                name: "restarted".into(),
823                extra: None,
824            },
825            execution_counter: None,
826        };
827        let mut store = vec![glean_restarted.clone()];
828        let glean_start_time = glean.start_time();
829
830        glean
831            .event_storage()
832            .normalize_store(&glean, store_name, &mut store, glean_start_time);
833        assert!(store.is_empty());
834
835        let mut store = vec![glean_restarted.clone(), glean_restarted.clone()];
836        glean
837            .event_storage()
838            .normalize_store(&glean, store_name, &mut store, glean_start_time);
839        assert!(store.is_empty());
840
841        let mut store = vec![
842            glean_restarted.clone(),
843            glean_restarted.clone(),
844            glean_restarted,
845        ];
846        glean
847            .event_storage()
848            .normalize_store(&glean, store_name, &mut store, glean_start_time);
849        assert!(store.is_empty());
850    }
851
852    #[test]
853    fn normalize_store_of_glean_restarted_on_both_ends() {
854        // Make sure stores with non-glean.restarted events don't get drained too far.
855        let (glean, _dir) = new_glean(None);
856
857        let store_name = "store-name";
858        let glean_restarted = StoredEvent {
859            event: RecordedEvent {
860                timestamp: 2,
861                category: "glean".into(),
862                name: "restarted".into(),
863                extra: None,
864            },
865            execution_counter: None,
866        };
867        let not_glean_restarted = StoredEvent {
868            event: RecordedEvent {
869                timestamp: 20,
870                category: "category".into(),
871                name: "name".into(),
872                extra: None,
873            },
874            execution_counter: None,
875        };
876        let mut store = vec![
877            glean_restarted.clone(),
878            not_glean_restarted.clone(),
879            glean_restarted,
880        ];
881        let glean_start_time = glean.start_time();
882
883        glean
884            .event_storage()
885            .normalize_store(&glean, store_name, &mut store, glean_start_time);
886        assert_eq!(1, store.len());
887        assert_eq!(
888            StoredEvent {
889                event: RecordedEvent {
890                    timestamp: 0,
891                    ..not_glean_restarted.event
892                },
893                execution_counter: None
894            },
895            store[0]
896        );
897    }
898
899    #[test]
900    fn normalize_store_single_run_timestamp_math() {
901        // With a single run of events (no non-initial or non-terminal `glean.restarted`),
902        // ensure the timestamp math works.
903        // (( works = Initial event gets to be 0, subsequent events get normalized to that 0 ))
904        let (glean, _dir) = new_glean(None);
905
906        let store_name = "store-name";
907        let glean_restarted = StoredEvent {
908            event: RecordedEvent {
909                timestamp: 2,
910                category: "glean".into(),
911                name: "restarted".into(),
912                extra: None,
913            },
914            execution_counter: None,
915        };
916        let timestamps = [20, 40, 200];
917        let not_glean_restarted = StoredEvent {
918            event: RecordedEvent {
919                timestamp: timestamps[0],
920                category: "category".into(),
921                name: "name".into(),
922                extra: None,
923            },
924            execution_counter: None,
925        };
926        let mut store = vec![
927            glean_restarted.clone(),
928            not_glean_restarted.clone(),
929            StoredEvent {
930                event: RecordedEvent {
931                    timestamp: timestamps[1],
932                    ..not_glean_restarted.event.clone()
933                },
934                execution_counter: None,
935            },
936            StoredEvent {
937                event: RecordedEvent {
938                    timestamp: timestamps[2],
939                    ..not_glean_restarted.event.clone()
940                },
941                execution_counter: None,
942            },
943            glean_restarted,
944        ];
945
946        glean
947            .event_storage()
948            .normalize_store(&glean, store_name, &mut store, glean.start_time());
949        assert_eq!(3, store.len());
950        for (timestamp, event) in timestamps.iter().zip(store.iter()) {
951            assert_eq!(
952                &StoredEvent {
953                    event: RecordedEvent {
954                        timestamp: timestamp - timestamps[0],
955                        ..not_glean_restarted.clone().event
956                    },
957                    execution_counter: None
958                },
959                event
960            );
961        }
962    }
963
964    #[test]
965    fn normalize_store_multi_run_timestamp_math() {
966        // With multiple runs of events (separated by `glean.restarted`),
967        // ensure the timestamp math works.
968        // (( works = Initial event gets to be 0, subsequent events get normalized to that 0.
969        //            Subsequent runs figure it out via glean.restarted.date and ping_info.start_time ))
970        let (glean, _dir) = new_glean(None);
971
972        let store_name = "store-name";
973        let glean_restarted = StoredEvent {
974            event: RecordedEvent {
975                category: "glean".into(),
976                name: "restarted".into(),
977                ..Default::default()
978            },
979            execution_counter: None,
980        };
981        let not_glean_restarted = StoredEvent {
982            event: RecordedEvent {
983                category: "category".into(),
984                name: "name".into(),
985                ..Default::default()
986            },
987            execution_counter: None,
988        };
989
990        // This scenario represents a run of three events followed by an hour between runs,
991        // followed by one final event.
992        let timestamps = [20, 40, 200, 12];
993        let ecs = [0, 1];
994        let some_hour = 16;
995        let startup_date = FixedOffset::east(0)
996            .ymd(2022, 11, 24)
997            .and_hms(some_hour, 29, 0); // TimeUnit::Minute -- don't put seconds
998        let glean_start_time = startup_date.with_hour(some_hour - 1);
999        let restarted_ts = 2;
1000        let mut store = vec![
1001            StoredEvent {
1002                event: RecordedEvent {
1003                    timestamp: timestamps[0],
1004                    ..not_glean_restarted.event.clone()
1005                },
1006                execution_counter: Some(ecs[0]),
1007            },
1008            StoredEvent {
1009                event: RecordedEvent {
1010                    timestamp: timestamps[1],
1011                    ..not_glean_restarted.event.clone()
1012                },
1013                execution_counter: Some(ecs[0]),
1014            },
1015            StoredEvent {
1016                event: RecordedEvent {
1017                    timestamp: timestamps[2],
1018                    ..not_glean_restarted.event.clone()
1019                },
1020                execution_counter: Some(ecs[0]),
1021            },
1022            StoredEvent {
1023                event: RecordedEvent {
1024                    extra: Some(
1025                        [(
1026                            "glean.startup.date".into(),
1027                            get_iso_time_string(startup_date, TimeUnit::Minute),
1028                        )]
1029                        .into(),
1030                    ),
1031                    timestamp: restarted_ts,
1032                    ..glean_restarted.event.clone()
1033                },
1034                execution_counter: Some(ecs[1]),
1035            },
1036            StoredEvent {
1037                event: RecordedEvent {
1038                    timestamp: timestamps[3],
1039                    ..not_glean_restarted.event.clone()
1040                },
1041                execution_counter: Some(ecs[1]),
1042            },
1043        ];
1044
1045        glean.event_storage().normalize_store(
1046            &glean,
1047            store_name,
1048            &mut store,
1049            glean_start_time.unwrap(),
1050        );
1051        assert_eq!(5, store.len()); // 4 "real" events plus 1 `glean.restarted`
1052
1053        // Let's check the first three.
1054        for (timestamp, event) in timestamps[..timestamps.len() - 1].iter().zip(store.clone()) {
1055            assert_eq!(
1056                StoredEvent {
1057                    event: RecordedEvent {
1058                        timestamp: timestamp - timestamps[0],
1059                        ..not_glean_restarted.event.clone()
1060                    },
1061                    execution_counter: None,
1062                },
1063                event
1064            );
1065        }
1066        // The fourth should be a glean.restarted and have a realtime-based timestamp.
1067        let hour_in_millis = 3600000;
1068        assert_eq!(
1069            store[3],
1070            StoredEvent {
1071                event: RecordedEvent {
1072                    timestamp: hour_in_millis,
1073                    ..glean_restarted.event
1074                },
1075                execution_counter: None,
1076            }
1077        );
1078        // The fifth should have a timestamp based on the new origin.
1079        assert_eq!(
1080            store[4],
1081            StoredEvent {
1082                event: RecordedEvent {
1083                    timestamp: hour_in_millis + timestamps[3] - restarted_ts,
1084                    ..not_glean_restarted.event
1085                },
1086                execution_counter: None,
1087            }
1088        );
1089    }
1090
1091    #[test]
1092    fn normalize_store_multi_run_client_clocks() {
1093        // With multiple runs of events (separated by `glean.restarted`),
1094        // ensure the timestamp math works. Even when the client clock goes backwards.
1095        let (glean, _dir) = new_glean(None);
1096
1097        let store_name = "store-name";
1098        let glean_restarted = StoredEvent {
1099            event: RecordedEvent {
1100                category: "glean".into(),
1101                name: "restarted".into(),
1102                ..Default::default()
1103            },
1104            execution_counter: None,
1105        };
1106        let not_glean_restarted = StoredEvent {
1107            event: RecordedEvent {
1108                category: "category".into(),
1109                name: "name".into(),
1110                ..Default::default()
1111            },
1112            execution_counter: None,
1113        };
1114
1115        // This scenario represents a run of two events followed by negative one hours between runs,
1116        // followed by two more events.
1117        let timestamps = [20, 40, 12, 200];
1118        let ecs = [0, 1];
1119        let some_hour = 10;
1120        let startup_date = FixedOffset::east(0)
1121            .ymd(2022, 11, 25)
1122            .and_hms(some_hour, 37, 0); // TimeUnit::Minute -- don't put seconds
1123        let glean_start_time = startup_date.with_hour(some_hour + 1);
1124        let restarted_ts = 2;
1125        let mut store = vec![
1126            StoredEvent {
1127                event: RecordedEvent {
1128                    timestamp: timestamps[0],
1129                    ..not_glean_restarted.event.clone()
1130                },
1131                execution_counter: Some(ecs[0]),
1132            },
1133            StoredEvent {
1134                event: RecordedEvent {
1135                    timestamp: timestamps[1],
1136                    ..not_glean_restarted.event.clone()
1137                },
1138                execution_counter: Some(ecs[0]),
1139            },
1140            StoredEvent {
1141                event: RecordedEvent {
1142                    extra: Some(
1143                        [(
1144                            "glean.startup.date".into(),
1145                            get_iso_time_string(startup_date, TimeUnit::Minute),
1146                        )]
1147                        .into(),
1148                    ),
1149                    timestamp: restarted_ts,
1150                    ..glean_restarted.event.clone()
1151                },
1152                execution_counter: Some(ecs[1]),
1153            },
1154            StoredEvent {
1155                event: RecordedEvent {
1156                    timestamp: timestamps[2],
1157                    ..not_glean_restarted.event.clone()
1158                },
1159                execution_counter: Some(ecs[1]),
1160            },
1161            StoredEvent {
1162                event: RecordedEvent {
1163                    timestamp: timestamps[3],
1164                    ..not_glean_restarted.event.clone()
1165                },
1166                execution_counter: Some(ecs[1]),
1167            },
1168        ];
1169
1170        glean.event_storage().normalize_store(
1171            &glean,
1172            store_name,
1173            &mut store,
1174            glean_start_time.unwrap(),
1175        );
1176        assert_eq!(5, store.len()); // 4 "real" events plus 1 `glean.restarted`
1177
1178        // Let's check the first two.
1179        for (timestamp, event) in timestamps[..timestamps.len() - 2].iter().zip(store.clone()) {
1180            assert_eq!(
1181                StoredEvent {
1182                    event: RecordedEvent {
1183                        timestamp: timestamp - timestamps[0],
1184                        ..not_glean_restarted.event.clone()
1185                    },
1186                    execution_counter: None,
1187                },
1188                event
1189            );
1190        }
1191        // The third should be a glean.restarted. Its timestamp should be
1192        // one larger than the largest timestamp seen so far (because that's
1193        // how we ensure monotonic timestamps when client clocks go backwards).
1194        assert_eq!(
1195            store[2],
1196            StoredEvent {
1197                event: RecordedEvent {
1198                    timestamp: store[1].event.timestamp + 1,
1199                    ..glean_restarted.event
1200                },
1201                execution_counter: None,
1202            }
1203        );
1204        // The fifth should have a timestamp based on the new origin.
1205        assert_eq!(
1206            store[3],
1207            StoredEvent {
1208                event: RecordedEvent {
1209                    timestamp: timestamps[2] - restarted_ts + store[2].event.timestamp,
1210                    ..not_glean_restarted.event
1211                },
1212                execution_counter: None,
1213            }
1214        );
1215        // And we should have an InvalidValue on glean.restarted to show for it.
1216        assert_eq!(
1217            Ok(1),
1218            test_get_num_recorded_errors(
1219                &glean,
1220                &CommonMetricData {
1221                    name: "restarted".into(),
1222                    category: "glean".into(),
1223                    send_in_pings: vec![store_name.into()],
1224                    lifetime: Lifetime::Ping,
1225                    ..Default::default()
1226                }
1227                .into(),
1228                ErrorType::InvalidValue
1229            )
1230        );
1231    }
1232
1233    #[test]
1234    fn normalize_store_non_zero_ec() {
1235        // After the first run, execution_counter will likely be non-zero.
1236        // Ensure normalizing a store that begins with non-zero ec works.
1237        let (glean, _dir) = new_glean(None);
1238
1239        let store_name = "store-name";
1240        let glean_restarted = StoredEvent {
1241            event: RecordedEvent {
1242                timestamp: 2,
1243                category: "glean".into(),
1244                name: "restarted".into(),
1245                extra: None,
1246            },
1247            execution_counter: Some(2),
1248        };
1249        let not_glean_restarted = StoredEvent {
1250            event: RecordedEvent {
1251                timestamp: 20,
1252                category: "category".into(),
1253                name: "name".into(),
1254                extra: None,
1255            },
1256            execution_counter: Some(2),
1257        };
1258        let glean_restarted_2 = StoredEvent {
1259            event: RecordedEvent {
1260                timestamp: 2,
1261                category: "glean".into(),
1262                name: "restarted".into(),
1263                extra: None,
1264            },
1265            execution_counter: Some(3),
1266        };
1267        let mut store = vec![
1268            glean_restarted,
1269            not_glean_restarted.clone(),
1270            glean_restarted_2,
1271        ];
1272        let glean_start_time = glean.start_time();
1273
1274        glean
1275            .event_storage()
1276            .normalize_store(&glean, store_name, &mut store, glean_start_time);
1277
1278        assert_eq!(1, store.len());
1279        assert_eq!(
1280            StoredEvent {
1281                event: RecordedEvent {
1282                    timestamp: 0,
1283                    ..not_glean_restarted.event
1284                },
1285                execution_counter: None
1286            },
1287            store[0]
1288        );
1289        // And we should have no InvalidState errors on glean.restarted.
1290        assert!(test_get_num_recorded_errors(
1291            &glean,
1292            &CommonMetricData {
1293                name: "restarted".into(),
1294                category: "glean".into(),
1295                send_in_pings: vec![store_name.into()],
1296                lifetime: Lifetime::Ping,
1297                ..Default::default()
1298            }
1299            .into(),
1300            ErrorType::InvalidState
1301        )
1302        .is_err());
1303        // (and, just because we're here, double-check there are no InvalidValue either).
1304        assert!(test_get_num_recorded_errors(
1305            &glean,
1306            &CommonMetricData {
1307                name: "restarted".into(),
1308                category: "glean".into(),
1309                send_in_pings: vec![store_name.into()],
1310                lifetime: Lifetime::Ping,
1311                ..Default::default()
1312            }
1313            .into(),
1314            ErrorType::InvalidValue
1315        )
1316        .is_err());
1317    }
1318}