1use std::cmp::Ordering;
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::fs::{create_dir_all, File, OpenOptions};
9use std::io::BufReader;
10use std::io::Write;
11use std::io::{self, BufRead};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex, RwLock};
14use std::{fs, mem};
15
16use chrono::{DateTime, FixedOffset, Utc};
17
18use malloc_size_of::MallocSizeOf;
19use malloc_size_of_derive::MallocSizeOf;
20use serde::{Deserialize, Serialize};
21use serde_json::{json, Value as JsonValue};
22
23use crate::common_metric_data::CommonMetricDataInternal;
24use crate::error_recording::{record_error, ErrorType};
25use crate::metrics::{DatetimeMetric, TimeUnit};
26use crate::session::{EventSessionContext, SessionMetadata};
27use crate::storage::INTERNAL_STORAGE;
28use crate::util::get_iso_time_string;
29use crate::Glean;
30use crate::Result;
31use crate::{CommonMetricData, CounterMetric, Lifetime};
32
33#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, MallocSizeOf)]
35#[cfg_attr(test, derive(Default))]
36pub struct RecordedEvent {
37 pub timestamp: u64,
41
42 pub category: String,
46
47 pub name: String,
51
52 #[serde(skip_serializing_if = "Option::is_none")]
56 pub extra: Option<HashMap<String, String>>,
57
58 #[serde(skip_serializing_if = "Option::is_none")]
63 #[serde(default)]
64 pub session: Option<SessionMetadata>,
65}
66
67#[derive(
69 Debug, Clone, Deserialize, Serialize, PartialEq, Eq, malloc_size_of_derive::MallocSizeOf,
70)]
71struct StoredEvent {
72 #[serde(flatten)]
73 event: RecordedEvent,
74
75 #[serde(default)]
80 #[serde(skip_serializing_if = "Option::is_none")]
81 pub execution_counter: Option<i32>,
82}
83
/// Storage for recorded events, kept both in memory and in per-store files.
#[derive(Debug)]
pub struct EventDatabase {
    /// Directory holding one events file per store name.
    pub path: PathBuf,
    /// In-memory events, keyed by store name.
    event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
    /// Open append-mode file handles, keyed by store name.
    event_store_files: RwLock<HashMap<String, Arc<File>>>,
    /// Held while event files are read, appended to, or removed.
    file_lock: Mutex<()>,
}
115
116impl MallocSizeOf for EventDatabase {
117 fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
118 let mut n = 0;
119 n += self.event_stores.read().unwrap().size_of(ops);
120
121 let map = self.event_store_files.read().unwrap();
122 for store_name in map.keys() {
123 n += store_name.size_of(ops);
124 n += mem::size_of::<File>();
126 }
127 n
128 }
129}
130
131impl EventDatabase {
132 pub fn new(data_path: &Path) -> Result<Self> {
139 let path = data_path.join("events");
140 create_dir_all(&path)?;
141
142 Ok(Self {
143 path,
144 event_stores: RwLock::new(HashMap::new()),
145 event_store_files: RwLock::new(HashMap::new()),
146 file_lock: Mutex::new(()),
147 })
148 }
149
150 pub fn flush_pending_events_on_startup(
176 &self,
177 glean: &Glean,
178 trim_data_to_registered_pings: bool,
179 ) -> bool {
180 match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
181 Ok(_) => {
182 let stores_with_events: Vec<String> = {
183 self.event_stores
184 .read()
185 .unwrap()
186 .keys()
187 .map(|x| x.to_owned())
188 .collect() };
190 let has_events_events = stores_with_events.contains(&"events".to_owned());
193 let glean_restarted_stores = if has_events_events {
194 stores_with_events
195 .into_iter()
196 .filter(|store| store != "events")
197 .collect()
198 } else {
199 stores_with_events
200 };
201 if !glean_restarted_stores.is_empty() {
202 for store_name in glean_restarted_stores.iter() {
203 CounterMetric::new(CommonMetricData {
204 name: "execution_counter".into(),
205 category: store_name.into(),
206 send_in_pings: vec![INTERNAL_STORAGE.into()],
207 lifetime: Lifetime::Ping,
208 ..Default::default()
209 })
210 .add_sync(glean, 1);
211 }
212 let glean_restarted = CommonMetricData {
213 name: "restarted".into(),
214 category: "glean".into(),
215 send_in_pings: glean_restarted_stores,
216 lifetime: Lifetime::Ping,
217 ..Default::default()
218 };
219 let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
220 let mut extra: HashMap<String, String> =
221 [("glean.startup.date".into(), startup)].into();
222 if glean.with_timestamps() {
223 let now = Utc::now();
224 let precise_timestamp = now.timestamp_millis() as u64;
225 extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
226 }
227 self.record(
228 glean,
229 &glean_restarted.into(),
230 crate::get_timestamp_ms(),
231 Some(extra),
232 EventSessionContext::OutOfSession,
233 );
234 }
235 has_events_events && glean.submit_ping_by_name("events", Some("startup"))
236 }
237 Err(err) => {
238 log::warn!("Error loading events from disk: {}", err);
239 false
240 }
241 }
242 }
243
244 fn load_events_from_disk(
245 &self,
246 glean: &Glean,
247 trim_data_to_registered_pings: bool,
248 ) -> Result<()> {
249 let mut db = self.event_stores.write().unwrap(); let _lock = self.file_lock.lock().unwrap(); for entry in fs::read_dir(&self.path)? {
257 let entry = entry?;
258 if entry.file_type()?.is_file() {
259 let store_name = entry.file_name().into_string()?;
260 log::info!("Loading events for {}", store_name);
261 if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
262 log::warn!("Trimming {}'s events", store_name);
263 if let Err(err) = fs::remove_file(entry.path()) {
264 match err.kind() {
265 std::io::ErrorKind::NotFound => {
266 }
268 _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
269 }
270 }
271 continue;
272 }
273 let file = BufReader::new(File::open(entry.path())?);
274 db.insert(
275 store_name,
276 file.lines()
277 .map_while(Result::ok)
278 .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
279 .collect(),
280 );
281 }
282 }
283 Ok(())
284 }
285
286 pub fn record(
305 &self,
306 glean: &Glean,
307 meta: &CommonMetricDataInternal,
308 timestamp: u64,
309 extra: Option<HashMap<String, String>>,
310 ctx: EventSessionContext,
311 ) -> bool {
312 if !glean.is_upload_enabled() {
314 return false;
315 }
316
317 let session = match ctx {
319 EventSessionContext::OutOfSession => None,
320 EventSessionContext::InSession(session_meta) => Some(session_meta),
321 };
322
323 let mut submit_max_capacity_event_ping = false;
324 {
325 let mut db = self.event_stores.write().unwrap(); for store_name in meta.inner.send_in_pings.iter() {
327 if !glean.is_ping_enabled(store_name) {
328 continue;
329 }
330
331 let store = db.entry(store_name.to_string()).or_default();
332 let execution_counter = CounterMetric::new(CommonMetricData {
333 name: "execution_counter".into(),
334 category: store_name.into(),
335 send_in_pings: vec![INTERNAL_STORAGE.into()],
336 lifetime: Lifetime::Ping,
337 ..Default::default()
338 })
339 .get_value(glean, INTERNAL_STORAGE);
340 let event = StoredEvent {
342 event: RecordedEvent {
343 timestamp,
344 category: meta.inner.category.to_string(),
345 name: meta.inner.name.to_string(),
346 extra: extra.clone(),
347 session: session.clone(),
348 },
349 execution_counter,
350 };
351 let event_json = serde_json::to_string(&event).unwrap(); store.push(event);
353 self.write_event_to_disk(store_name, &event_json);
354 if store_name == "events" && store.len() == glean.get_max_events() {
355 submit_max_capacity_event_ping = true;
356 }
357 }
358 }
359 if submit_max_capacity_event_ping {
360 glean.submit_ping_by_name("events", Some("max_capacity"));
361 true
362 } else {
363 false
364 }
365 }
366
367 fn get_event_store(&self, store_name: &str) -> Result<Arc<File>, io::Error> {
368 let mut map = self.event_store_files.write().unwrap();
370 let entry = map.entry(store_name.to_string());
371
372 match entry {
373 Entry::Occupied(entry) => Ok(Arc::clone(entry.get())),
374 Entry::Vacant(entry) => {
375 let file = OpenOptions::new()
376 .create(true)
377 .append(true)
378 .open(self.path.join(store_name))?;
379 let file = Arc::new(file);
380 let entry = entry.insert(file);
381 Ok(Arc::clone(entry))
382 }
383 }
384 }
385
386 fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
393 let _lock = self.file_lock.lock().unwrap(); let write_res = (|| {
396 let mut file = self.get_event_store(store_name)?;
397 file.write_all(event_json.as_bytes())?;
398 file.write_all(b"\n")?;
399 file.flush()?;
400 Ok::<(), std::io::Error>(())
401 })();
402
403 if let Err(err) = write_res {
404 log::warn!("IO error writing event to store '{}': {}", store_name, err);
405 }
406 }
407
408 fn normalize_store(
437 &self,
438 glean: &Glean,
439 store_name: &str,
440 store: &mut Vec<StoredEvent>,
441 glean_start_time: DateTime<FixedOffset>,
442 ) {
443 let is_glean_restarted =
444 |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
445 let glean_restarted_meta = |store_name: &str| CommonMetricData {
446 name: "restarted".into(),
447 category: "glean".into(),
448 send_in_pings: vec![store_name.into()],
449 lifetime: Lifetime::Ping,
450 ..Default::default()
451 };
452 store.sort_by(|a, b| {
454 a.execution_counter
455 .cmp(&b.execution_counter)
456 .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
457 .then_with(|| {
458 if is_glean_restarted(&a.event) {
459 Ordering::Less
460 } else {
461 Ordering::Greater
462 }
463 })
464 });
465 let final_event = match store
469 .iter()
470 .rposition(|event| !is_glean_restarted(&event.event))
471 {
472 Some(idx) => idx + 1,
473 _ => 0,
474 };
475 store.drain(final_event..);
476 let first_event = store
477 .iter()
478 .position(|event| !is_glean_restarted(&event.event))
479 .unwrap_or(store.len());
480 store.drain(..first_event);
481 if store.is_empty() {
482 return;
484 }
485 let mut cur_ec = 0;
491 let mut intra_group_offset = store[0].event.timestamp;
493 let mut inter_group_offset = 0;
495 let mut highest_ts = 0;
496 for event in store.iter_mut() {
497 let execution_counter = event.execution_counter.take().unwrap_or(0);
498 if is_glean_restarted(&event.event) {
499 cur_ec = execution_counter;
502 let glean_startup_date = event
503 .event
504 .extra
505 .as_mut()
506 .and_then(|extra| {
507 extra.remove("glean.startup.date").and_then(|date_str| {
508 DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
509 .map_err(|_| {
510 record_error(
511 glean,
512 &glean_restarted_meta(store_name).into(),
513 ErrorType::InvalidState,
514 format!("Unparseable glean.startup.date '{}'", date_str),
515 None,
516 );
517 })
518 .ok()
519 })
520 })
521 .unwrap_or(glean_start_time);
522 if event
523 .event
524 .extra
525 .as_ref()
526 .is_some_and(|extra| extra.is_empty())
527 {
528 event.event.extra = None;
530 }
531 let ping_start = DatetimeMetric::new(
532 CommonMetricData {
533 name: format!("{}#start", store_name),
534 category: "".into(),
535 send_in_pings: vec![INTERNAL_STORAGE.into()],
536 lifetime: Lifetime::User,
537 ..Default::default()
538 },
539 TimeUnit::Minute,
540 );
541 let ping_start = ping_start
542 .get_value(glean, INTERNAL_STORAGE)
543 .unwrap_or(glean_start_time);
544 let time_from_ping_start_to_glean_restarted =
545 (glean_startup_date - ping_start).num_milliseconds();
546 intra_group_offset = event.event.timestamp;
547 inter_group_offset =
548 u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
549 if inter_group_offset < highest_ts {
550 record_error(
551 glean,
552 &glean_restarted_meta(store_name).into(),
553 ErrorType::InvalidValue,
554 format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
555 None,
556 );
557 inter_group_offset = highest_ts + 1;
562 }
563 } else if cur_ec == 0 {
564 cur_ec = execution_counter;
566 }
567 event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
568 if execution_counter != cur_ec {
569 record_error(
570 glean,
571 &glean_restarted_meta(store_name).into(),
572 ErrorType::InvalidState,
573 format!(
574 "Inconsistent execution counter {} (expected {})",
575 execution_counter, cur_ec
576 ),
577 None,
578 );
579 cur_ec = execution_counter;
581 }
582
583 if event.event.timestamp > i64::MAX as u64 {
586 glean
587 .additional_metrics
588 .event_timestamp_clamped
589 .add_sync(glean, 1);
590 log::warn!(
591 "Calculated event timestamp was too high. Got: {}, max: {}",
592 event.event.timestamp,
593 i64::MAX,
594 );
595 event.event.timestamp = event.event.timestamp.clamp(0, i64::MAX as u64);
596 }
597
598 if highest_ts > event.event.timestamp {
599 record_error(
602 glean,
603 &glean_restarted_meta(store_name).into(),
604 ErrorType::InvalidState,
605 format!(
606 "Inconsistent previous highest timestamp {} (expected <= {})",
607 highest_ts, event.event.timestamp
608 ),
609 None,
610 );
611 }
613 highest_ts = event.event.timestamp
614 }
615 }
616
617 pub fn snapshot_as_json(
629 &self,
630 glean: &Glean,
631 store_name: &str,
632 clear_store: bool,
633 ) -> Option<JsonValue> {
634 let result = {
635 let mut db = self.event_stores.write().unwrap(); db.get_mut(&store_name.to_string()).and_then(|store| {
637 if !store.is_empty() {
638 let mut clone;
641 let store = if clear_store {
642 store
643 } else {
644 clone = store.clone();
645 &mut clone
646 };
647 self.normalize_store(glean, store_name, store, glean.start_time());
649 Some(json!(store))
650 } else {
651 log::warn!("Unexpectly got empty event store for '{}'", store_name);
652 None
653 }
654 })
655 };
656
657 if clear_store {
658 self.event_stores
659 .write()
660 .unwrap() .remove(&store_name.to_string());
662 self.event_store_files
663 .write()
664 .unwrap() .remove(&store_name.to_string());
666
667 let _lock = self.file_lock.lock().unwrap(); if let Err(err) = fs::remove_file(self.path.join(store_name)) {
669 match err.kind() {
670 std::io::ErrorKind::NotFound => {
671 }
673 _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
674 }
675 }
676 }
677
678 result
679 }
680
681 pub fn clear_all(&self) -> Result<()> {
683 self.event_stores.write().unwrap().clear();
685 self.event_store_files.write().unwrap().clear();
686
687 let _lock = self.file_lock.lock().unwrap();
689 std::fs::remove_dir_all(&self.path)?;
690 create_dir_all(&self.path)?;
691
692 Ok(())
693 }
694
695 pub fn test_get_value<'a>(
702 &'a self,
703 meta: &'a CommonMetricDataInternal,
704 store_name: &str,
705 ) -> Option<Vec<RecordedEvent>> {
706 let value: Vec<RecordedEvent> = self
707 .event_stores
708 .read()
709 .unwrap() .get(&store_name.to_string())
711 .into_iter()
712 .flatten()
713 .map(|stored_event| stored_event.event.clone())
714 .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
715 .collect();
716 if !value.is_empty() {
717 Some(value)
718 } else {
719 None
720 }
721 }
722}
723
724#[cfg(test)]
725mod test {
726 use super::*;
727 use crate::test_get_num_recorded_errors;
728 use crate::tests::new_glean;
729 use chrono::{TimeZone, Timelike};
730
731 #[test]
732 fn handle_truncated_events_on_disk() {
733 let (glean, t) = new_glean(None);
734
735 {
736 let db = EventDatabase::new(t.path()).unwrap();
737 db.write_event_to_disk("events", "{\"timestamp\": 500");
738 db.write_event_to_disk("events", "{\"timestamp\"");
739 db.write_event_to_disk(
740 "events",
741 "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
742 );
743 }
744
745 {
746 let db = EventDatabase::new(t.path()).unwrap();
747 db.load_events_from_disk(&glean, false).unwrap();
748 let events = &db.event_stores.read().unwrap()["events"];
749 assert_eq!(1, events.len());
750 }
751 }
752
753 #[test]
754 fn stable_serialization() {
755 let event_empty = RecordedEvent {
756 timestamp: 2,
757 category: "cat".to_string(),
758 name: "name".to_string(),
759 extra: None,
760 session: None,
761 };
762
763 let mut data = HashMap::new();
764 data.insert("a key".to_string(), "a value".to_string());
765 let event_data = RecordedEvent {
766 timestamp: 2,
767 category: "cat".to_string(),
768 name: "name".to_string(),
769 extra: Some(data),
770 session: None,
771 };
772
773 let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap();
774 let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap();
775
776 assert_eq!(
777 StoredEvent {
778 event: event_empty,
779 execution_counter: None
780 },
781 serde_json::from_str(&event_empty_json).unwrap()
782 );
783 assert_eq!(
784 StoredEvent {
785 event: event_data,
786 execution_counter: None
787 },
788 serde_json::from_str(&event_data_json).unwrap()
789 );
790 }
791
792 #[test]
793 fn deserialize_existing_data() {
794 let event_empty_json = r#"
795{
796 "timestamp": 2,
797 "category": "cat",
798 "name": "name"
799}
800 "#;
801
802 let event_data_json = r#"
803{
804 "timestamp": 2,
805 "category": "cat",
806 "name": "name",
807 "extra": {
808 "a key": "a value"
809 }
810}
811 "#;
812
813 let event_empty = RecordedEvent {
814 timestamp: 2,
815 category: "cat".to_string(),
816 name: "name".to_string(),
817 extra: None,
818 session: None,
819 };
820
821 let mut data = HashMap::new();
822 data.insert("a key".to_string(), "a value".to_string());
823 let event_data = RecordedEvent {
824 timestamp: 2,
825 category: "cat".to_string(),
826 name: "name".to_string(),
827 extra: Some(data),
828 session: None,
829 };
830
831 assert_eq!(
832 StoredEvent {
833 event: event_empty,
834 execution_counter: None
835 },
836 serde_json::from_str(event_empty_json).unwrap()
837 );
838 assert_eq!(
839 StoredEvent {
840 event: event_data,
841 execution_counter: None
842 },
843 serde_json::from_str(event_data_json).unwrap()
844 );
845 }
846
847 #[test]
848 fn doesnt_record_when_upload_is_disabled() {
849 let (mut glean, dir) = new_glean(None);
850 let db = EventDatabase::new(dir.path()).unwrap();
851
852 let test_storage = "store1";
853 let test_category = "category";
854 let test_name = "name";
855 let test_timestamp = 2;
856 let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
857 let event_data = RecordedEvent {
858 timestamp: test_timestamp,
859 category: test_category.to_string(),
860 name: test_name.to_string(),
861 extra: None,
862 session: None,
863 };
864
865 db.record(
868 &glean,
869 &test_meta,
870 2,
871 None,
872 EventSessionContext::OutOfSession,
873 );
874 {
875 let event_stores = db.event_stores.read().unwrap();
876 assert_eq!(
877 &StoredEvent {
878 event: event_data,
879 execution_counter: None
880 },
881 &event_stores.get(test_storage).unwrap()[0]
882 );
883 assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
884 }
885
886 glean.set_upload_enabled(false);
887
888 db.record(
890 &glean,
891 &test_meta,
892 2,
893 None,
894 EventSessionContext::OutOfSession,
895 );
896 {
897 let event_stores = db.event_stores.read().unwrap();
898 assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
899 }
900 }
901
902 #[test]
903 fn normalize_store_of_glean_restarted() {
904 let (glean, _dir) = new_glean(None);
906
907 let store_name = "store-name";
908 let glean_restarted = StoredEvent {
909 event: RecordedEvent {
910 timestamp: 2,
911 category: "glean".into(),
912 name: "restarted".into(),
913 extra: None,
914 session: None,
915 },
916 execution_counter: None,
917 };
918 let mut store = vec![glean_restarted.clone()];
919 let glean_start_time = glean.start_time();
920
921 glean
922 .event_storage()
923 .normalize_store(&glean, store_name, &mut store, glean_start_time);
924 assert!(store.is_empty());
925
926 let mut store = vec![glean_restarted.clone(), glean_restarted.clone()];
927 glean
928 .event_storage()
929 .normalize_store(&glean, store_name, &mut store, glean_start_time);
930 assert!(store.is_empty());
931
932 let mut store = vec![
933 glean_restarted.clone(),
934 glean_restarted.clone(),
935 glean_restarted,
936 ];
937 glean
938 .event_storage()
939 .normalize_store(&glean, store_name, &mut store, glean_start_time);
940 assert!(store.is_empty());
941 }
942
943 #[test]
944 fn normalize_store_of_glean_restarted_on_both_ends() {
945 let (glean, _dir) = new_glean(None);
947
948 let store_name = "store-name";
949 let glean_restarted = StoredEvent {
950 event: RecordedEvent {
951 timestamp: 2,
952 category: "glean".into(),
953 name: "restarted".into(),
954 extra: None,
955 session: None,
956 },
957 execution_counter: None,
958 };
959 let not_glean_restarted = StoredEvent {
960 event: RecordedEvent {
961 timestamp: 20,
962 category: "category".into(),
963 name: "name".into(),
964 extra: None,
965 session: None,
966 },
967 execution_counter: None,
968 };
969 let mut store = vec![
970 glean_restarted.clone(),
971 not_glean_restarted.clone(),
972 glean_restarted,
973 ];
974 let glean_start_time = glean.start_time();
975
976 glean
977 .event_storage()
978 .normalize_store(&glean, store_name, &mut store, glean_start_time);
979 assert_eq!(1, store.len());
980 assert_eq!(
981 StoredEvent {
982 event: RecordedEvent {
983 timestamp: 0,
984 ..not_glean_restarted.event
985 },
986 execution_counter: None
987 },
988 store[0]
989 );
990 }
991
992 #[test]
993 fn normalize_store_single_run_timestamp_math() {
994 let (glean, _dir) = new_glean(None);
998
999 let store_name = "store-name";
1000 let glean_restarted = StoredEvent {
1001 event: RecordedEvent {
1002 timestamp: 2,
1003 category: "glean".into(),
1004 name: "restarted".into(),
1005 extra: None,
1006 session: None,
1007 },
1008 execution_counter: None,
1009 };
1010 let timestamps = [20, 40, 200];
1011 let not_glean_restarted = StoredEvent {
1012 event: RecordedEvent {
1013 timestamp: timestamps[0],
1014 category: "category".into(),
1015 name: "name".into(),
1016 extra: None,
1017 session: None,
1018 },
1019 execution_counter: None,
1020 };
1021 let mut store = vec![
1022 glean_restarted.clone(),
1023 not_glean_restarted.clone(),
1024 StoredEvent {
1025 event: RecordedEvent {
1026 timestamp: timestamps[1],
1027 ..not_glean_restarted.event.clone()
1028 },
1029 execution_counter: None,
1030 },
1031 StoredEvent {
1032 event: RecordedEvent {
1033 timestamp: timestamps[2],
1034 ..not_glean_restarted.event.clone()
1035 },
1036 execution_counter: None,
1037 },
1038 glean_restarted,
1039 ];
1040
1041 glean
1042 .event_storage()
1043 .normalize_store(&glean, store_name, &mut store, glean.start_time());
1044 assert_eq!(3, store.len());
1045 for (timestamp, event) in timestamps.iter().zip(store.iter()) {
1046 assert_eq!(
1047 &StoredEvent {
1048 event: RecordedEvent {
1049 timestamp: timestamp - timestamps[0],
1050 ..not_glean_restarted.clone().event
1051 },
1052 execution_counter: None
1053 },
1054 event
1055 );
1056 }
1057 }
1058
1059 #[test]
1060 fn normalize_store_multi_run_timestamp_math() {
1061 let (glean, _dir) = new_glean(None);
1066
1067 let store_name = "store-name";
1068 let glean_restarted = StoredEvent {
1069 event: RecordedEvent {
1070 category: "glean".into(),
1071 name: "restarted".into(),
1072 ..Default::default()
1073 },
1074 execution_counter: None,
1075 };
1076 let not_glean_restarted = StoredEvent {
1077 event: RecordedEvent {
1078 category: "category".into(),
1079 name: "name".into(),
1080 ..Default::default()
1081 },
1082 execution_counter: None,
1083 };
1084
1085 let timestamps = [20, 40, 200, 12];
1088 let ecs = [0, 1];
1089 let some_hour = 16;
1090 let startup_date = FixedOffset::east_opt(0)
1091 .unwrap()
1092 .with_ymd_and_hms(2022, 11, 24, some_hour, 29, 0) .unwrap();
1094 let glean_start_time = startup_date.with_hour(some_hour - 1);
1095 let restarted_ts = 2;
1096 let mut store = vec![
1097 StoredEvent {
1098 event: RecordedEvent {
1099 timestamp: timestamps[0],
1100 ..not_glean_restarted.event.clone()
1101 },
1102 execution_counter: Some(ecs[0]),
1103 },
1104 StoredEvent {
1105 event: RecordedEvent {
1106 timestamp: timestamps[1],
1107 ..not_glean_restarted.event.clone()
1108 },
1109 execution_counter: Some(ecs[0]),
1110 },
1111 StoredEvent {
1112 event: RecordedEvent {
1113 timestamp: timestamps[2],
1114 ..not_glean_restarted.event.clone()
1115 },
1116 execution_counter: Some(ecs[0]),
1117 },
1118 StoredEvent {
1119 event: RecordedEvent {
1120 extra: Some(
1121 [(
1122 "glean.startup.date".into(),
1123 get_iso_time_string(startup_date, TimeUnit::Minute),
1124 )]
1125 .into(),
1126 ),
1127 timestamp: restarted_ts,
1128 ..glean_restarted.event.clone()
1129 },
1130 execution_counter: Some(ecs[1]),
1131 },
1132 StoredEvent {
1133 event: RecordedEvent {
1134 timestamp: timestamps[3],
1135 ..not_glean_restarted.event.clone()
1136 },
1137 execution_counter: Some(ecs[1]),
1138 },
1139 ];
1140
1141 glean.event_storage().normalize_store(
1142 &glean,
1143 store_name,
1144 &mut store,
1145 glean_start_time.unwrap(),
1146 );
1147 assert_eq!(5, store.len()); for (timestamp, event) in timestamps[..timestamps.len() - 1].iter().zip(store.clone()) {
1151 assert_eq!(
1152 StoredEvent {
1153 event: RecordedEvent {
1154 timestamp: timestamp - timestamps[0],
1155 ..not_glean_restarted.event.clone()
1156 },
1157 execution_counter: None,
1158 },
1159 event
1160 );
1161 }
1162 let hour_in_millis = 3600000;
1164 assert_eq!(
1165 store[3],
1166 StoredEvent {
1167 event: RecordedEvent {
1168 timestamp: hour_in_millis,
1169 ..glean_restarted.event
1170 },
1171 execution_counter: None,
1172 }
1173 );
1174 assert_eq!(
1176 store[4],
1177 StoredEvent {
1178 event: RecordedEvent {
1179 timestamp: hour_in_millis + timestamps[3] - restarted_ts,
1180 ..not_glean_restarted.event
1181 },
1182 execution_counter: None,
1183 }
1184 );
1185 }
1186
1187 #[test]
1188 fn normalize_store_multi_run_client_clocks() {
1189 let (glean, _dir) = new_glean(None);
1192
1193 let store_name = "store-name";
1194 let glean_restarted = StoredEvent {
1195 event: RecordedEvent {
1196 category: "glean".into(),
1197 name: "restarted".into(),
1198 ..Default::default()
1199 },
1200 execution_counter: None,
1201 };
1202 let not_glean_restarted = StoredEvent {
1203 event: RecordedEvent {
1204 category: "category".into(),
1205 name: "name".into(),
1206 ..Default::default()
1207 },
1208 execution_counter: None,
1209 };
1210
1211 let timestamps = [20, 40, 12, 200];
1214 let ecs = [0, 1];
1215 let some_hour = 10;
1216 let startup_date = FixedOffset::east_opt(0)
1217 .unwrap()
1218 .with_ymd_and_hms(2022, 11, 25, some_hour, 37, 0) .unwrap();
1220 let glean_start_time = startup_date.with_hour(some_hour + 1);
1221 let restarted_ts = 2;
1222 let mut store = vec![
1223 StoredEvent {
1224 event: RecordedEvent {
1225 timestamp: timestamps[0],
1226 ..not_glean_restarted.event.clone()
1227 },
1228 execution_counter: Some(ecs[0]),
1229 },
1230 StoredEvent {
1231 event: RecordedEvent {
1232 timestamp: timestamps[1],
1233 ..not_glean_restarted.event.clone()
1234 },
1235 execution_counter: Some(ecs[0]),
1236 },
1237 StoredEvent {
1238 event: RecordedEvent {
1239 extra: Some(
1240 [(
1241 "glean.startup.date".into(),
1242 get_iso_time_string(startup_date, TimeUnit::Minute),
1243 )]
1244 .into(),
1245 ),
1246 timestamp: restarted_ts,
1247 ..glean_restarted.event.clone()
1248 },
1249 execution_counter: Some(ecs[1]),
1250 },
1251 StoredEvent {
1252 event: RecordedEvent {
1253 timestamp: timestamps[2],
1254 ..not_glean_restarted.event.clone()
1255 },
1256 execution_counter: Some(ecs[1]),
1257 },
1258 StoredEvent {
1259 event: RecordedEvent {
1260 timestamp: timestamps[3],
1261 ..not_glean_restarted.event.clone()
1262 },
1263 execution_counter: Some(ecs[1]),
1264 },
1265 ];
1266
1267 glean.event_storage().normalize_store(
1268 &glean,
1269 store_name,
1270 &mut store,
1271 glean_start_time.unwrap(),
1272 );
1273 assert_eq!(5, store.len()); for (timestamp, event) in timestamps[..timestamps.len() - 2].iter().zip(store.clone()) {
1277 assert_eq!(
1278 StoredEvent {
1279 event: RecordedEvent {
1280 timestamp: timestamp - timestamps[0],
1281 ..not_glean_restarted.event.clone()
1282 },
1283 execution_counter: None,
1284 },
1285 event
1286 );
1287 }
1288 assert_eq!(
1292 store[2],
1293 StoredEvent {
1294 event: RecordedEvent {
1295 timestamp: store[1].event.timestamp + 1,
1296 ..glean_restarted.event
1297 },
1298 execution_counter: None,
1299 }
1300 );
1301 assert_eq!(
1303 store[3],
1304 StoredEvent {
1305 event: RecordedEvent {
1306 timestamp: timestamps[2] - restarted_ts + store[2].event.timestamp,
1307 ..not_glean_restarted.event
1308 },
1309 execution_counter: None,
1310 }
1311 );
1312 assert_eq!(
1314 Ok(1),
1315 test_get_num_recorded_errors(
1316 &glean,
1317 &CommonMetricData {
1318 name: "restarted".into(),
1319 category: "glean".into(),
1320 send_in_pings: vec![store_name.into()],
1321 lifetime: Lifetime::Ping,
1322 ..Default::default()
1323 }
1324 .into(),
1325 ErrorType::InvalidValue
1326 )
1327 );
1328 }
1329
1330 #[test]
1331 fn normalize_store_non_zero_ec() {
1332 let (glean, _dir) = new_glean(None);
1335
1336 let store_name = "store-name";
1337 let glean_restarted = StoredEvent {
1338 event: RecordedEvent {
1339 timestamp: 2,
1340 category: "glean".into(),
1341 name: "restarted".into(),
1342 extra: None,
1343 session: None,
1344 },
1345 execution_counter: Some(2),
1346 };
1347 let not_glean_restarted = StoredEvent {
1348 event: RecordedEvent {
1349 timestamp: 20,
1350 category: "category".into(),
1351 name: "name".into(),
1352 extra: None,
1353 session: None,
1354 },
1355 execution_counter: Some(2),
1356 };
1357 let glean_restarted_2 = StoredEvent {
1358 event: RecordedEvent {
1359 timestamp: 2,
1360 category: "glean".into(),
1361 name: "restarted".into(),
1362 extra: None,
1363 session: None,
1364 },
1365 execution_counter: Some(3),
1366 };
1367 let mut store = vec![
1368 glean_restarted,
1369 not_glean_restarted.clone(),
1370 glean_restarted_2,
1371 ];
1372 let glean_start_time = glean.start_time();
1373
1374 glean
1375 .event_storage()
1376 .normalize_store(&glean, store_name, &mut store, glean_start_time);
1377
1378 assert_eq!(1, store.len());
1379 assert_eq!(
1380 StoredEvent {
1381 event: RecordedEvent {
1382 timestamp: 0,
1383 ..not_glean_restarted.event
1384 },
1385 execution_counter: None
1386 },
1387 store[0]
1388 );
1389 assert!(test_get_num_recorded_errors(
1391 &glean,
1392 &CommonMetricData {
1393 name: "restarted".into(),
1394 category: "glean".into(),
1395 send_in_pings: vec![store_name.into()],
1396 lifetime: Lifetime::Ping,
1397 ..Default::default()
1398 }
1399 .into(),
1400 ErrorType::InvalidState
1401 )
1402 .is_err());
1403 assert!(test_get_num_recorded_errors(
1405 &glean,
1406 &CommonMetricData {
1407 name: "restarted".into(),
1408 category: "glean".into(),
1409 send_in_pings: vec![store_name.into()],
1410 lifetime: Lifetime::Ping,
1411 ..Default::default()
1412 }
1413 .into(),
1414 ErrorType::InvalidValue
1415 )
1416 .is_err());
1417 }
1418
1419 #[test]
1420 fn normalize_store_clamps_timestamp() {
1421 let (glean, _dir) = new_glean(None);
1422
1423 let store_name = "store-name";
1424 let event = RecordedEvent {
1425 category: "category".into(),
1426 name: "name".into(),
1427 ..Default::default()
1428 };
1429
1430 let timestamps = [
1431 0,
1432 (i64::MAX / 2) as u64,
1433 i64::MAX as _,
1434 (i64::MAX as u64) + 1,
1435 ];
1436 let mut store = timestamps
1437 .into_iter()
1438 .map(|timestamp| StoredEvent {
1439 event: RecordedEvent {
1440 timestamp,
1441 ..event.clone()
1442 },
1443 execution_counter: None,
1444 })
1445 .collect();
1446
1447 let glean_start_time = glean.start_time();
1448 glean
1449 .event_storage()
1450 .normalize_store(&glean, store_name, &mut store, glean_start_time);
1451 assert_eq!(4, store.len());
1452
1453 assert_eq!(0, store[0].event.timestamp);
1454 assert_eq!((i64::MAX / 2) as u64, store[1].event.timestamp);
1455 assert_eq!((i64::MAX as u64), store[2].event.timestamp);
1456 assert_eq!((i64::MAX as u64), store[3].event.timestamp);
1457
1458 let error_count = glean
1459 .additional_metrics
1460 .event_timestamp_clamped
1461 .get_value(&glean, "health");
1462 assert_eq!(Some(1), error_count);
1463 }
1464}