glean_core/event_database/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5use std::cmp::Ordering;
6use std::collections::HashMap;
7use std::fs::{create_dir_all, File, OpenOptions};
8use std::io::BufRead;
9use std::io::BufReader;
10use std::io::Write;
11use std::path::{Path, PathBuf};
12use std::sync::{Arc, Mutex, RwLock};
13use std::{fs, mem};
14
15use chrono::{DateTime, FixedOffset, Utc};
16
17use malloc_size_of::MallocSizeOf;
18use malloc_size_of_derive::MallocSizeOf;
19use serde::{Deserialize, Serialize};
20use serde_json::{json, Value as JsonValue};
21
22use crate::common_metric_data::CommonMetricDataInternal;
23use crate::coverage::record_coverage;
24use crate::error_recording::{record_error, ErrorType};
25use crate::metrics::{DatetimeMetric, TimeUnit};
26use crate::storage::INTERNAL_STORAGE;
27use crate::util::get_iso_time_string;
28use crate::Glean;
29use crate::Result;
30use crate::{CommonMetricData, CounterMetric, Lifetime};
31
/// Represents the recorded data for a single event.
///
/// This is the shape serialized into pings: `timestamp`, `category`, `name`
/// and, when present, `extra`. The on-disk form wraps this in `StoredEvent`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, MallocSizeOf)]
#[cfg_attr(test, derive(Default))]
pub struct RecordedEvent {
    /// The timestamp of when the event was recorded.
    ///
    /// This allows to order events from a single process run.
    pub timestamp: u64,

    /// The event's category.
    ///
    /// This is defined by users in the metrics file.
    pub category: String,

    /// The event's name.
    ///
    /// This is defined by users in the metrics file.
    pub name: String,

    /// A map of all extra data values.
    ///
    /// The set of allowed extra keys is defined by users in the metrics file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<HashMap<String, String>>,
}
57
58/// Represents the stored data for a single event.
59#[derive(
60    Debug, Clone, Deserialize, Serialize, PartialEq, Eq, malloc_size_of_derive::MallocSizeOf,
61)]
62struct StoredEvent {
63    #[serde(flatten)]
64    event: RecordedEvent,
65
66    /// The monotonically-increasing execution counter.
67    ///
68    /// Included to allow sending of events across Glean restarts (bug 1716725).
69    /// Is i32 because it is stored in a CounterMetric.
70    #[serde(default)]
71    #[serde(skip_serializing_if = "Option::is_none")]
72    pub execution_counter: Option<i32>,
73}
74
/// This struct handles the in-memory and on-disk storage logic for events.
///
/// So that the data survives shutting down of the application, events are stored
/// in an append-only file on disk, in addition to the store in memory. Each line
/// of this file records a single event in JSON, exactly as it will be sent in the
/// ping. There is one file per store.
///
/// When restarting the application, these on-disk files are checked, and if any are
/// found, they are loaded, and a `glean.restarted` event is added before any
/// further events are collected. This is because the timestamps for these events
/// may have come from a previous boot of the device, and therefore will not be
/// compatible with any newly-collected events.
///
/// Normalizing all these timestamps happens on serialization for submission (see
/// `serialize_as_json`) where the client time between restarts is calculated using
/// data stored in the `glean.startup.date` extra of the `glean.restarted` event, plus
/// the `execution_counter` stored in events on disk.
///
/// Neither `execution_counter` nor `glean.startup.date` is submitted in pings.
/// The `glean.restarted` event is, though.
/// (See [bug 1716725](https://bugzilla.mozilla.org/show_bug.cgi?id=1716725).)
#[derive(Debug)]
pub struct EventDatabase {
    /// Path to directory of on-disk event files
    pub path: PathBuf,
    /// The in-memory list of events
    event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
    /// Lazily-opened append-only file handles, one per store.
    event_store_files: RwLock<HashMap<String, Arc<File>>>,
    /// A lock to be held when doing operations on the filesystem
    file_lock: Mutex<()>,
}
106
107impl MallocSizeOf for EventDatabase {
108    fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
109        let mut n = 0;
110        n += self.event_stores.read().unwrap().size_of(ops);
111
112        let map = self.event_store_files.read().unwrap();
113        for store_name in map.keys() {
114            n += store_name.size_of(ops);
115            // `File` doesn't allocate, but `Arc` puts it on the heap.
116            n += mem::size_of::<File>();
117        }
118        n
119    }
120}
121
122impl EventDatabase {
    /// Creates a new event database.
    ///
    /// # Arguments
    ///
    /// * `data_path` - The directory to store events in. A new directory
    ///   `events` will be created inside of this directory.
    ///
    /// # Errors
    ///
    /// Fails if the `events` subdirectory cannot be created.
    pub fn new(data_path: &Path) -> Result<Self> {
        let path = data_path.join("events");
        create_dir_all(&path)?;

        Ok(Self {
            path,
            event_stores: RwLock::new(HashMap::new()),
            event_store_files: RwLock::new(HashMap::new()),
            file_lock: Mutex::new(()),
        })
    }
140
    /// Initializes events storage after Glean is fully initialized and ready to send pings.
    ///
    /// This must be called once on application startup, e.g. from
    /// [Glean.initialize], but after we are ready to send pings, since this
    /// could potentially collect and send the "events" ping.
    ///
    /// If there are any events queued on disk, it loads them into memory so
    /// that the memory and disk representations are in sync.
    ///
    /// If event records for the "events" ping are present, they are assembled into
    /// an "events" ping which is submitted immediately with reason "startup".
    ///
    /// If event records for custom pings are present, we increment the custom pings'
    /// stores' `execution_counter` and record a `glean.restarted`
    /// event with the current client clock in its `glean.startup.date` extra.
    ///
    /// # Arguments
    ///
    /// * `glean` - The Glean instance.
    /// * `trim_data_to_registered_pings` - Whether we should trim the event storage of
    ///   any events not belonging to pings previously registered via `register_ping_type`.
    ///
    /// # Returns
    ///
    /// Whether the "events" ping was submitted.
    pub fn flush_pending_events_on_startup(
        &self,
        glean: &Glean,
        trim_data_to_registered_pings: bool,
    ) -> bool {
        match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
            Ok(_) => {
                // Copy the store names out of the lock scope: we must not be
                // holding the event stores lock when submitting a ping or
                // recording new events below.
                let stores_with_events: Vec<String> = {
                    self.event_stores
                        .read()
                        .unwrap() // safe unwrap, only error case is poisoning
                        .keys()
                        .map(|x| x.to_owned())
                        .collect()
                };
                // The "events" store is flushed as a ping at the end; every
                // other store instead receives a `glean.restarted` marker.
                let has_events_events = stores_with_events.contains(&"events".to_owned());
                let glean_restarted_stores = if has_events_events {
                    stores_with_events
                        .into_iter()
                        .filter(|store| store != "events")
                        .collect()
                } else {
                    stores_with_events
                };
                if !glean_restarted_stores.is_empty() {
                    // Bump each store's execution counter so events recorded in
                    // this run are distinguishable from those loaded from disk.
                    for store_name in glean_restarted_stores.iter() {
                        CounterMetric::new(CommonMetricData {
                            name: "execution_counter".into(),
                            category: store_name.into(),
                            send_in_pings: vec![INTERNAL_STORAGE.into()],
                            lifetime: Lifetime::Ping,
                            ..Default::default()
                        })
                        .add_sync(glean, 1);
                    }
                    let glean_restarted = CommonMetricData {
                        name: "restarted".into(),
                        category: "glean".into(),
                        send_in_pings: glean_restarted_stores,
                        lifetime: Lifetime::Ping,
                        ..Default::default()
                    };
                    // `glean.startup.date` captures the client clock at startup;
                    // `normalize_store` later uses it to line up timestamps
                    // across restarts.
                    let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
                    let mut extra: HashMap<String, String> =
                        [("glean.startup.date".into(), startup)].into();
                    if glean.with_timestamps() {
                        let now = Utc::now();
                        let precise_timestamp = now.timestamp_millis() as u64;
                        extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
                    }
                    self.record(
                        glean,
                        &glean_restarted.into(),
                        crate::get_timestamp_ms(),
                        Some(extra),
                    );
                }
                // Only submit when the "events" store actually had queued data.
                has_events_events && glean.submit_ping_by_name("events", Some("startup"))
            }
            Err(err) => {
                log::warn!("Error loading events from disk: {}", err);
                false
            }
        }
    }
