1use std::cmp::Ordering;
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::fs::{create_dir_all, File, OpenOptions};
9use std::io::BufReader;
10use std::io::Write;
11use std::io::{self, BufRead};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex, RwLock};
14use std::{fs, mem};
15
16use chrono::{DateTime, FixedOffset, Utc};
17
18use malloc_size_of::MallocSizeOf;
19use malloc_size_of_derive::MallocSizeOf;
20use serde::{Deserialize, Serialize};
21use serde_json::{json, Value as JsonValue};
22
23use crate::common_metric_data::CommonMetricDataInternal;
24use crate::coverage::record_coverage;
25use crate::error_recording::{record_error, ErrorType};
26use crate::metrics::{DatetimeMetric, TimeUnit};
27use crate::storage::INTERNAL_STORAGE;
28use crate::util::get_iso_time_string;
29use crate::Glean;
30use crate::Result;
31use crate::{CommonMetricData, CounterMetric, Lifetime};
32
/// A single event as exposed to callers and serialized into snapshots.
///
/// An event is identified by its `category` and `name`, carries a `timestamp`
/// and may carry an optional string-to-string `extra` payload.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, MallocSizeOf)]
// `Default` is only derived in test builds, where it is used with struct-update syntax.
#[cfg_attr(test, derive(Default))]
pub struct RecordedEvent {
    /// Timestamp of the event as handed to `EventDatabase::record`.
    /// NOTE(review): presumably milliseconds (the startup path passes
    /// `crate::get_timestamp_ms()`) — confirm against other callers.
    /// `EventDatabase::normalize_store` rebases this value when a snapshot is built.
    pub timestamp: u64,

    /// The category of the event metric.
    pub category: String,

    /// The name of the event metric.
    pub name: String,

    /// Optional extra key/value pairs attached to the event.
    /// Left out of the serialized form entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<HashMap<String, String>>,
}
58
/// The on-disk / in-memory representation of an event: a [`RecordedEvent`]
/// plus the bookkeeping needed to order events across process runs.
#[derive(
    Debug, Clone, Deserialize, Serialize, PartialEq, Eq, malloc_size_of_derive::MallocSizeOf,
)]
struct StoredEvent {
    /// The event itself. Flattened, so its fields sit at the top level of the
    /// serialized JSON object rather than under an `event` key.
    #[serde(flatten)]
    event: RecordedEvent,

