Skip to main content

glean_core/event_database/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5use std::cmp::Ordering;
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::fs::{create_dir_all, File, OpenOptions};
9use std::io::BufReader;
10use std::io::Write;
11use std::io::{self, BufRead};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex, RwLock};
14use std::{fs, mem};
15
16use chrono::{DateTime, FixedOffset, Utc};
17
18use malloc_size_of::MallocSizeOf;
19use malloc_size_of_derive::MallocSizeOf;
20use serde::{Deserialize, Serialize};
21use serde_json::{json, Value as JsonValue};
22
23use crate::common_metric_data::CommonMetricDataInternal;
24use crate::error_recording::{record_error, ErrorType};
25use crate::metrics::{DatetimeMetric, TimeUnit};
26use crate::session::{EventSessionContext, SessionMetadata};
27use crate::storage::INTERNAL_STORAGE;
28use crate::util::get_iso_time_string;
29use crate::Glean;
30use crate::Result;
31use crate::{CommonMetricData, CounterMetric, Lifetime};
32
33/// Represents the recorded data for a single event.
34#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, MallocSizeOf)]
35#[cfg_attr(test, derive(Default))]
36pub struct RecordedEvent {
37    /// The timestamp of when the event was recorded.
38    ///
39    /// This allows to order events from a single process run.
40    pub timestamp: u64,
41
42    /// The event's category.
43    ///
44    /// This is defined by users in the metrics file.
45    pub category: String,
46
47    /// The event's name.
48    ///
49    /// This is defined by users in the metrics file.
50    pub name: String,
51
52    /// A map of all extra data values.
53    ///
54    /// The set of allowed extra keys is defined by users in the metrics file.
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub extra: Option<HashMap<String, String>>,
57
58    /// Session metadata attached to this event.
59    ///
60    /// `None` for out-of-session events and events recorded before
61    /// sessions were introduced (backwards compatibility).
62    #[serde(skip_serializing_if = "Option::is_none")]
63    #[serde(default)]
64    pub session: Option<SessionMetadata>,
65}
66
67/// Represents the stored data for a single event.
68#[derive(
69    Debug, Clone, Deserialize, Serialize, PartialEq, Eq, malloc_size_of_derive::MallocSizeOf,
70)]
71struct StoredEvent {
72    #[serde(flatten)]
73    event: RecordedEvent,
74
75    /// The monotonically-increasing execution counter.
76    ///
77    /// Included to allow sending of events across Glean restarts (bug 1716725).
78    /// Is i32 because it is stored in a CounterMetric.
79    #[serde(default)]
80    #[serde(skip_serializing_if = "Option::is_none")]
81    pub execution_counter: Option<i32>,
82}
83
84/// This struct handles the in-memory and on-disk storage logic for events.
85///
86/// So that the data survives shutting down of the application, events are stored
87/// in an append-only file on disk, in addition to the store in memory. Each line
88/// of this file records a single event in JSON, exactly as it will be sent in the
89/// ping. There is one file per store.
90///
91/// When restarting the application, these on-disk files are checked, and if any are
92/// found, they are loaded, and a `glean.restarted` event is added before any
93/// further events are collected. This is because the timestamps for these events
94/// may have come from a previous boot of the device, and therefore will not be
95/// compatible with any newly-collected events.
96///
97/// Normalizing all these timestamps happens on serialization for submission (see
98/// `serialize_as_json`) where the client time between restarts is calculated using
99/// data stored in the `glean.startup.date` extra of the `glean.restarted` event, plus
100/// the `execution_counter` stored in events on disk.
101///
102/// Neither `execution_counter` nor `glean.startup.date` is submitted in pings.
103/// The `glean.restarted` event is, though.
104/// (See [bug 1716725](https://bugzilla.mozilla.org/show_bug.cgi?id=1716725).)
105#[derive(Debug)]
106pub struct EventDatabase {
107    /// Path to directory of on-disk event files
108    pub path: PathBuf,
109    /// The in-memory list of events
110    event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
111    event_store_files: RwLock<HashMap<String, Arc<File>>>,
112    /// A lock to be held when doing operations on the filesystem
113    file_lock: Mutex<()>,
114}
115
116impl MallocSizeOf for EventDatabase {
117    fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
118        let mut n = 0;
119        n += self.event_stores.read().unwrap().size_of(ops);
120
121        let map = self.event_store_files.read().unwrap();
122        for store_name in map.keys() {
123            n += store_name.size_of(ops);
124            // `File` doesn't allocate, but `Arc` puts it on the heap.
125            n += mem::size_of::<File>();
126        }
127        n
128    }
129}
130
131impl EventDatabase {
132    /// Creates a new event database.
133    ///
134    /// # Arguments
135    ///
136    /// * `data_path` - The directory to store events in. A new directory
137    /// * `events` - will be created inside of this directory.
138    pub fn new(data_path: &Path) -> Result<Self> {
139        let path = data_path.join("events");
140        create_dir_all(&path)?;
141
142        Ok(Self {
143            path,
144            event_stores: RwLock::new(HashMap::new()),
145            event_store_files: RwLock::new(HashMap::new()),
146            file_lock: Mutex::new(()),
147        })
148    }
149
150    /// Initializes events storage after Glean is fully initialized and ready to send pings.
151    ///
152    /// This must be called once on application startup, e.g. from
153    /// [Glean.initialize], but after we are ready to send pings, since this
154    /// could potentially collect and send the "events" ping.
155    ///
156    /// If there are any events queued on disk, it loads them into memory so
157    /// that the memory and disk representations are in sync.
158    ///
159    /// If event records for the "events" ping are present, they are assembled into
160    /// an "events" ping which is submitted immediately with reason "startup".
161    ///
162    /// If event records for custom pings are present, we increment the custom pings'
163    /// stores' `execution_counter` and record a `glean.restarted`
164    /// event with the current client clock in its `glean.startup.date` extra.
165    ///
166    /// # Arguments
167    ///
168    /// * `glean` - The Glean instance.
169    /// * `trim_data_to_registered_pings` - Whether we should trim the event storage of
170    ///   any events not belonging to pings previously registered via `register_ping_type`.
171    ///
172    /// # Returns
173    ///
174    /// Whether the "events" ping was submitted.
175    pub fn flush_pending_events_on_startup(
176        &self,
177        glean: &Glean,
178        trim_data_to_registered_pings: bool,
179    ) -> bool {
180        match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
181            Ok(_) => {
182                let stores_with_events: Vec<String> = {
183                    self.event_stores
184                        .read()
185                        .unwrap()
186                        .keys()
187                        .map(|x| x.to_owned())
188                        .collect() // safe unwrap, only error case is poisoning
189                };
190                // We do not want to be holding the event stores lock when
191                // submitting a ping or recording new events.
192                let has_events_events = stores_with_events.contains(&"events".to_owned());
193                let glean_restarted_stores = if has_events_events {
194                    stores_with_events
195                        .into_iter()
196                        .filter(|store| store != "events")
197                        .collect()
198                } else {
199                    stores_with_events
200                };
201                if !glean_restarted_stores.is_empty() {
202                    for store_name in glean_restarted_stores.iter() {
203                        CounterMetric::new(CommonMetricData {
204                            name: "execution_counter".into(),
205                            category: store_name.into(),
206                            send_in_pings: vec![INTERNAL_STORAGE.into()],
207                            lifetime: Lifetime::Ping,
208                            ..Default::default()
209                        })
210                        .add_sync(glean, 1);
211                    }
212                    let glean_restarted = CommonMetricData {
213                        name: "restarted".into(),
214                        category: "glean".into(),
215                        send_in_pings: glean_restarted_stores,
216                        lifetime: Lifetime::Ping,
217                        ..Default::default()
218                    };
219                    let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
220                    let mut extra: HashMap<String, String> =
221                        [("glean.startup.date".into(), startup)].into();
222                    if glean.with_timestamps() {
223                        let now = Utc::now();
224                        let precise_timestamp = now.timestamp_millis() as u64;
225                        extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
226                    }
227                    self.record(
228                        glean,
229                        &glean_restarted.into(),
230                        crate::get_timestamp_ms(),
231                        Some(extra),
232                        EventSessionContext::OutOfSession,
233                    );
234                }
235                has_events_events && glean.submit_ping_by_name("events", Some("startup"))
236            }
237            Err(err) => {
238                log::warn!("Error loading events from disk: {}", err);
239                false
240            }
241        }
242    }
243
244    fn load_events_from_disk(
245        &self,
246        glean: &Glean,
247        trim_data_to_registered_pings: bool,
248    ) -> Result<()> {
249        // NOTE: The order of locks here is important.
250        // In other code parts we might acquire the `file_lock` when we already have acquired
251        // a lock on `event_stores`.
252        // This is a potential lock-order-inversion.
253        let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
254        let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
255
256        for entry in fs::read_dir(&self.path)? {
257            let entry = entry?;
258            if entry.file_type()?.is_file() {
259                let store_name = entry.file_name().into_string()?;
260                log::info!("Loading events for {}", store_name);
261                if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
262                    log::warn!("Trimming {}'s events", store_name);
263                    if let Err(err) = fs::remove_file(entry.path()) {
264                        match err.kind() {
265                            std::io::ErrorKind::NotFound => {
266                                // silently drop this error, the file was already non-existing
267                            }
268                            _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
269                        }
270                    }
271                    continue;
272                }
273                let file = BufReader::new(File::open(entry.path())?);
274                db.insert(
275                    store_name,
276                    file.lines()
277                        .map_while(Result::ok)
278                        .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
279                        .collect(),
280                );
281            }
282        }
283        Ok(())
284    }
285
286    /// Records an event in the desired stores.
287    ///
288    /// # Arguments
289    ///
290    /// * `glean` - The Glean instance.
291    /// * `meta` - The metadata about the event metric. Used to get the category,
292    ///   name and stores for the metric.
293    /// * `timestamp` - The timestamp of the event, in milliseconds. Must use a
294    ///   monotonically increasing timer (this value is obtained on the
295    ///   platform-specific side).
296    /// * `extra` - Extra data values, mapping strings to strings.
297    /// * `ctx` - The event's session context, conveying both whether session
298    ///   metadata should be attached and what that metadata is.
299    ///
300    /// ## Returns
301    ///
302    /// `true` if a ping was submitted and should be uploaded.
303    /// `false` otherwise.
304    pub fn record(
305        &self,
306        glean: &Glean,
307        meta: &CommonMetricDataInternal,
308        timestamp: u64,
309        extra: Option<HashMap<String, String>>,
310        ctx: EventSessionContext,
311    ) -> bool {
312        // If upload is disabled we don't want to record.
313        if !glean.is_upload_enabled() {
314            return false;
315        }
316
317        // Convert the session context to the optional metadata stored on the event.
318        let session = match ctx {
319            EventSessionContext::OutOfSession => None,
320            EventSessionContext::InSession(session_meta) => Some(session_meta),
321        };
322
323        let mut submit_max_capacity_event_ping = false;
324        {
325            let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
326            for store_name in meta.inner.send_in_pings.iter() {
327                if !glean.is_ping_enabled(store_name) {
328                    continue;
329                }
330
331                let store = db.entry(store_name.to_string()).or_default();
332                let execution_counter = CounterMetric::new(CommonMetricData {
333                    name: "execution_counter".into(),
334                    category: store_name.into(),
335                    send_in_pings: vec![INTERNAL_STORAGE.into()],
336                    lifetime: Lifetime::Ping,
337                    ..Default::default()
338                })
339                .get_value(glean, INTERNAL_STORAGE);
340                // Create StoredEvent object, and its JSON form for serialization on disk.
341                let event = StoredEvent {
342                    event: RecordedEvent {
343                        timestamp,
344                        category: meta.inner.category.to_string(),
345                        name: meta.inner.name.to_string(),
346                        extra: extra.clone(),
347                        session: session.clone(),
348                    },
349                    execution_counter,
350                };
351                let event_json = serde_json::to_string(&event).unwrap(); // safe unwrap, event can always be serialized
352                store.push(event);
353                self.write_event_to_disk(store_name, &event_json);
354                if store_name == "events" && store.len() == glean.get_max_events() {
355                    submit_max_capacity_event_ping = true;
356                }
357            }
358        }
359        if submit_max_capacity_event_ping {
360            glean.submit_ping_by_name("events", Some("max_capacity"));
361            true
362        } else {
363            false
364        }
365    }
366
367    fn get_event_store(&self, store_name: &str) -> Result<Arc<File>, io::Error> {
368        // safe unwrap, only error case is poisoning
369        let mut map = self.event_store_files.write().unwrap();
370        let entry = map.entry(store_name.to_string());
371
372        match entry {
373            Entry::Occupied(entry) => Ok(Arc::clone(entry.get())),
374            Entry::Vacant(entry) => {
375                let file = OpenOptions::new()
376                    .create(true)
377                    .append(true)
378                    .open(self.path.join(store_name))?;
379                let file = Arc::new(file);
380                let entry = entry.insert(file);
381                Ok(Arc::clone(entry))
382            }
383        }
384    }
385
386    /// Writes an event to a single store on disk.
387    ///
388    /// # Arguments
389    ///
390    /// * `store_name` - The name of the store.
391    /// * `event_json` - The event content, as a single-line JSON-encoded string.
392    fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
393        let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
394
395        let write_res = (|| {
396            let mut file = self.get_event_store(store_name)?;
397            file.write_all(event_json.as_bytes())?;
398            file.write_all(b"\n")?;
399            file.flush()?;
400            Ok::<(), std::io::Error>(())
401        })();
402
403        if let Err(err) = write_res {
404            log::warn!("IO error writing event to store '{}': {}", store_name, err);
405        }
406    }
407
408    /// Normalizes the store in-place.
409    ///
410    /// A store may be in any order and contain any number of `glean.restarted` events,
411    /// whose values must be taken into account, along with `execution_counter` values,
412    /// to come up with the correct events with correct `timestamp` values,
413    /// on which we then sort.
414    ///
415    /// 1. Sort by `execution_counter` and `timestamp`,
416    ///    breaking ties so that `glean.restarted` comes first.
417    /// 2. Remove all initial and final `glean.restarted` events
418    /// 3. For each group of events that share a `execution_counter`,
419    ///    i. calculate the initial `glean.restarted` event's `timestamp`s to be
420    ///       clamp(glean.startup.date - ping_info.start_time, biggest_timestamp_of_previous_group + 1)
421    ///    ii. normalize each non-`glean-restarted` event's `timestamp`
422    ///        relative to the `glean.restarted` event's uncalculated `timestamp`
423    /// 4. Remove `execution_counter` and `glean.startup.date` extra keys
424    /// 5. Sort by `timestamp`
425    ///
426    /// In the event that something goes awry, this will record an invalid_state on
427    /// glean.restarted if it is due to internal inconsistencies, or invalid_value
428    /// on client clock weirdness.
429    ///
430    /// # Arguments
431    ///
432    /// * `glean` - Used to report errors
433    /// * `store_name` - The name of the store we're normalizing.
434    /// * `store` - The store we're to normalize.
435    /// * `glean_start_time` - Used if the glean.startup.date or ping_info.start_time aren't available. Passed as a parameter to ease unit-testing.
436    fn normalize_store(
437        &self,
438        glean: &Glean,
439        store_name: &str,
440        store: &mut Vec<StoredEvent>,
441        glean_start_time: DateTime<FixedOffset>,
442    ) {
443        let is_glean_restarted =
444            |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
445        let glean_restarted_meta = |store_name: &str| CommonMetricData {
446            name: "restarted".into(),
447            category: "glean".into(),
448            send_in_pings: vec![store_name.into()],
449            lifetime: Lifetime::Ping,
450            ..Default::default()
451        };
452        // Step 1
453        store.sort_by(|a, b| {
454            a.execution_counter
455                .cmp(&b.execution_counter)
456                .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
457                .then_with(|| {
458                    if is_glean_restarted(&a.event) {
459                        Ordering::Less
460                    } else {
461                        Ordering::Greater
462                    }
463                })
464        });
465        // Step 2
466        // Find the index of the first and final non-`glean.restarted` events.
467        // Remove events before the first and after the final.
468        let final_event = match store
469            .iter()
470            .rposition(|event| !is_glean_restarted(&event.event))
471        {
472            Some(idx) => idx + 1,
473            _ => 0,
474        };
475        store.drain(final_event..);
476        let first_event = store
477            .iter()
478            .position(|event| !is_glean_restarted(&event.event))
479            .unwrap_or(store.len());
480        store.drain(..first_event);
481        if store.is_empty() {
482            // There was nothing but `glean.restarted` events. Job's done!
483            return;
484        }
485        // Step 3
486        // It is allowed that there might not be any `glean.restarted` event, nor
487        // `execution_counter` extra values. (This should always be the case for the
488        // "events" ping, for instance).
489        // Other inconsistencies are evidence of errors, and so are logged.
490        let mut cur_ec = 0;
491        // The offset within a group of events with the same `execution_counter`.
492        let mut intra_group_offset = store[0].event.timestamp;
493        // The offset between this group and ping_info.start_date.
494        let mut inter_group_offset = 0;
495        let mut highest_ts = 0;
496        for event in store.iter_mut() {
497            let execution_counter = event.execution_counter.take().unwrap_or(0);
498            if is_glean_restarted(&event.event) {
499                // We've entered the next "event group".
500                // We need a new epoch based on glean.startup.date - ping_info.start_date
501                cur_ec = execution_counter;
502                let glean_startup_date = event
503                    .event
504                    .extra
505                    .as_mut()
506                    .and_then(|extra| {
507                        extra.remove("glean.startup.date").and_then(|date_str| {
508                            DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
509                                .map_err(|_| {
510                                    record_error(
511                                        glean,
512                                        &glean_restarted_meta(store_name).into(),
513                                        ErrorType::InvalidState,
514                                        format!("Unparseable glean.startup.date '{}'", date_str),
515                                        None,
516                                    );
517                                })
518                                .ok()
519                        })
520                    })
521                    .unwrap_or(glean_start_time);
522                if event
523                    .event
524                    .extra
525                    .as_ref()
526                    .is_some_and(|extra| extra.is_empty())
527                {
528                    // Small optimization to save us sending empty dicts.
529                    event.event.extra = None;
530                }
531                let ping_start = DatetimeMetric::new(
532                    CommonMetricData {
533                        name: format!("{}#start", store_name),
534                        category: "".into(),
535                        send_in_pings: vec![INTERNAL_STORAGE.into()],
536                        lifetime: Lifetime::User,
537                        ..Default::default()
538                    },
539                    TimeUnit::Minute,
540                );
541                let ping_start = ping_start
542                    .get_value(glean, INTERNAL_STORAGE)
543                    .unwrap_or(glean_start_time);
544                let time_from_ping_start_to_glean_restarted =
545                    (glean_startup_date - ping_start).num_milliseconds();
546                intra_group_offset = event.event.timestamp;
547                inter_group_offset =
548                    u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
549                if inter_group_offset < highest_ts {
550                    record_error(
551                        glean,
552                        &glean_restarted_meta(store_name).into(),
553                        ErrorType::InvalidValue,
554                        format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
555                        None,
556                    );
557                    // The client's clock went backwards enough that this event group's
558                    // glean.restarted looks like it happened _before_ the final event of the previous group.
559                    // Or, it went ahead enough to overflow u64.
560                    // Adjust things so this group starts 1ms after the previous one.
561                    inter_group_offset = highest_ts + 1;
562                }
563            } else if cur_ec == 0 {
564                // bug 1811872 - cur_ec might need initialization.
565                cur_ec = execution_counter;
566            }
567            event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
568            if execution_counter != cur_ec {
569                record_error(
570                    glean,
571                    &glean_restarted_meta(store_name).into(),
572                    ErrorType::InvalidState,
573                    format!(
574                        "Inconsistent execution counter {} (expected {})",
575                        execution_counter, cur_ec
576                    ),
577                    None,
578                );
579                // Let's fix cur_ec up and hope this isn't a sign something big is broken.
580                cur_ec = execution_counter;
581            }
582
583            // event timestamp is a `u64`, but BigQuery uses `i64` (signed!) everywhere. Let's clamp the value to make
584            // sure we stay within bounds.
585            if event.event.timestamp > i64::MAX as u64 {
586                glean
587                    .additional_metrics
588                    .event_timestamp_clamped
589                    .add_sync(glean, 1);
590                log::warn!(
591                    "Calculated event timestamp was too high. Got: {}, max: {}",
592                    event.event.timestamp,
593                    i64::MAX,
594                );
595                event.event.timestamp = event.event.timestamp.clamp(0, i64::MAX as u64);
596            }
597
598            if highest_ts > event.event.timestamp {
599                // Even though we sorted everything, something in the
600                // execution_counter or glean.startup.date math went awry.
601                record_error(
602                    glean,
603                    &glean_restarted_meta(store_name).into(),
604                    ErrorType::InvalidState,
605                    format!(
606                        "Inconsistent previous highest timestamp {} (expected <= {})",
607                        highest_ts, event.event.timestamp
608                    ),
609                    None,
610                );
611                // Let the highest_ts regress to event.timestamp to hope this minimizes weirdness.
612            }
613            highest_ts = event.event.timestamp
614        }
615    }
616
617    /// Gets a snapshot of the stored event data as a JsonValue.
618    ///
619    /// # Arguments
620    ///
621    /// * `glean` - the Glean instance.
622    /// * `store_name` - The name of the desired store.
623    /// * `clear_store` - Whether to clear the store after snapshotting.
624    ///
625    /// # Returns
626    ///
627    /// A array of events, JSON encoded, if any. Otherwise `None`.
628    pub fn snapshot_as_json(
629        &self,
630        glean: &Glean,
631        store_name: &str,
632        clear_store: bool,
633    ) -> Option<JsonValue> {
634        let result = {
635            let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
636            db.get_mut(&store_name.to_string()).and_then(|store| {
637                if !store.is_empty() {
638                    // Normalization happens in-place, so if we're not clearing,
639                    // operate on a clone.
640                    let mut clone;
641                    let store = if clear_store {
642                        store
643                    } else {
644                        clone = store.clone();
645                        &mut clone
646                    };
647                    // We may need to normalize event timestamps across multiple restarts.
648                    self.normalize_store(glean, store_name, store, glean.start_time());
649                    Some(json!(store))
650                } else {
651                    log::warn!("Unexpectly got empty event store for '{}'", store_name);
652                    None
653                }
654            })
655        };
656
657        if clear_store {
658            self.event_stores
659                .write()
660                .unwrap() // safe unwrap, only error case is poisoning
661                .remove(&store_name.to_string());
662            self.event_store_files
663                .write()
664                .unwrap() // safe unwrap, only error case is poisoning
665                .remove(&store_name.to_string());
666
667            let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
668            if let Err(err) = fs::remove_file(self.path.join(store_name)) {
669                match err.kind() {
670                    std::io::ErrorKind::NotFound => {
671                        // silently drop this error, the file was already non-existing
672                    }
673                    _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
674                }
675            }
676        }
677
678        result
679    }
680
681    /// Clears all stored events, both in memory and on-disk.
682    pub fn clear_all(&self) -> Result<()> {
683        // safe unwrap, only error case is poisoning
684        self.event_stores.write().unwrap().clear();
685        self.event_store_files.write().unwrap().clear();
686
687        // safe unwrap, only error case is poisoning
688        let _lock = self.file_lock.lock().unwrap();
689        std::fs::remove_dir_all(&self.path)?;
690        create_dir_all(&self.path)?;
691
692        Ok(())
693    }
694
695    /// **Test-only API (exported for FFI purposes).**
696    ///
697    /// Gets the vector of currently stored events for the given event metric in
698    /// the given store.
699    ///
700    /// This doesn't clear the stored value.
701    pub fn test_get_value<'a>(
702        &'a self,
703        meta: &'a CommonMetricDataInternal,
704        store_name: &str,
705    ) -> Option<Vec<RecordedEvent>> {
706        let value: Vec<RecordedEvent> = self
707            .event_stores
708            .read()
709            .unwrap() // safe unwrap, only error case is poisoning
710            .get(&store_name.to_string())
711            .into_iter()
712            .flatten()
713            .map(|stored_event| stored_event.event.clone())
714            .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
715            .collect();
716        if !value.is_empty() {
717            Some(value)
718        } else {
719            None
720        }
721    }
722}
723
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_get_num_recorded_errors;
    use crate::tests::new_glean;
    use chrono::{TimeZone, Timelike};

    /// Metric data describing `glean.restarted` as sent in `store_name`.
    ///
    /// Used to query the errors that store normalization records against it.
    fn glean_restarted_metric(store_name: &str) -> CommonMetricData {
        CommonMetricData {
            name: "restarted".into(),
            category: "glean".into(),
            send_in_pings: vec![store_name.into()],
            lifetime: Lifetime::Ping,
            ..Default::default()
        }
    }

    #[test]
    fn handle_truncated_events_on_disk() {
        // Truncated (partially written) lines in an on-disk store must be
        // skipped on load, while the well-formed line is still loaded.
        let (glean, t) = new_glean(None);

        {
            let db = EventDatabase::new(t.path()).unwrap();
            db.write_event_to_disk("events", "{\"timestamp\": 500");
            db.write_event_to_disk("events", "{\"timestamp\"");
            db.write_event_to_disk(
                "events",
                "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
            );
        }

        {
            let db = EventDatabase::new(t.path()).unwrap();
            db.load_events_from_disk(&glean, false).unwrap();
            let events = &db.event_stores.read().unwrap()["events"];
            assert_eq!(1, events.len());
        }
    }

    #[test]
    fn stable_serialization() {
        // A serialized `RecordedEvent` must read back as a `StoredEvent`
        // carrying the same event and no execution counter.
        let event_empty = RecordedEvent {
            timestamp: 2,
            category: "cat".to_string(),
            name: "name".to_string(),
            extra: None,
            session: None,
        };

        let mut data = HashMap::new();
        data.insert("a key".to_string(), "a value".to_string());
        let event_data = RecordedEvent {
            timestamp: 2,
            category: "cat".to_string(),
            name: "name".to_string(),
            extra: Some(data),
            session: None,
        };

        let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap();
        let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap();

        assert_eq!(
            StoredEvent {
                event: event_empty,
                execution_counter: None
            },
            serde_json::from_str(&event_empty_json).unwrap()
        );
        assert_eq!(
            StoredEvent {
                event: event_data,
                execution_counter: None
            },
            serde_json::from_str(&event_data_json).unwrap()
        );
    }

    #[test]
    fn deserialize_existing_data() {
        // JSON without an `execution_counter` field (the layout below) must
        // still deserialize into a `StoredEvent`.
        let event_empty_json = r#"
{
  "timestamp": 2,
  "category": "cat",
  "name": "name"
}
            "#;

        let event_data_json = r#"
{
  "timestamp": 2,
  "category": "cat",
  "name": "name",
  "extra": {
    "a key": "a value"
  }
}
        "#;

        let event_empty = RecordedEvent {
            timestamp: 2,
            category: "cat".to_string(),
            name: "name".to_string(),
            extra: None,
            session: None,
        };

        let mut data = HashMap::new();
        data.insert("a key".to_string(), "a value".to_string());
        let event_data = RecordedEvent {
            timestamp: 2,
            category: "cat".to_string(),
            name: "name".to_string(),
            extra: Some(data),
            session: None,
        };

        assert_eq!(
            StoredEvent {
                event: event_empty,
                execution_counter: None
            },
            serde_json::from_str(event_empty_json).unwrap()
        );
        assert_eq!(
            StoredEvent {
                event: event_data,
                execution_counter: None
            },
            serde_json::from_str(event_data_json).unwrap()
        );
    }

    #[test]
    fn doesnt_record_when_upload_is_disabled() {
        // Events are recorded while upload is enabled and dropped once it is
        // disabled.
        let (mut glean, dir) = new_glean(None);
        let db = EventDatabase::new(dir.path()).unwrap();

        let test_storage = "store1";
        let test_category = "category";
        let test_name = "name";
        let test_timestamp = 2;
        let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
        let event_data = RecordedEvent {
            timestamp: test_timestamp,
            category: test_category.to_string(),
            name: test_name.to_string(),
            extra: None,
            session: None,
        };

        // Upload is not yet disabled,
        // so let's check that everything is getting recorded as expected.
        db.record(
            &glean,
            &test_meta,
            test_timestamp,
            None,
            EventSessionContext::OutOfSession,
        );
        {
            let event_stores = db.event_stores.read().unwrap();
            assert_eq!(
                &StoredEvent {
                    event: event_data,
                    execution_counter: None
                },
                &event_stores.get(test_storage).unwrap()[0]
            );
            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
        }

        glean.set_upload_enabled(false);

        // Now that upload is disabled, let's check nothing is recorded.
        db.record(
            &glean,
            &test_meta,
            test_timestamp,
            None,
            EventSessionContext::OutOfSession,
        );
        {
            let event_stores = db.event_stores.read().unwrap();
            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
        }
    }

    #[test]
    fn normalize_store_of_glean_restarted() {
        // Make sure stores empty of anything but glean.restarted events normalize without issue.
        let (glean, _dir) = new_glean(None);

        let store_name = "store-name";
        let glean_restarted = StoredEvent {
            event: RecordedEvent {
                timestamp: 2,
                category: "glean".into(),
                name: "restarted".into(),
                extra: None,
                session: None,
            },
            execution_counter: None,
        };
        let mut store = vec![glean_restarted.clone()];
        let glean_start_time = glean.start_time();

        glean
            .event_storage()
            .normalize_store(&glean, store_name, &mut store, glean_start_time);
        assert!(store.is_empty());

        let mut store = vec![glean_restarted.clone(), glean_restarted.clone()];
        glean
            .event_storage()
            .normalize_store(&glean, store_name, &mut store, glean_start_time);
        assert!(store.is_empty());

        let mut store = vec![
            glean_restarted.clone(),
            glean_restarted.clone(),
            glean_restarted,
        ];
        glean
            .event_storage()
            .normalize_store(&glean, store_name, &mut store, glean_start_time);
        assert!(store.is_empty());
    }

    #[test]
    fn normalize_store_of_glean_restarted_on_both_ends() {
        // Make sure stores with non-glean.restarted events don't get drained too far.
        let (glean, _dir) = new_glean(None);

        let store_name = "store-name";
        let glean_restarted = StoredEvent {
            event: RecordedEvent {
                timestamp: 2,
                category: "glean".into(),
                name: "restarted".into(),
                extra: None,
                session: None,
            },
            execution_counter: None,
        };
        let not_glean_restarted = StoredEvent {
            event: RecordedEvent {
                timestamp: 20,
                category: "category".into(),
                name: "name".into(),
                extra: None,
                session: None,
            },
            execution_counter: None,
        };
        let mut store = vec![
            glean_restarted.clone(),
            not_glean_restarted.clone(),
            glean_restarted,
        ];
        let glean_start_time = glean.start_time();

        glean
            .event_storage()
            .normalize_store(&glean, store_name, &mut store, glean_start_time);
        assert_eq!(1, store.len());
        assert_eq!(
            StoredEvent {
                event: RecordedEvent {
                    timestamp: 0,
                    ..not_glean_restarted.event
                },
                execution_counter: None
            },
            store[0]
        );
    }

    #[test]
    fn normalize_store_single_run_timestamp_math() {
        // With a single run of events (no non-initial or non-terminal `glean.restarted`),
        // ensure the timestamp math works.
        // (( works = Initial event gets to be 0, subsequent events get normalized to that 0 ))
        let (glean, _dir) = new_glean(None);

        let store_name = "store-name";
        let glean_restarted = StoredEvent {
            event: RecordedEvent {
                timestamp: 2,
                category: "glean".into(),
                name: "restarted".into(),
                extra: None,
                session: None,
            },
            execution_counter: None,
        };
        let timestamps = [20, 40, 200];
        let not_glean_restarted = StoredEvent {
            event: RecordedEvent {
                timestamp: timestamps[0],
                category: "category".into(),
                name: "name".into(),
                extra: None,
                session: None,
            },
            execution_counter: None,
        };
        let mut store = vec![
            glean_restarted.clone(),
            not_glean_restarted.clone(),
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[1],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: None,
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[2],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: None,
            },
            glean_restarted,
        ];

        glean
            .event_storage()
            .normalize_store(&glean, store_name, &mut store, glean.start_time());
        assert_eq!(3, store.len());
        for (timestamp, event) in timestamps.iter().zip(store.iter()) {
            assert_eq!(
                &StoredEvent {
                    event: RecordedEvent {
                        timestamp: timestamp - timestamps[0],
                        ..not_glean_restarted.event.clone()
                    },
                    execution_counter: None
                },
                event
            );
        }
    }

    #[test]
    fn normalize_store_multi_run_timestamp_math() {
        // With multiple runs of events (separated by `glean.restarted`),
        // ensure the timestamp math works.
        // (( works = Initial event gets to be 0, subsequent events get normalized to that 0.
        //            Subsequent runs figure it out via glean.restarted.date and ping_info.start_time ))
        let (glean, _dir) = new_glean(None);

        let store_name = "store-name";
        let glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "glean".into(),
                name: "restarted".into(),
                ..Default::default()
            },
            execution_counter: None,
        };
        let not_glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "category".into(),
                name: "name".into(),
                ..Default::default()
            },
            execution_counter: None,
        };

        // This scenario represents a run of three events followed by an hour between runs,
        // followed by one final event.
        let timestamps = [20, 40, 200, 12];
        let ecs = [0, 1];
        let some_hour = 16;
        let startup_date = FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(2022, 11, 24, some_hour, 29, 0) // TimeUnit::Minute -- don't put seconds
            .unwrap();
        let glean_start_time = startup_date.with_hour(some_hour - 1);
        let restarted_ts = 2;
        let mut store = vec![
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[0],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[1],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[2],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    extra: Some(
                        [(
                            "glean.startup.date".into(),
                            get_iso_time_string(startup_date, TimeUnit::Minute),
                        )]
                        .into(),
                    ),
                    timestamp: restarted_ts,
                    ..glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[3],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
        ];

        glean.event_storage().normalize_store(
            &glean,
            store_name,
            &mut store,
            glean_start_time.unwrap(),
        );
        assert_eq!(5, store.len()); // 4 "real" events plus 1 `glean.restarted`

        // Let's check the first three.
        for (timestamp, event) in timestamps[..timestamps.len() - 1].iter().zip(store.clone()) {
            assert_eq!(
                StoredEvent {
                    event: RecordedEvent {
                        timestamp: timestamp - timestamps[0],
                        ..not_glean_restarted.event.clone()
                    },
                    execution_counter: None,
                },
                event
            );
        }
        // The fourth should be a glean.restarted and have a realtime-based timestamp.
        let hour_in_millis = 3600000;
        assert_eq!(
            store[3],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: hour_in_millis,
                    ..glean_restarted.event
                },
                execution_counter: None,
            }
        );
        // The fifth should have a timestamp based on the new origin.
        assert_eq!(
            store[4],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: hour_in_millis + timestamps[3] - restarted_ts,
                    ..not_glean_restarted.event
                },
                execution_counter: None,
            }
        );
    }

    #[test]
    fn normalize_store_multi_run_client_clocks() {
        // With multiple runs of events (separated by `glean.restarted`),
        // ensure the timestamp math works. Even when the client clock goes backwards.
        let (glean, _dir) = new_glean(None);

        let store_name = "store-name";
        let glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "glean".into(),
                name: "restarted".into(),
                ..Default::default()
            },
            execution_counter: None,
        };
        let not_glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "category".into(),
                name: "name".into(),
                ..Default::default()
            },
            execution_counter: None,
        };

        // This scenario represents a run of two events followed by negative one hours between runs,
        // followed by two more events.
        let timestamps = [20, 40, 12, 200];
        let ecs = [0, 1];
        let some_hour = 10;
        let startup_date = FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(2022, 11, 25, some_hour, 37, 0) // TimeUnit::Minute -- don't put seconds
            .unwrap();
        let glean_start_time = startup_date.with_hour(some_hour + 1);
        let restarted_ts = 2;
        let mut store = vec![
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[0],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[1],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    extra: Some(
                        [(
                            "glean.startup.date".into(),
                            get_iso_time_string(startup_date, TimeUnit::Minute),
                        )]
                        .into(),
                    ),
                    timestamp: restarted_ts,
                    ..glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[2],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[3],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
        ];

        glean.event_storage().normalize_store(
            &glean,
            store_name,
            &mut store,
            glean_start_time.unwrap(),
        );
        assert_eq!(5, store.len()); // 4 "real" events plus 1 `glean.restarted`

        // Let's check the first two.
        for (timestamp, event) in timestamps[..timestamps.len() - 2].iter().zip(store.clone()) {
            assert_eq!(
                StoredEvent {
                    event: RecordedEvent {
                        timestamp: timestamp - timestamps[0],
                        ..not_glean_restarted.event.clone()
                    },
                    execution_counter: None,
                },
                event
            );
        }
        // The third should be a glean.restarted. Its timestamp should be
        // one larger than the largest timestamp seen so far (because that's
        // how we ensure monotonic timestamps when client clocks go backwards).
        assert_eq!(
            store[2],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: store[1].event.timestamp + 1,
                    ..glean_restarted.event
                },
                execution_counter: None,
            }
        );
        // The fourth should have a timestamp based on the new origin.
        assert_eq!(
            store[3],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[2] - restarted_ts + store[2].event.timestamp,
                    ..not_glean_restarted.event
                },
                execution_counter: None,
            }
        );
        // And we should have an InvalidValue on glean.restarted to show for it.
        assert_eq!(
            Ok(1),
            test_get_num_recorded_errors(
                &glean,
                &glean_restarted_metric(store_name).into(),
                ErrorType::InvalidValue
            )
        );
    }

    #[test]
    fn normalize_store_non_zero_ec() {
        // After the first run, execution_counter will likely be non-zero.
        // Ensure normalizing a store that begins with non-zero ec works.
        let (glean, _dir) = new_glean(None);

        let store_name = "store-name";
        let glean_restarted = StoredEvent {
            event: RecordedEvent {
                timestamp: 2,
                category: "glean".into(),
                name: "restarted".into(),
                extra: None,
                session: None,
            },
            execution_counter: Some(2),
        };
        let not_glean_restarted = StoredEvent {
            event: RecordedEvent {
                timestamp: 20,
                category: "category".into(),
                name: "name".into(),
                extra: None,
                session: None,
            },
            execution_counter: Some(2),
        };
        let glean_restarted_2 = StoredEvent {
            event: RecordedEvent {
                timestamp: 2,
                category: "glean".into(),
                name: "restarted".into(),
                extra: None,
                session: None,
            },
            execution_counter: Some(3),
        };
        let mut store = vec![
            glean_restarted,
            not_glean_restarted.clone(),
            glean_restarted_2,
        ];
        let glean_start_time = glean.start_time();

        glean
            .event_storage()
            .normalize_store(&glean, store_name, &mut store, glean_start_time);

        assert_eq!(1, store.len());
        assert_eq!(
            StoredEvent {
                event: RecordedEvent {
                    timestamp: 0,
                    ..not_glean_restarted.event
                },
                execution_counter: None
            },
            store[0]
        );
        // And we should have no InvalidState errors on glean.restarted.
        assert!(test_get_num_recorded_errors(
            &glean,
            &glean_restarted_metric(store_name).into(),
            ErrorType::InvalidState
        )
        .is_err());
        // (and, just because we're here, double-check there are no InvalidValue either).
        assert!(test_get_num_recorded_errors(
            &glean,
            &glean_restarted_metric(store_name).into(),
            ErrorType::InvalidValue
        )
        .is_err());
    }

    #[test]
    fn normalize_store_clamps_timestamp() {
        // Timestamps above `i64::MAX` are clamped to `i64::MAX`, and the clamp
        // is counted in the `event_timestamp_clamped` health metric.
        let (glean, _dir) = new_glean(None);

        let store_name = "store-name";
        let event = RecordedEvent {
            category: "category".into(),
            name: "name".into(),
            ..Default::default()
        };

        let timestamps = [
            0,
            (i64::MAX / 2) as u64,
            i64::MAX as _,
            (i64::MAX as u64) + 1,
        ];
        let mut store = timestamps
            .into_iter()
            .map(|timestamp| StoredEvent {
                event: RecordedEvent {
                    timestamp,
                    ..event.clone()
                },
                execution_counter: None,
            })
            .collect();

        let glean_start_time = glean.start_time();
        glean
            .event_storage()
            .normalize_store(&glean, store_name, &mut store, glean_start_time);
        assert_eq!(4, store.len());

        assert_eq!(0, store[0].event.timestamp);
        assert_eq!((i64::MAX / 2) as u64, store[1].event.timestamp);
        assert_eq!((i64::MAX as u64), store[2].event.timestamp);
        assert_eq!((i64::MAX as u64), store[3].event.timestamp);

        let error_count = glean
            .additional_metrics
            .event_timestamp_clamped
            .get_value(&glean, "health");
        assert_eq!(Some(1), error_count);
    }
}