233
    /// Loads queued events from their per-store files on disk into memory.
    ///
    /// Files for stores that are not registered pings are deleted when
    /// `trim_data_to_registered_pings` is set. Unreadable or unparseable
    /// lines (e.g. from a truncated write) are skipped.
    ///
    /// # Errors
    ///
    /// Fails on directory/file I/O errors or a non-UTF-8 file name.
    fn load_events_from_disk(
        &self,
        glean: &Glean,
        trim_data_to_registered_pings: bool,
    ) -> Result<()> {
        // NOTE: The order of locks here is important.
        // In other code parts we might acquire the `file_lock` when we already have acquired
        // a lock on `event_stores`.
        // This is a potential lock-order-inversion.
        let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
        let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning

        for entry in fs::read_dir(&self.path)? {
            let entry = entry?;
            if entry.file_type()?.is_file() {
                // Each store's file is named after the store.
                let store_name = entry.file_name().into_string()?;
                log::info!("Loading events for {}", store_name);
                if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
                    log::warn!("Trimming {}'s events", store_name);
                    if let Err(err) = fs::remove_file(entry.path()) {
                        match err.kind() {
                            std::io::ErrorKind::NotFound => {
                                // silently drop this error, the file was already non-existing
                            }
                            _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
                        }
                    }
                    continue;
                }
                let file = BufReader::new(File::open(entry.path())?);
                db.insert(
                    store_name,
                    // `map_while` stops at the first unreadable line;
                    // `filter_map` drops lines that aren't valid JSON.
                    file.lines()
                        .map_while(Result::ok)
                        .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
                        .collect(),
                );
            }
        }
        Ok(())
    }