    /// Value of the per-store `execution_counter` metric at the time the event
    /// was recorded (see `EventDatabase::record`).
    /// Defaults to `None` when absent from the input and is skipped on output
    /// when `None`; `EventDatabase::normalize_store` clears it (via `take()`)
    /// on every event it keeps.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub execution_counter: Option<i32>,
}
75
/// Stores recorded events per store (ping) name, both in memory and as one
/// append-only file per store inside `path`.
#[derive(Debug)]
pub struct EventDatabase {
    /// Directory holding one events file per store (`<data_path>/events`, see `new`).
    pub path: PathBuf,
    /// In-memory events, keyed by store name.
    event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
    /// Cache of already opened (append-mode) file handles, keyed by store name.
    event_store_files: RwLock<HashMap<String, Arc<File>>>,
    /// Held around every operation in this type that reads, writes or deletes
    /// the files under `path`.
    file_lock: Mutex<()>,
}
107
108impl MallocSizeOf for EventDatabase {
109    fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
110        let mut n = 0;
111        n += self.event_stores.read().unwrap().size_of(ops);
112
113        let map = self.event_store_files.read().unwrap();
114        for store_name in map.keys() {
115            n += store_name.size_of(ops);
116            n += mem::size_of::<File>();
118        }
119        n
120    }
121}
122
123impl EventDatabase {
124    pub fn new(data_path: &Path) -> Result<Self> {
131        let path = data_path.join("events");
132        create_dir_all(&path)?;
133
134        Ok(Self {
135            path,
136            event_stores: RwLock::new(HashMap::new()),
137            event_store_files: RwLock::new(HashMap::new()),
138            file_lock: Mutex::new(()),
139        })
140    }
141
142    pub fn flush_pending_events_on_startup(
168        &self,
169        glean: &Glean,
170        trim_data_to_registered_pings: bool,
171    ) -> bool {
172        match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
173            Ok(_) => {
174                let stores_with_events: Vec<String> = {
175                    self.event_stores
176                        .read()
177                        .unwrap()
178                        .keys()
179                        .map(|x| x.to_owned())
180                        .collect() };
182                let has_events_events = stores_with_events.contains(&"events".to_owned());
185                let glean_restarted_stores = if has_events_events {
186                    stores_with_events
187                        .into_iter()
188                        .filter(|store| store != "events")
189                        .collect()
190                } else {
191                    stores_with_events
192                };
193                if !glean_restarted_stores.is_empty() {
194                    for store_name in glean_restarted_stores.iter() {
195                        CounterMetric::new(CommonMetricData {
196                            name: "execution_counter".into(),
197                            category: store_name.into(),
198                            send_in_pings: vec![INTERNAL_STORAGE.into()],
199                            lifetime: Lifetime::Ping,
200                            ..Default::default()
201                        })
202                        .add_sync(glean, 1);
203                    }
204                    let glean_restarted = CommonMetricData {
205                        name: "restarted".into(),
206                        category: "glean".into(),
207                        send_in_pings: glean_restarted_stores,
208                        lifetime: Lifetime::Ping,
209                        ..Default::default()
210                    };
211                    let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
212                    let mut extra: HashMap<String, String> =
213                        [("glean.startup.date".into(), startup)].into();
214                    if glean.with_timestamps() {
215                        let now = Utc::now();
216                        let precise_timestamp = now.timestamp_millis() as u64;
217                        extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
218                    }
219                    self.record(
220                        glean,
221                        &glean_restarted.into(),
222                        crate::get_timestamp_ms(),
223                        Some(extra),
224                    );
225                }
226                has_events_events && glean.submit_ping_by_name("events", Some("startup"))
227            }
228            Err(err) => {
229                log::warn!("Error loading events from disk: {}", err);
230                false
231            }
232        }
233    }
234
235    fn load_events_from_disk(
236        &self,
237        glean: &Glean,
238        trim_data_to_registered_pings: bool,
239    ) -> Result<()> {
240        let mut db = self.event_stores.write().unwrap(); let _lock = self.file_lock.lock().unwrap(); for entry in fs::read_dir(&self.path)? {
248            let entry = entry?;
249            if entry.file_type()?.is_file() {
250                let store_name = entry.file_name().into_string()?;
251                log::info!("Loading events for {}", store_name);
252                if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
253                    log::warn!("Trimming {}'s events", store_name);
254                    if let Err(err) = fs::remove_file(entry.path()) {
255                        match err.kind() {
256                            std::io::ErrorKind::NotFound => {
257                                }
259                            _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
260                        }
261                    }
262                    continue;
263                }
264                let file = BufReader::new(File::open(entry.path())?);
265                db.insert(
266                    store_name,
267                    file.lines()
268                        .map_while(Result::ok)
269                        .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
270                        .collect(),
271                );
272            }
273        }
274        Ok(())
275    }
276
277    pub fn record(
294        &self,
295        glean: &Glean,
296        meta: &CommonMetricDataInternal,
297        timestamp: u64,
298        extra: Option<HashMap<String, String>>,
299    ) -> bool {
300        if !glean.is_upload_enabled() {
302            return false;
303        }
304
305        let mut submit_max_capacity_event_ping = false;
306        {
307            let mut db = self.event_stores.write().unwrap(); for store_name in meta.inner.send_in_pings.iter() {
309                if !glean.is_ping_enabled(store_name) {
310                    continue;
311                }
312
313                let store = db.entry(store_name.to_string()).or_default();
314                let execution_counter = CounterMetric::new(CommonMetricData {
315                    name: "execution_counter".into(),
316                    category: store_name.into(),
317                    send_in_pings: vec![INTERNAL_STORAGE.into()],
318                    lifetime: Lifetime::Ping,
319                    ..Default::default()
320                })
321                .get_value(glean, INTERNAL_STORAGE);
322                let event = StoredEvent {
324                    event: RecordedEvent {
325                        timestamp,
326                        category: meta.inner.category.to_string(),
327                        name: meta.inner.name.to_string(),
328                        extra: extra.clone(),
329                    },
330                    execution_counter,
331                };
332                let event_json = serde_json::to_string(&event).unwrap(); store.push(event);
334                self.write_event_to_disk(store_name, &event_json);
335                if store_name == "events" && store.len() == glean.get_max_events() {
336                    submit_max_capacity_event_ping = true;
337                }
338            }
339        }
340        if submit_max_capacity_event_ping {
341            glean.submit_ping_by_name("events", Some("max_capacity"));
342            true
343        } else {
344            false
345        }
346    }
347
348    fn get_event_store(&self, store_name: &str) -> Result<Arc<File>, io::Error> {
349        let mut map = self.event_store_files.write().unwrap();
351        let entry = map.entry(store_name.to_string());
352
353        match entry {
354            Entry::Occupied(entry) => Ok(Arc::clone(entry.get())),
355            Entry::Vacant(entry) => {
356                let file = OpenOptions::new()
357                    .create(true)
358                    .append(true)
359                    .open(self.path.join(store_name))?;
360                let file = Arc::new(file);
361                let entry = entry.insert(file);
362                Ok(Arc::clone(entry))
363            }
364        }
365    }
366
367    fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
374        let _lock = self.file_lock.lock().unwrap(); let write_res = (|| {
377            let mut file = self.get_event_store(store_name)?;
378            file.write_all(event_json.as_bytes())?;
379            file.write_all(b"\n")?;
380            file.flush()?;
381            Ok::<(), std::io::Error>(())
382        })();
383
384        if let Err(err) = write_res {
385            log::warn!("IO error writing event to store '{}': {}", store_name, err);
386        }
387    }
388
389    fn normalize_store(
418        &self,
419        glean: &Glean,
420        store_name: &str,
421        store: &mut Vec<StoredEvent>,
422        glean_start_time: DateTime<FixedOffset>,
423    ) {
424        let is_glean_restarted =
425            |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
426        let glean_restarted_meta = |store_name: &str| CommonMetricData {
427            name: "restarted".into(),
428            category: "glean".into(),
429            send_in_pings: vec![store_name.into()],
430            lifetime: Lifetime::Ping,
431            ..Default::default()
432        };
433        store.sort_by(|a, b| {
435            a.execution_counter
436                .cmp(&b.execution_counter)
437                .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
438                .then_with(|| {
439                    if is_glean_restarted(&a.event) {
440                        Ordering::Less
441                    } else {
442                        Ordering::Greater
443                    }
444                })
445        });
446        let final_event = match store
450            .iter()
451            .rposition(|event| !is_glean_restarted(&event.event))
452        {
453            Some(idx) => idx + 1,
454            _ => 0,
455        };
456        store.drain(final_event..);
457        let first_event = store
458            .iter()
459            .position(|event| !is_glean_restarted(&event.event))
460            .unwrap_or(store.len());
461        store.drain(..first_event);
462        if store.is_empty() {
463            return;
465        }
466        let mut cur_ec = 0;
472        let mut intra_group_offset = store[0].event.timestamp;
474        let mut inter_group_offset = 0;
476        let mut highest_ts = 0;
477        for event in store.iter_mut() {
478            let execution_counter = event.execution_counter.take().unwrap_or(0);
479            if is_glean_restarted(&event.event) {
480                cur_ec = execution_counter;
483                let glean_startup_date = event
484                    .event
485                    .extra
486                    .as_mut()
487                    .and_then(|extra| {
488                        extra.remove("glean.startup.date").and_then(|date_str| {
489                            DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
490                                .map_err(|_| {
491                                    record_error(
492                                        glean,
493                                        &glean_restarted_meta(store_name).into(),
494                                        ErrorType::InvalidState,
495                                        format!("Unparseable glean.startup.date '{}'", date_str),
496                                        None,
497                                    );
498                                })
499                                .ok()
500                        })
501                    })
502                    .unwrap_or(glean_start_time);
503                if event
504                    .event
505                    .extra
506                    .as_ref()
507                    .is_some_and(|extra| extra.is_empty())
508                {
509                    event.event.extra = None;
511                }
512                let ping_start = DatetimeMetric::new(
513                    CommonMetricData {
514                        name: format!("{}#start", store_name),
515                        category: "".into(),
516                        send_in_pings: vec![INTERNAL_STORAGE.into()],
517                        lifetime: Lifetime::User,
518                        ..Default::default()
519                    },
520                    TimeUnit::Minute,
521                );
522                let ping_start = ping_start
523                    .get_value(glean, INTERNAL_STORAGE)
524                    .unwrap_or(glean_start_time);
525                let time_from_ping_start_to_glean_restarted =
526                    (glean_startup_date - ping_start).num_milliseconds();
527                intra_group_offset = event.event.timestamp;
528                inter_group_offset =
529                    u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
530                if inter_group_offset < highest_ts {
531                    record_error(
532                        glean,
533                        &glean_restarted_meta(store_name).into(),
534                        ErrorType::InvalidValue,
535                        format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
536                        None,
537                    );
538                    inter_group_offset = highest_ts + 1;
543                }
544            } else if cur_ec == 0 {
545                cur_ec = execution_counter;
547            }
548            event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
549            if execution_counter != cur_ec {
550                record_error(
551                    glean,
552                    &glean_restarted_meta(store_name).into(),
553                    ErrorType::InvalidState,
554                    format!(
555                        "Inconsistent execution counter {} (expected {})",
556                        execution_counter, cur_ec
557                    ),
558                    None,
559                );
560                cur_ec = execution_counter;
562            }
563            if highest_ts > event.event.timestamp {
564                record_error(
567                    glean,
568                    &glean_restarted_meta(store_name).into(),
569                    ErrorType::InvalidState,
570                    format!(
571                        "Inconsistent previous highest timestamp {} (expected <= {})",
572                        highest_ts, event.event.timestamp
573                    ),
574                    None,
575                );
576                }
578            highest_ts = event.event.timestamp
579        }
580    }
581
582    pub fn snapshot_as_json(
594        &self,
595        glean: &Glean,
596        store_name: &str,
597        clear_store: bool,
598    ) -> Option<JsonValue> {
599        let result = {
600            let mut db = self.event_stores.write().unwrap(); db.get_mut(&store_name.to_string()).and_then(|store| {
602                if !store.is_empty() {
603                    let mut clone;
606                    let store = if clear_store {
607                        store
608                    } else {
609                        clone = store.clone();
610                        &mut clone
611                    };
612                    self.normalize_store(glean, store_name, store, glean.start_time());
614                    Some(json!(store))
615                } else {
616                    log::warn!("Unexpectly got empty event store for '{}'", store_name);
617                    None
618                }
619            })
620        };
621
622        if clear_store {
623            self.event_stores
624                .write()
625                .unwrap() .remove(&store_name.to_string());
627            self.event_store_files
628                .write()
629                .unwrap() .remove(&store_name.to_string());
631
632            let _lock = self.file_lock.lock().unwrap(); if let Err(err) = fs::remove_file(self.path.join(store_name)) {
634                match err.kind() {
635                    std::io::ErrorKind::NotFound => {
636                        }
638                    _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
639                }
640            }
641        }
642
643        result
644    }
645
646    pub fn clear_all(&self) -> Result<()> {
648        self.event_stores.write().unwrap().clear();
650        self.event_store_files.write().unwrap().clear();
651
652        let _lock = self.file_lock.lock().unwrap();
654        std::fs::remove_dir_all(&self.path)?;
655        create_dir_all(&self.path)?;
656
657        Ok(())
658    }
659
660    pub fn test_get_value<'a>(
667        &'a self,
668        meta: &'a CommonMetricDataInternal,
669        store_name: &str,
670    ) -> Option<Vec<RecordedEvent>> {
671        record_coverage(&meta.base_identifier());
672
673        let value: Vec<RecordedEvent> = self
674            .event_stores
675            .read()
676            .unwrap() .get(&store_name.to_string())
678            .into_iter()
679            .flatten()
680            .map(|stored_event| stored_event.event.clone())
681            .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
682            .collect();
683        if !value.is_empty() {
684            Some(value)
685        } else {
686            None
687        }
688    }
689}
690
691#[cfg(test)]
692mod test {
693    use super::*;
694    use crate::test_get_num_recorded_errors;
695    use crate::tests::new_glean;
696    use chrono::{TimeZone, Timelike};
697
698    #[test]
699    fn handle_truncated_events_on_disk() {
700        let (glean, t) = new_glean(None);
701
702        {
703            let db = EventDatabase::new(t.path()).unwrap();
704            db.write_event_to_disk("events", "{\"timestamp\": 500");
705            db.write_event_to_disk("events", "{\"timestamp\"");
706            db.write_event_to_disk(
707                "events",
708                "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
709            );
710        }
711
712        {
713            let db = EventDatabase::new(t.path()).unwrap();
714            db.load_events_from_disk(&glean, false).unwrap();
715            let events = &db.event_stores.read().unwrap()["events"];
716            assert_eq!(1, events.len());
717        }
718    }
719
720    #[test]
721    fn stable_serialization() {
722        let event_empty = RecordedEvent {
723            timestamp: 2,
724            category: "cat".to_string(),
725            name: "name".to_string(),
726            extra: None,
727        };
728
729        let mut data = HashMap::new();
730        data.insert("a key".to_string(), "a value".to_string());
731        let event_data = RecordedEvent {
732            timestamp: 2,
733            category: "cat".to_string(),
734            name: "name".to_string(),
735            extra: Some(data),
736        };
737
738        let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap();
739        let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap();
740
741        assert_eq!(
742            StoredEvent {
743                event: event_empty,
744                execution_counter: None
745            },
746            serde_json::from_str(&event_empty_json).unwrap()
747        );
748        assert_eq!(
749            StoredEvent {
750                event: event_data,
751                execution_counter: None
752            },
753            serde_json::from_str(&event_data_json).unwrap()
754        );
755    }
756
757    #[test]
758    fn deserialize_existing_data() {
759        let event_empty_json = r#"
760{
761  "timestamp": 2,
762  "category": "cat",
763  "name": "name"
764}
765            "#;
766
767        let event_data_json = r#"
768{
769  "timestamp": 2,
770  "category": "cat",
771  "name": "name",
772  "extra": {
773    "a key": "a value"
774  }
775}
776        "#;
777
778        let event_empty = RecordedEvent {
779            timestamp: 2,
780            category: "cat".to_string(),
781            name: "name".to_string(),
782            extra: None,
783        };
784
785        let mut data = HashMap::new();
786        data.insert("a key".to_string(), "a value".to_string());
787        let event_data = RecordedEvent {
788            timestamp: 2,
789            category: "cat".to_string(),
790            name: "name".to_string(),
791            extra: Some(data),
792        };
793
794        assert_eq!(
795            StoredEvent {
796                event: event_empty,
797                execution_counter: None
798            },
799            serde_json::from_str(event_empty_json).unwrap()
800        );
801        assert_eq!(
802            StoredEvent {
803                event: event_data,
804                execution_counter: None
805            },
806            serde_json::from_str(event_data_json).unwrap()
807        );
808    }
809
810    #[test]
811    fn doesnt_record_when_upload_is_disabled() {
812        let (mut glean, dir) = new_glean(None);
813        let db = EventDatabase::new(dir.path()).unwrap();
814
815        let test_storage = "store1";
816        let test_category = "category";
817        let test_name = "name";
818        let test_timestamp = 2;
819        let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
820        let event_data = RecordedEvent {
821            timestamp: test_timestamp,
822            category: test_category.to_string(),
823            name: test_name.to_string(),
824            extra: None,
825        };
826
827        db.record(&glean, &test_meta, 2, None);
830        {
831            let event_stores = db.event_stores.read().unwrap();
832            assert_eq!(
833                &StoredEvent {
834                    event: event_data,
835                    execution_counter: None
836                },
837                &event_stores.get(test_storage).unwrap()[0]
838            );
839            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
840        }
841
842        glean.set_upload_enabled(false);
843
844        db.record(&glean, &test_meta, 2, None);
846        {
847            let event_stores = db.event_stores.read().unwrap();
848            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
849        }
850    }
851
852    #[test]
853    fn normalize_store_of_glean_restarted() {
854        let (glean, _dir) = new_glean(None);
856
857        let store_name = "store-name";
858        let glean_restarted = StoredEvent {
859            event: RecordedEvent {
860                timestamp: 2,
861                category: "glean".into(),
862                name: "restarted".into(),
863                extra: None,
864            },
865            execution_counter: None,
866        };
867        let mut store = vec![glean_restarted.clone()];
868        let glean_start_time = glean.start_time();
869
870        glean
871            .event_storage()
872            .normalize_store(&glean, store_name, &mut store, glean_start_time);
873        assert!(store.is_empty());
874
875        let mut store = vec![glean_restarted.clone(), glean_restarted.clone()];
876        glean
877            .event_storage()
878            .normalize_store(&glean, store_name, &mut store, glean_start_time);
879        assert!(store.is_empty());
880
881        let mut store = vec![
882            glean_restarted.clone(),
883            glean_restarted.clone(),
884            glean_restarted,
885        ];
886        glean
887            .event_storage()
888            .normalize_store(&glean, store_name, &mut store, glean_start_time);
889        assert!(store.is_empty());
890    }
891
892    #[test]
893    fn normalize_store_of_glean_restarted_on_both_ends() {
894        let (glean, _dir) = new_glean(None);
896
897        let store_name = "store-name";
898        let glean_restarted = StoredEvent {
899            event: RecordedEvent {
900                timestamp: 2,
901                category: "glean".into(),
902                name: "restarted".into(),
903                extra: None,
904            },
905            execution_counter: None,
906        };
907        let not_glean_restarted = StoredEvent {
908            event: RecordedEvent {
909                timestamp: 20,
910                category: "category".into(),
911                name: "name".into(),
912                extra: None,
913            },
914            execution_counter: None,
915        };
916        let mut store = vec![
917            glean_restarted.clone(),
918            not_glean_restarted.clone(),
919            glean_restarted,
920        ];
921        let glean_start_time = glean.start_time();
922
923        glean
924            .event_storage()
925            .normalize_store(&glean, store_name, &mut store, glean_start_time);
926        assert_eq!(1, store.len());
927        assert_eq!(
928            StoredEvent {
929                event: RecordedEvent {
930                    timestamp: 0,
931                    ..not_glean_restarted.event
932                },
933                execution_counter: None
934            },
935            store[0]
936        );
937    }
938
939    #[test]
940    fn normalize_store_single_run_timestamp_math() {
941        let (glean, _dir) = new_glean(None);
945
946        let store_name = "store-name";
947        let glean_restarted = StoredEvent {
948            event: RecordedEvent {
949                timestamp: 2,
950                category: "glean".into(),
951                name: "restarted".into(),
952                extra: None,
953            },
954            execution_counter: None,
955        };
956        let timestamps = [20, 40, 200];
957        let not_glean_restarted = StoredEvent {
958            event: RecordedEvent {
959                timestamp: timestamps[0],
960                category: "category".into(),
961                name: "name".into(),
962                extra: None,
963            },
964            execution_counter: None,
965        };
966        let mut store = vec![
967            glean_restarted.clone(),
968            not_glean_restarted.clone(),
969            StoredEvent {
970                event: RecordedEvent {
971                    timestamp: timestamps[1],
972                    ..not_glean_restarted.event.clone()
973                },
974                execution_counter: None,
975            },
976            StoredEvent {
977                event: RecordedEvent {
978                    timestamp: timestamps[2],
979                    ..not_glean_restarted.event.clone()
980                },
981                execution_counter: None,
982            },
983            glean_restarted,
984        ];
985
986        glean
987            .event_storage()
988            .normalize_store(&glean, store_name, &mut store, glean.start_time());
989        assert_eq!(3, store.len());
990        for (timestamp, event) in timestamps.iter().zip(store.iter()) {
991            assert_eq!(
992                &StoredEvent {
993                    event: RecordedEvent {
994                        timestamp: timestamp - timestamps[0],
995                        ..not_glean_restarted.clone().event
996                    },
997                    execution_counter: None
998                },
999                event
1000            );
1001        }
1002    }
1003
1004    #[test]
1005    fn normalize_store_multi_run_timestamp_math() {
1006        let (glean, _dir) = new_glean(None);
1011
1012        let store_name = "store-name";
1013        let glean_restarted = StoredEvent {
1014            event: RecordedEvent {
1015                category: "glean".into(),
1016                name: "restarted".into(),
1017                ..Default::default()
1018            },
1019            execution_counter: None,
1020        };
1021        let not_glean_restarted = StoredEvent {
1022            event: RecordedEvent {
1023                category: "category".into(),
1024                name: "name".into(),
1025                ..Default::default()
1026            },
1027            execution_counter: None,
1028        };
1029
1030        let timestamps = [20, 40, 200, 12];
1033        let ecs = [0, 1];
1034        let some_hour = 16;
1035        let startup_date = FixedOffset::east_opt(0)
1036            .unwrap()
1037            .with_ymd_and_hms(2022, 11, 24, some_hour, 29, 0) .unwrap();
1039        let glean_start_time = startup_date.with_hour(some_hour - 1);
1040        let restarted_ts = 2;
1041        let mut store = vec![
1042            StoredEvent {
1043                event: RecordedEvent {
1044                    timestamp: timestamps[0],
1045                    ..not_glean_restarted.event.clone()
1046                },
1047                execution_counter: Some(ecs[0]),
1048            },
1049            StoredEvent {
1050                event: RecordedEvent {
1051                    timestamp: timestamps[1],
1052                    ..not_glean_restarted.event.clone()
1053                },
1054                execution_counter: Some(ecs[0]),
1055            },
1056            StoredEvent {
1057                event: RecordedEvent {
1058                    timestamp: timestamps[2],
1059                    ..not_glean_restarted.event.clone()
1060                },
1061                execution_counter: Some(ecs[0]),
1062            },
1063            StoredEvent {
1064                event: RecordedEvent {
1065                    extra: Some(
1066                        [(
1067                            "glean.startup.date".into(),
1068                            get_iso_time_string(startup_date, TimeUnit::Minute),
1069                        )]
1070                        .into(),
1071                    ),
1072                    timestamp: restarted_ts,
1073                    ..glean_restarted.event.clone()
1074                },
1075                execution_counter: Some(ecs[1]),
1076            },
1077            StoredEvent {
1078                event: RecordedEvent {
1079                    timestamp: timestamps[3],
1080                    ..not_glean_restarted.event.clone()
1081                },
1082                execution_counter: Some(ecs[1]),
1083            },
1084        ];
1085
1086        glean.event_storage().normalize_store(
1087            &glean,
1088            store_name,
1089            &mut store,
1090            glean_start_time.unwrap(),
1091        );
1092        assert_eq!(5, store.len()); for (timestamp, event) in timestamps[..timestamps.len() - 1].iter().zip(store.clone()) {
1096            assert_eq!(
1097                StoredEvent {
1098                    event: RecordedEvent {
1099                        timestamp: timestamp - timestamps[0],
1100                        ..not_glean_restarted.event.clone()
1101                    },
1102                    execution_counter: None,
1103                },
1104                event
1105            );
1106        }
1107        let hour_in_millis = 3600000;
1109        assert_eq!(
1110            store[3],
1111            StoredEvent {
1112                event: RecordedEvent {
1113                    timestamp: hour_in_millis,
1114                    ..glean_restarted.event
1115                },
1116                execution_counter: None,
1117            }
1118        );
1119        assert_eq!(
1121            store[4],
1122            StoredEvent {
1123                event: RecordedEvent {
1124                    timestamp: hour_in_millis + timestamps[3] - restarted_ts,
1125                    ..not_glean_restarted.event
1126                },
1127                execution_counter: None,
1128            }
1129        );
1130    }
1131
1132    #[test]
1133    fn normalize_store_multi_run_client_clocks() {
1134        let (glean, _dir) = new_glean(None);
1137
1138        let store_name = "store-name";
1139        let glean_restarted = StoredEvent {
1140            event: RecordedEvent {
1141                category: "glean".into(),
1142                name: "restarted".into(),
1143                ..Default::default()
1144            },
1145            execution_counter: None,
1146        };
1147        let not_glean_restarted = StoredEvent {
1148            event: RecordedEvent {
1149                category: "category".into(),
1150                name: "name".into(),
1151                ..Default::default()
1152            },
1153            execution_counter: None,
1154        };
1155
1156        let timestamps = [20, 40, 12, 200];
1159        let ecs = [0, 1];
1160        let some_hour = 10;
1161        let startup_date = FixedOffset::east_opt(0)
1162            .unwrap()
1163            .with_ymd_and_hms(2022, 11, 25, some_hour, 37, 0) .unwrap();
1165        let glean_start_time = startup_date.with_hour(some_hour + 1);
1166        let restarted_ts = 2;
1167        let mut store = vec![
1168            StoredEvent {
1169                event: RecordedEvent {
1170                    timestamp: timestamps[0],
1171                    ..not_glean_restarted.event.clone()
1172                },
1173                execution_counter: Some(ecs[0]),
1174            },
1175            StoredEvent {
1176                event: RecordedEvent {
1177                    timestamp: timestamps[1],
1178                    ..not_glean_restarted.event.clone()
1179                },
1180                execution_counter: Some(ecs[0]),
1181            },
1182            StoredEvent {
1183                event: RecordedEvent {
1184                    extra: Some(
1185                        [(
1186                            "glean.startup.date".into(),
1187                            get_iso_time_string(startup_date, TimeUnit::Minute),
1188                        )]
1189                        .into(),
1190                    ),
1191                    timestamp: restarted_ts,
1192                    ..glean_restarted.event.clone()
1193                },
1194                execution_counter: Some(ecs[1]),
1195            },
1196            StoredEvent {
1197                event: RecordedEvent {
1198                    timestamp: timestamps[2],
1199                    ..not_glean_restarted.event.clone()
1200                },
1201                execution_counter: Some(ecs[1]),
1202            },
1203            StoredEvent {
1204                event: RecordedEvent {
1205                    timestamp: timestamps[3],
1206                    ..not_glean_restarted.event.clone()
1207                },
1208                execution_counter: Some(ecs[1]),
1209            },
1210        ];
1211
1212        glean.event_storage().normalize_store(
1213            &glean,
1214            store_name,
1215            &mut store,
1216            glean_start_time.unwrap(),
1217        );
1218        assert_eq!(5, store.len()); for (timestamp, event) in timestamps[..timestamps.len() - 2].iter().zip(store.clone()) {
1222            assert_eq!(
1223                StoredEvent {
1224                    event: RecordedEvent {
1225                        timestamp: timestamp - timestamps[0],
1226                        ..not_glean_restarted.event.clone()
1227                    },
1228                    execution_counter: None,
1229                },
1230                event
1231            );
1232        }
1233        assert_eq!(
1237            store[2],
1238            StoredEvent {
1239                event: RecordedEvent {
1240                    timestamp: store[1].event.timestamp + 1,
1241                    ..glean_restarted.event
1242                },
1243                execution_counter: None,
1244            }
1245        );
1246        assert_eq!(
1248            store[3],
1249            StoredEvent {
1250                event: RecordedEvent {
1251                    timestamp: timestamps[2] - restarted_ts + store[2].event.timestamp,
1252                    ..not_glean_restarted.event
1253                },
1254                execution_counter: None,
1255            }
1256        );
1257        assert_eq!(
1259            Ok(1),
1260            test_get_num_recorded_errors(
1261                &glean,
1262                &CommonMetricData {
1263                    name: "restarted".into(),
1264                    category: "glean".into(),
1265                    send_in_pings: vec![store_name.into()],
1266                    lifetime: Lifetime::Ping,
1267                    ..Default::default()
1268                }
1269                .into(),
1270                ErrorType::InvalidValue
1271            )
1272        );
1273    }
1274
1275    #[test]
1276    fn normalize_store_non_zero_ec() {
1277        let (glean, _dir) = new_glean(None);
1280
1281        let store_name = "store-name";
1282        let glean_restarted = StoredEvent {
1283            event: RecordedEvent {
1284                timestamp: 2,
1285                category: "glean".into(),
1286                name: "restarted".into(),
1287                extra: None,
1288            },
1289            execution_counter: Some(2),
1290        };
1291        let not_glean_restarted = StoredEvent {
1292            event: RecordedEvent {
1293                timestamp: 20,
1294                category: "category".into(),
1295                name: "name".into(),
1296                extra: None,
1297            },
1298            execution_counter: Some(2),
1299        };
1300        let glean_restarted_2 = StoredEvent {
1301            event: RecordedEvent {
1302                timestamp: 2,
1303                category: "glean".into(),
1304                name: "restarted".into(),
1305                extra: None,
1306            },
1307            execution_counter: Some(3),
1308        };
1309        let mut store = vec![
1310            glean_restarted,
1311            not_glean_restarted.clone(),
1312            glean_restarted_2,
1313        ];
1314        let glean_start_time = glean.start_time();
1315
1316        glean
1317            .event_storage()
1318            .normalize_store(&glean, store_name, &mut store, glean_start_time);
1319
1320        assert_eq!(1, store.len());
1321        assert_eq!(
1322            StoredEvent {
1323                event: RecordedEvent {
1324                    timestamp: 0,
1325                    ..not_glean_restarted.event
1326                },
1327                execution_counter: None
1328            },
1329            store[0]
1330        );
1331        assert!(test_get_num_recorded_errors(
1333            &glean,
1334            &CommonMetricData {
1335                name: "restarted".into(),
1336                category: "glean".into(),
1337                send_in_pings: vec![store_name.into()],
1338                lifetime: Lifetime::Ping,
1339                ..Default::default()
1340            }
1341            .into(),
1342            ErrorType::InvalidState
1343        )
1344        .is_err());
1345        assert!(test_get_num_recorded_errors(
1347            &glean,
1348            &CommonMetricData {
1349                name: "restarted".into(),
1350                category: "glean".into(),
1351                send_in_pings: vec![store_name.into()],
1352                lifetime: Lifetime::Ping,
1353                ..Default::default()
1354            }
1355            .into(),
1356            ErrorType::InvalidValue
1357        )
1358        .is_err());
1359    }
1360}