275
    /// Records an event in the desired stores.
    ///
    /// # Arguments
    ///
    /// * `glean` - The Glean instance.
    /// * `meta` - The metadata about the event metric. Used to get the category,
    ///   name and stores for the metric.
    /// * `timestamp` - The timestamp of the event, in milliseconds. Must use a
    ///   monotonically increasing timer (this value is obtained on the
    ///   platform-specific side).
    /// * `extra` - Extra data values, mapping strings to strings.
    ///
    /// # Returns
    ///
    /// `true` if a ping was submitted and should be uploaded.
    /// `false` otherwise.
    pub fn record(
        &self,
        glean: &Glean,
        meta: &CommonMetricDataInternal,
        timestamp: u64,
        extra: Option<HashMap<String, String>>,
    ) -> bool {
        // If upload is disabled we don't want to record.
        if !glean.is_upload_enabled() {
            return false;
        }

        // Whether the "events" store hit its capacity limit while recording.
        let mut submit_max_capacity_event_ping = false;
        {
            let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
            for store_name in meta.inner.send_in_pings.iter() {
                if !glean.is_ping_enabled(store_name) {
                    continue;
                }

                let store = db.entry(store_name.to_string()).or_default();
                // The current execution counter tags this event with the
                // process run it was recorded in (see bug 1716725).
                let execution_counter = CounterMetric::new(CommonMetricData {
                    name: "execution_counter".into(),
                    category: store_name.into(),
                    send_in_pings: vec![INTERNAL_STORAGE.into()],
                    lifetime: Lifetime::Ping,
                    ..Default::default()
                })
                .get_value(glean, INTERNAL_STORAGE);
                // Create StoredEvent object, and its JSON form for serialization on disk.
                let event = StoredEvent {
                    event: RecordedEvent {
                        timestamp,
                        category: meta.inner.category.to_string(),
                        name: meta.inner.name.to_string(),
                        extra: extra.clone(),
                    },
                    execution_counter,
                };
                let event_json = serde_json::to_string(&event).unwrap(); // safe unwrap, event can always be serialized
                store.push(event);
                // NOTE: takes `file_lock` while `event_stores` is held —
                // this lock order must match `load_events_from_disk`.
                self.write_event_to_disk(store_name, &event_json);
                if store_name == "events" && store.len() == glean.get_max_events() {
                    submit_max_capacity_event_ping = true;
                }
            }
        }
        // Submit outside the lock scope above.
        if submit_max_capacity_event_ping {
            glean.submit_ping_by_name("events", Some("max_capacity"));
            true
        } else {
            false
        }
    }
346
347    fn get_event_store(&self, store_name: &str) -> Arc<File> {
348        Arc::clone(
349            self.event_store_files
350                .write()
351                .unwrap()
352                .entry(store_name.to_string())
353                .or_insert_with(|| {
354                    Arc::new(
355                        OpenOptions::new()
356                            .create(true)
357                            .append(true)
358                            .open(self.path.join(store_name))
359                            .unwrap(),
360                    )
361                }),
362        )
363    }
364
365    /// Writes an event to a single store on disk.
366    ///
367    /// # Arguments
368    ///
369    /// * `store_name` - The name of the store.
370    /// * `event_json` - The event content, as a single-line JSON-encoded string.
371    fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
372        let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
373        let mut file = self.get_event_store(store_name);
374
375        let write_res = (|| {
376            file.write_all(event_json.as_bytes())?;
377            file.write_all(b"\n")?;
378            file.flush()?;
379            Ok::<(), std::io::Error>(())
380        })();
381
382        if let Err(err) = write_res {
383            log::warn!("IO error writing event to store '{}': {}", store_name, err);
384        }
385    }
386
    /// Normalizes the store in-place.
    ///
    /// A store may be in any order and contain any number of `glean.restarted` events,
    /// whose values must be taken into account, along with `execution_counter` values,
    /// to come up with the correct events with correct `timestamp` values,
    /// on which we then sort.
    ///
    /// 1. Sort by `execution_counter` and `timestamp`,
    ///    breaking ties so that `glean.restarted` comes first.
    /// 2. Remove all initial and final `glean.restarted` events
    /// 3. For each group of events that share a `execution_counter`,
    ///    i. calculate the initial `glean.restarted` event's `timestamp`s to be
    ///       clamp(glean.startup.date - ping_info.start_time, biggest_timestamp_of_previous_group + 1)
    ///    ii. normalize each non-`glean-restarted` event's `timestamp`
    ///        relative to the `glean.restarted` event's uncalculated `timestamp`
    /// 4. Remove `execution_counter` and `glean.startup.date` extra keys
    /// 5. Sort by `timestamp`
    ///
    /// In the event that something goes awry, this will record an invalid_state on
    /// glean.restarted if it is due to internal inconsistencies, or invalid_value
    /// on client clock weirdness.
    ///
    /// # Arguments
    ///
    /// * `glean` - Used to report errors
    /// * `store_name` - The name of the store we're normalizing.
    /// * `store` - The store we're to normalize.
    /// * `glean_start_time` - Used if the glean.startup.date or ping_info.start_time aren't available. Passed as a parameter to ease unit-testing.
    fn normalize_store(
        &self,
        glean: &Glean,
        store_name: &str,
        store: &mut Vec<StoredEvent>,
        glean_start_time: DateTime<FixedOffset>,
    ) {
        let is_glean_restarted =
            |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
        // Metric shape used only for error reporting against glean.restarted.
        let glean_restarted_meta = |store_name: &str| CommonMetricData {
            name: "restarted".into(),
            category: "glean".into(),
            send_in_pings: vec![store_name.into()],
            lifetime: Lifetime::Ping,
            ..Default::default()
        };
        // Step 1
        store.sort_by(|a, b| {
            a.execution_counter
                .cmp(&b.execution_counter)
                .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
                .then_with(|| {
                    // On a full tie, glean.restarted sorts first.
                    if is_glean_restarted(&a.event) {
                        Ordering::Less
                    } else {
                        Ordering::Greater
                    }
                })
        });
        // Step 2
        // Find the index of the first and final non-`glean.restarted` events.
        // Remove events before the first and after the final.
        let final_event = match store
            .iter()
            .rposition(|event| !is_glean_restarted(&event.event))
        {
            Some(idx) => idx + 1,
            _ => 0,
        };
        store.drain(final_event..);
        let first_event = store
            .iter()
            .position(|event| !is_glean_restarted(&event.event))
            .unwrap_or(store.len());
        store.drain(..first_event);
        if store.is_empty() {
            // There was nothing but `glean.restarted` events. Job's done!
            return;
        }
        // Step 3
        // It is allowed that there might not be any `glean.restarted` event, nor
        // `execution_counter` extra values. (This should always be the case for the
        // "events" ping, for instance).
        // Other inconsistencies are evidence of errors, and so are logged.
        let mut cur_ec = 0;
        // The offset within a group of events with the same `execution_counter`.
        let mut intra_group_offset = store[0].event.timestamp;
        // The offset between this group and ping_info.start_date.
        let mut inter_group_offset = 0;
        // Highest normalized timestamp seen so far; used for clamping and sanity checks.
        let mut highest_ts = 0;
        for event in store.iter_mut() {
            // `take()` also implements step 4 for `execution_counter`:
            // the field is cleared and thus not serialized into the ping.
            let execution_counter = event.execution_counter.take().unwrap_or(0);
            if is_glean_restarted(&event.event) {
                // We've entered the next "event group".
                // We need a new epoch based on glean.startup.date - ping_info.start_date
                cur_ec = execution_counter;
                // Step 4 for `glean.startup.date`: `remove` strips it from extras.
                let glean_startup_date = event
                    .event
                    .extra
                    .as_mut()
                    .and_then(|extra| {
                        extra.remove("glean.startup.date").and_then(|date_str| {
                            DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
                                .map_err(|_| {
                                    record_error(
                                        glean,
                                        &glean_restarted_meta(store_name).into(),
                                        ErrorType::InvalidState,
                                        format!("Unparseable glean.startup.date '{}'", date_str),
                                        None,
                                    );
                                })
                                .ok()
                        })
                    })
                    .unwrap_or(glean_start_time);
                if event
                    .event
                    .extra
                    .as_ref()
                    .is_some_and(|extra| extra.is_empty())
                {
                    // Small optimization to save us sending empty dicts.
                    event.event.extra = None;
                }
                // `<store>#start` holds the ping's start date in internal storage.
                let ping_start = DatetimeMetric::new(
                    CommonMetricData {
                        name: format!("{}#start", store_name),
                        category: "".into(),
                        send_in_pings: vec![INTERNAL_STORAGE.into()],
                        lifetime: Lifetime::User,
                        ..Default::default()
                    },
                    TimeUnit::Minute,
                );
                let ping_start = ping_start
                    .get_value(glean, INTERNAL_STORAGE)
                    .unwrap_or(glean_start_time);
                let time_from_ping_start_to_glean_restarted =
                    (glean_startup_date - ping_start).num_milliseconds();
                intra_group_offset = event.event.timestamp;
                inter_group_offset =
                    u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
                if inter_group_offset < highest_ts {
                    record_error(
                        glean,
                        &glean_restarted_meta(store_name).into(),
                        ErrorType::InvalidValue,
                        format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
                        None,
                    );
                    // The client's clock went backwards enough that this event group's
                    // glean.restarted looks like it happened _before_ the final event of the previous group.
                    // Or, it went ahead enough to overflow u64.
                    // Adjust things so this group starts 1ms after the previous one.
                    inter_group_offset = highest_ts + 1;
                }
            } else if cur_ec == 0 {
                // bug 1811872 - cur_ec might need initialization.
                cur_ec = execution_counter;
            }
            // Re-base this event's timestamp onto the group's epoch.
            event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
            if execution_counter != cur_ec {
                record_error(
                    glean,
                    &glean_restarted_meta(store_name).into(),
                    ErrorType::InvalidState,
                    format!(
                        "Inconsistent execution counter {} (expected {})",
                        execution_counter, cur_ec
                    ),
                    None,
                );
                // Let's fix cur_ec up and hope this isn't a sign something big is broken.
                cur_ec = execution_counter;
            }
            if highest_ts > event.event.timestamp {
                // Even though we sorted everything, something in the
                // execution_counter or glean.startup.date math went awry.
                record_error(
                    glean,
                    &glean_restarted_meta(store_name).into(),
                    ErrorType::InvalidState,
                    format!(
                        "Inconsistent previous highest timestamp {} (expected <= {})",
                        highest_ts, event.event.timestamp
                    ),
                    None,
                );
                // Let the highest_ts regress to event.timestamp to hope this minimizes weirdness.
            }
            highest_ts = event.event.timestamp
        }
    }
579
580    /// Gets a snapshot of the stored event data as a JsonValue.
581    ///
582    /// # Arguments
583    ///
584    /// * `glean` - the Glean instance.
585    /// * `store_name` - The name of the desired store.
586    /// * `clear_store` - Whether to clear the store after snapshotting.
587    ///
588    /// # Returns
589    ///
590    /// A array of events, JSON encoded, if any. Otherwise `None`.
591    pub fn snapshot_as_json(
592        &self,
593        glean: &Glean,
594        store_name: &str,
595        clear_store: bool,
596    ) -> Option<JsonValue> {
597        let result = {
598            let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
599            db.get_mut(&store_name.to_string()).and_then(|store| {
600                if !store.is_empty() {
601                    // Normalization happens in-place, so if we're not clearing,
602                    // operate on a clone.
603                    let mut clone;
604                    let store = if clear_store {
605                        store
606                    } else {
607                        clone = store.clone();
608                        &mut clone
609                    };
610                    // We may need to normalize event timestamps across multiple restarts.
611                    self.normalize_store(glean, store_name, store, glean.start_time());
612                    Some(json!(store))
613                } else {
614                    log::warn!("Unexpectly got empty event store for '{}'", store_name);
615                    None
616                }
617            })
618        };
619
620        if clear_store {
621            self.event_stores
622                .write()
623                .unwrap() // safe unwrap, only error case is poisoning
624                .remove(&store_name.to_string());
625            self.event_store_files
626                .write()
627                .unwrap() // safe unwrap, only error case is poisoning
628                .remove(&store_name.to_string());
629
630            let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
631            if let Err(err) = fs::remove_file(self.path.join(store_name)) {
632                match err.kind() {
633                    std::io::ErrorKind::NotFound => {
634                        // silently drop this error, the file was already non-existing
635                    }
636                    _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
637                }
638            }
639        }
640
641        result
642    }
643
644    /// Clears all stored events, both in memory and on-disk.
645    pub fn clear_all(&self) -> Result<()> {
646        // safe unwrap, only error case is poisoning
647        self.event_stores.write().unwrap().clear();
648        self.event_store_files.write().unwrap().clear();
649
650        // safe unwrap, only error case is poisoning
651        let _lock = self.file_lock.lock().unwrap();
652        std::fs::remove_dir_all(&self.path)?;
653        create_dir_all(&self.path)?;
654
655        Ok(())
656    }
657
658    /// **Test-only API (exported for FFI purposes).**
659    ///
660    /// Gets the vector of currently stored events for the given event metric in
661    /// the given store.
662    ///
663    /// This doesn't clear the stored value.
664    pub fn test_get_value<'a>(
665        &'a self,
666        meta: &'a CommonMetricDataInternal,
667        store_name: &str,
668    ) -> Option<Vec<RecordedEvent>> {
669        record_coverage(&meta.base_identifier());
670
671        let value: Vec<RecordedEvent> = self
672            .event_stores
673            .read()
674            .unwrap() // safe unwrap, only error case is poisoning
675            .get(&store_name.to_string())
676            .into_iter()
677            .flatten()
678            .map(|stored_event| stored_event.event.clone())
679            .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
680            .collect();
681        if !value.is_empty() {
682            Some(value)
683        } else {
684            None
685        }
686    }
687}
688
689#[cfg(test)]
690mod test {
691    use super::*;
692    use crate::test_get_num_recorded_errors;
693    use crate::tests::new_glean;
694    use chrono::{TimeZone, Timelike};
695
    #[test]
    fn handle_truncated_events_on_disk() {
        let (glean, t) = new_glean(None);

        {
            // Write two truncated JSON lines followed by one complete event.
            let db = EventDatabase::new(t.path()).unwrap();
            db.write_event_to_disk("events", "{\"timestamp\": 500");
            db.write_event_to_disk("events", "{\"timestamp\"");
            db.write_event_to_disk(
                "events",
                "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
            );
        }

        {
            // Reloading must skip the unparseable lines and keep the valid one.
            let db = EventDatabase::new(t.path()).unwrap();
            db.load_events_from_disk(&glean, false).unwrap();
            let events = &db.event_stores.read().unwrap()["events"];
            assert_eq!(1, events.len());
        }
    }
717
718    #[test]
719    fn stable_serialization() {
720        let event_empty = RecordedEvent {
721            timestamp: 2,
722            category: "cat".to_string(),
723            name: "name".to_string(),
724            extra: None,
725        };
726
727        let mut data = HashMap::new();
728        data.insert("a key".to_string(), "a value".to_string());
729        let event_data = RecordedEvent {
730            timestamp: 2,
731            category: "cat".to_string(),
732            name: "name".to_string(),
733            extra: Some(data),
734        };
735
736        let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap();
737        let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap();
738
739        assert_eq!(
740            StoredEvent {
741                event: event_empty,
742                execution_counter: None
743            },
744            serde_json::from_str(&event_empty_json).unwrap()
745        );
746        assert_eq!(
747            StoredEvent {
748                event: event_data,
749                execution_counter: None
750            },
751            serde_json::from_str(&event_data_json).unwrap()
752        );
753    }
754
755    #[test]
756    fn deserialize_existing_data() {
757        let event_empty_json = r#"
758{
759  "timestamp": 2,
760  "category": "cat",
761  "name": "name"
762}
763            "#;
764
765        let event_data_json = r#"
766{
767  "timestamp": 2,
768  "category": "cat",
769  "name": "name",
770  "extra": {
771    "a key": "a value"
772  }
773}
774        "#;
775
776        let event_empty = RecordedEvent {
777            timestamp: 2,
778            category: "cat".to_string(),
779            name: "name".to_string(),
780            extra: None,
781        };
782
783        let mut data = HashMap::new();
784        data.insert("a key".to_string(), "a value".to_string());
785        let event_data = RecordedEvent {
786            timestamp: 2,
787            category: "cat".to_string(),
788            name: "name".to_string(),
789            extra: Some(data),
790        };
791
792        assert_eq!(
793            StoredEvent {
794                event: event_empty,
795                execution_counter: None
796            },
797            serde_json::from_str(event_empty_json).unwrap()
798        );
799        assert_eq!(
800            StoredEvent {
801                event: event_data,
802                execution_counter: None
803            },
804            serde_json::from_str(event_data_json).unwrap()
805        );
806    }
807
808    #[test]
809    fn doesnt_record_when_upload_is_disabled() {
810        let (mut glean, dir) = new_glean(None);
811        let db = EventDatabase::new(dir.path()).unwrap();
812
813        let test_storage = "store1";
814        let test_category = "category";
815        let test_name = "name";
816        let test_timestamp = 2;
817        let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
818        let event_data = RecordedEvent {
819            timestamp: test_timestamp,
820            category: test_category.to_string(),
821            name: test_name.to_string(),
822            extra: None,
823        };
824
825        // Upload is not yet disabled,
826        // so let's check that everything is getting recorded as expected.
827        db.record(&glean, &test_meta, 2, None);
828        {
829            let event_stores = db.event_stores.read().unwrap();
830            assert_eq!(
831                &StoredEvent {
832                    event: event_data,
833                    execution_counter: None
834                },
835                &event_stores.get(test_storage).unwrap()[0]
836            );
837            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
838        }
839
840        glean.set_upload_enabled(false);
841
842        // Now that upload is disabled, let's check nothing is recorded.
843        db.record(&glean, &test_meta, 2, None);
844        {
845            let event_stores = db.event_stores.read().unwrap();
846            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
847        }
848    }
849
850    #[test]
851    fn normalize_store_of_glean_restarted() {
852        // Make sure stores empty of anything but glean.restarted events normalize without issue.
853        let (glean, _dir) = new_glean(None);
854
855        let store_name = "store-name";
856        let glean_restarted = StoredEvent {
857            event: RecordedEvent {
858                timestamp: 2,
859                category: "glean".into(),
860                name: "restarted".into(),
861                extra: None,
862            },
863            execution_counter: None,
864        };
865        let mut store = vec![glean_restarted.clone()];
866        let glean_start_time = glean.start_time();
867
868        glean
869            .event_storage()
870            .normalize_store(&glean, store_name, &mut store, glean_start_time);
871        assert!(store.is_empty());
872
873        let mut store = vec![glean_restarted.clone(), glean_restarted.clone()];
874        glean
875            .event_storage()
876            .normalize_store(&glean, store_name, &mut store, glean_start_time);
877        assert!(store.is_empty());
878
879        let mut store = vec![
880            glean_restarted.clone(),
881            glean_restarted.clone(),
882            glean_restarted,
883        ];
884        glean
885            .event_storage()
886            .normalize_store(&glean, store_name, &mut store, glean_start_time);
887        assert!(store.is_empty());
888    }
889
890    #[test]
891    fn normalize_store_of_glean_restarted_on_both_ends() {
892        // Make sure stores with non-glean.restarted events don't get drained too far.
893        let (glean, _dir) = new_glean(None);
894
895        let store_name = "store-name";
896        let glean_restarted = StoredEvent {
897            event: RecordedEvent {
898                timestamp: 2,
899                category: "glean".into(),
900                name: "restarted".into(),
901                extra: None,
902            },
903            execution_counter: None,
904        };
905        let not_glean_restarted = StoredEvent {
906            event: RecordedEvent {
907                timestamp: 20,
908                category: "category".into(),
909                name: "name".into(),
910                extra: None,
911            },
912            execution_counter: None,
913        };
914        let mut store = vec![
915            glean_restarted.clone(),
916            not_glean_restarted.clone(),
917            glean_restarted,
918        ];
919        let glean_start_time = glean.start_time();
920
921        glean
922            .event_storage()
923            .normalize_store(&glean, store_name, &mut store, glean_start_time);
924        assert_eq!(1, store.len());
925        assert_eq!(
926            StoredEvent {
927                event: RecordedEvent {
928                    timestamp: 0,
929                    ..not_glean_restarted.event
930                },
931                execution_counter: None
932            },
933            store[0]
934        );
935    }
936
937    #[test]
938    fn normalize_store_single_run_timestamp_math() {
939        // With a single run of events (no non-initial or non-terminal `glean.restarted`),
940        // ensure the timestamp math works.
941        // (( works = Initial event gets to be 0, subsequent events get normalized to that 0 ))
942        let (glean, _dir) = new_glean(None);
943
944        let store_name = "store-name";
945        let glean_restarted = StoredEvent {
946            event: RecordedEvent {
947                timestamp: 2,
948                category: "glean".into(),
949                name: "restarted".into(),
950                extra: None,
951            },
952            execution_counter: None,
953        };
954        let timestamps = [20, 40, 200];
955        let not_glean_restarted = StoredEvent {
956            event: RecordedEvent {
957                timestamp: timestamps[0],
958                category: "category".into(),
959                name: "name".into(),
960                extra: None,
961            },
962            execution_counter: None,
963        };
964        let mut store = vec![
965            glean_restarted.clone(),
966            not_glean_restarted.clone(),
967            StoredEvent {
968                event: RecordedEvent {
969                    timestamp: timestamps[1],
970                    ..not_glean_restarted.event.clone()
971                },
972                execution_counter: None,
973            },
974            StoredEvent {
975                event: RecordedEvent {
976                    timestamp: timestamps[2],
977                    ..not_glean_restarted.event.clone()
978                },
979                execution_counter: None,
980            },
981            glean_restarted,
982        ];
983
984        glean
985            .event_storage()
986            .normalize_store(&glean, store_name, &mut store, glean.start_time());
987        assert_eq!(3, store.len());
988        for (timestamp, event) in timestamps.iter().zip(store.iter()) {
989            assert_eq!(
990                &StoredEvent {
991                    event: RecordedEvent {
992                        timestamp: timestamp - timestamps[0],
993                        ..not_glean_restarted.clone().event
994                    },
995                    execution_counter: None
996                },
997                event
998            );
999        }
1000    }
1001
    #[test]
    fn normalize_store_multi_run_timestamp_math() {
        // With multiple runs of events (separated by `glean.restarted`),
        // ensure the timestamp math works.
        // (( works = Initial event gets to be 0, subsequent events get normalized to that 0.
        //            Subsequent runs figure it out via glean.restarted.date and ping_info.start_time ))
        let (glean, _dir) = new_glean(None);

        let store_name = "store-name";
        // Template for the restart marker; timestamp/extra filled in per use below.
        let glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "glean".into(),
                name: "restarted".into(),
                ..Default::default()
            },
            execution_counter: None,
        };
        // Template for an ordinary ("real") event; timestamp filled in per use below.
        let not_glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "category".into(),
                name: "name".into(),
                ..Default::default()
            },
            execution_counter: None,
        };

        // This scenario represents a run of three events followed by an hour between runs,
        // followed by one final event.
        let timestamps = [20, 40, 200, 12];
        // Execution counters: the first three events belong to run 0,
        // the restart marker and the final event to run 1.
        let ecs = [0, 1];
        let some_hour = 16;
        let startup_date = FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(2022, 11, 24, some_hour, 29, 0) // TimeUnit::Minute -- don't put seconds
            .unwrap();
        // Glean's start time is one hour *before* the second run's startup date,
        // so the realtime gap between the two runs is exactly one hour.
        let glean_start_time = startup_date.with_hour(some_hour - 1);
        // Raw monotonic timestamp recorded on the restart marker itself.
        let restarted_ts = 2;
        let mut store = vec![
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[0],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[1],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[2],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            // The restart marker carries the second run's startup date so
            // normalization can compute the realtime gap between runs.
            StoredEvent {
                event: RecordedEvent {
                    extra: Some(
                        [(
                            "glean.startup.date".into(),
                            get_iso_time_string(startup_date, TimeUnit::Minute),
                        )]
                        .into(),
                    ),
                    timestamp: restarted_ts,
                    ..glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[3],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
        ];

        glean.event_storage().normalize_store(
            &glean,
            store_name,
            &mut store,
            glean_start_time.unwrap(),
        );
        assert_eq!(5, store.len()); // 4 "real" events plus 1 `glean.restarted`

        // Let's check the first three.
        // They are from the first run, so they are simply rebased so the
        // earliest event sits at timestamp 0.
        for (timestamp, event) in timestamps[..timestamps.len() - 1].iter().zip(store.clone()) {
            assert_eq!(
                StoredEvent {
                    event: RecordedEvent {
                        timestamp: timestamp - timestamps[0],
                        ..not_glean_restarted.event.clone()
                    },
                    execution_counter: None,
                },
                event
            );
        }
        // The fourth should be a glean.restarted and have a realtime-based timestamp.
        // (One hour elapsed between runs, hence one hour in milliseconds.)
        let hour_in_millis = 3600000;
        assert_eq!(
            store[3],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: hour_in_millis,
                    ..glean_restarted.event
                },
                execution_counter: None,
            }
        );
        // The fifth should have a timestamp based on the new origin.
        assert_eq!(
            store[4],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: hour_in_millis + timestamps[3] - restarted_ts,
                    ..not_glean_restarted.event
                },
                execution_counter: None,
            }
        );
    }
1129
    #[test]
    fn normalize_store_multi_run_client_clocks() {
        // With multiple runs of events (separated by `glean.restarted`),
        // ensure the timestamp math works. Even when the client clock goes backwards.
        let (glean, _dir) = new_glean(None);

        let store_name = "store-name";
        // Template for the restart marker; timestamp/extra filled in per use below.
        let glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "glean".into(),
                name: "restarted".into(),
                ..Default::default()
            },
            execution_counter: None,
        };
        // Template for an ordinary ("real") event; timestamp filled in per use below.
        let not_glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "category".into(),
                name: "name".into(),
                ..Default::default()
            },
            execution_counter: None,
        };

        // This scenario represents a run of two events followed by negative one hours between runs,
        // followed by two more events.
        let timestamps = [20, 40, 12, 200];
        // Execution counters: first two events belong to run 0, the rest to run 1.
        let ecs = [0, 1];
        let some_hour = 10;
        let startup_date = FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(2022, 11, 25, some_hour, 37, 0) // TimeUnit::Minute -- don't put seconds
            .unwrap();
        // Glean's start time is one hour *after* the second run's startup date:
        // the client clock appears to have gone backwards between runs.
        let glean_start_time = startup_date.with_hour(some_hour + 1);
        // Raw monotonic timestamp recorded on the restart marker itself.
        let restarted_ts = 2;
        let mut store = vec![
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[0],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[1],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            // The restart marker carries the (earlier!) startup date of run 1.
            StoredEvent {
                event: RecordedEvent {
                    extra: Some(
                        [(
                            "glean.startup.date".into(),
                            get_iso_time_string(startup_date, TimeUnit::Minute),
                        )]
                        .into(),
                    ),
                    timestamp: restarted_ts,
                    ..glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[2],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[3],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
        ];

        glean.event_storage().normalize_store(
            &glean,
            store_name,
            &mut store,
            glean_start_time.unwrap(),
        );
        assert_eq!(5, store.len()); // 4 "real" events plus 1 `glean.restarted`

        // Let's check the first two.
        // They are from the first run: rebased so the earliest sits at 0.
        for (timestamp, event) in timestamps[..timestamps.len() - 2].iter().zip(store.clone()) {
            assert_eq!(
                StoredEvent {
                    event: RecordedEvent {
                        timestamp: timestamp - timestamps[0],
                        ..not_glean_restarted.event.clone()
                    },
                    execution_counter: None,
                },
                event
            );
        }
        // The third should be a glean.restarted. Its timestamp should be
        // one larger than the largest timestamp seen so far (because that's
        // how we ensure monotonic timestamps when client clocks go backwards).
        assert_eq!(
            store[2],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: store[1].event.timestamp + 1,
                    ..glean_restarted.event
                },
                execution_counter: None,
            }
        );
        // The fourth (store[3]) should have a timestamp based on the new origin.
        // NOTE(review): the final event (store[4], raw timestamp 200) is never
        // asserted here — presumably it follows the same rebased-origin math;
        // consider adding a check.
        assert_eq!(
            store[3],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[2] - restarted_ts + store[2].event.timestamp,
                    ..not_glean_restarted.event
                },
                execution_counter: None,
            }
        );
        // And we should have an InvalidValue on glean.restarted to show for it.
        assert_eq!(
            Ok(1),
            test_get_num_recorded_errors(
                &glean,
                &CommonMetricData {
                    name: "restarted".into(),
                    category: "glean".into(),
                    send_in_pings: vec![store_name.into()],
                    lifetime: Lifetime::Ping,
                    ..Default::default()
                }
                .into(),
                ErrorType::InvalidValue
            )
        );
    }
1272
1273    #[test]
1274    fn normalize_store_non_zero_ec() {
1275        // After the first run, execution_counter will likely be non-zero.
1276        // Ensure normalizing a store that begins with non-zero ec works.
1277        let (glean, _dir) = new_glean(None);
1278
1279        let store_name = "store-name";
1280        let glean_restarted = StoredEvent {
1281            event: RecordedEvent {
1282                timestamp: 2,
1283                category: "glean".into(),
1284                name: "restarted".into(),
1285                extra: None,
1286            },
1287            execution_counter: Some(2),
1288        };
1289        let not_glean_restarted = StoredEvent {
1290            event: RecordedEvent {
1291                timestamp: 20,
1292                category: "category".into(),
1293                name: "name".into(),
1294                extra: None,
1295            },
1296            execution_counter: Some(2),
1297        };
1298        let glean_restarted_2 = StoredEvent {
1299            event: RecordedEvent {
1300                timestamp: 2,
1301                category: "glean".into(),
1302                name: "restarted".into(),
1303                extra: None,
1304            },
1305            execution_counter: Some(3),
1306        };
1307        let mut store = vec![
1308            glean_restarted,
1309            not_glean_restarted.clone(),
1310            glean_restarted_2,
1311        ];
1312        let glean_start_time = glean.start_time();
1313
1314        glean
1315            .event_storage()
1316            .normalize_store(&glean, store_name, &mut store, glean_start_time);
1317
1318        assert_eq!(1, store.len());
1319        assert_eq!(
1320            StoredEvent {
1321                event: RecordedEvent {
1322                    timestamp: 0,
1323                    ..not_glean_restarted.event
1324                },
1325                execution_counter: None
1326            },
1327            store[0]
1328        );
1329        // And we should have no InvalidState errors on glean.restarted.
1330        assert!(test_get_num_recorded_errors(
1331            &glean,
1332            &CommonMetricData {
1333                name: "restarted".into(),
1334                category: "glean".into(),
1335                send_in_pings: vec![store_name.into()],
1336                lifetime: Lifetime::Ping,
1337                ..Default::default()
1338            }
1339            .into(),
1340            ErrorType::InvalidState
1341        )
1342        .is_err());
1343        // (and, just because we're here, double-check there are no InvalidValue either).
1344        assert!(test_get_num_recorded_errors(
1345            &glean,
1346            &CommonMetricData {
1347                name: "restarted".into(),
1348                category: "glean".into(),
1349                send_in_pings: vec![store_name.into()],
1350                lifetime: Lifetime::Ping,
1351                ..Default::default()
1352            }
1353            .into(),
1354            ErrorType::InvalidValue
1355        )
1356        .is_err());
1357    }
1358}