1use std::cmp::Ordering;
6use std::collections::HashMap;
7use std::fs::{create_dir_all, File, OpenOptions};
8use std::io::BufRead;
9use std::io::BufReader;
10use std::io::Write;
11use std::path::{Path, PathBuf};
12use std::sync::{Arc, Mutex, RwLock};
13use std::{fs, mem};
14
15use chrono::{DateTime, FixedOffset, Utc};
16
17use malloc_size_of::MallocSizeOf;
18use malloc_size_of_derive::MallocSizeOf;
19use serde::{Deserialize, Serialize};
20use serde_json::{json, Value as JsonValue};
21
22use crate::common_metric_data::CommonMetricDataInternal;
23use crate::coverage::record_coverage;
24use crate::error_recording::{record_error, ErrorType};
25use crate::metrics::{DatetimeMetric, TimeUnit};
26use crate::storage::INTERNAL_STORAGE;
27use crate::util::get_iso_time_string;
28use crate::Glean;
29use crate::Result;
30use crate::{CommonMetricData, CounterMetric, Lifetime};
31
/// A single event as exposed to callers and serialized into snapshots.
///
/// This is the payload half of a stored event; bookkeeping that must not leave
/// the database lives on the wrapping `StoredEvent`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, MallocSizeOf)]
// `Default` is only needed by the tests, which build fixtures with `..Default::default()`.
#[cfg_attr(test, derive(Default))]
pub struct RecordedEvent {
    /// Timestamp of the event, stored exactly as handed to `EventDatabase::record`.
    ///
    /// `EventDatabase::normalize_store` later rewrites it into an offset; the
    /// offsets it adds are computed in milliseconds.
    pub timestamp: u64,

    /// The event's category.
    pub category: String,

    /// The event's name.
    pub name: String,

    /// Optional string key/value pairs attached to the event.
    ///
    /// Left out of the serialized form entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<HashMap<String, String>>,
}
57
58#[derive(
60 Debug, Clone, Deserialize, Serialize, PartialEq, Eq, malloc_size_of_derive::MallocSizeOf,
61)]
62struct StoredEvent {
63 #[serde(flatten)]
64 event: RecordedEvent,
65
66 #[serde(default)]
71 #[serde(skip_serializing_if = "Option::is_none")]
72 pub execution_counter: Option<i32>,
73}
74
/// Stores recorded events per store name, both in memory and in one
/// append-only file per store inside `path`.
#[derive(Debug)]
pub struct EventDatabase {
    /// Directory holding one events file per store (`<data_path>/events`).
    pub path: PathBuf,
    /// In-memory events, keyed by store name.
    event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
    /// Open file handles for the per-store files, keyed by store name.
    event_store_files: RwLock<HashMap<String, Arc<File>>>,
    /// Held while files inside `path` are read, written, or removed.
    file_lock: Mutex<()>,
}
106
107impl MallocSizeOf for EventDatabase {
108 fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
109 let mut n = 0;
110 n += self.event_stores.read().unwrap().size_of(ops);
111
112 let map = self.event_store_files.read().unwrap();
113 for store_name in map.keys() {
114 n += store_name.size_of(ops);
115 n += mem::size_of::<File>();
117 }
118 n
119 }
120}
121
122impl EventDatabase {
123 pub fn new(data_path: &Path) -> Result<Self> {
130 let path = data_path.join("events");
131 create_dir_all(&path)?;
132
133 Ok(Self {
134 path,
135 event_stores: RwLock::new(HashMap::new()),
136 event_store_files: RwLock::new(HashMap::new()),
137 file_lock: Mutex::new(()),
138 })
139 }
140
141 pub fn flush_pending_events_on_startup(
167 &self,
168 glean: &Glean,
169 trim_data_to_registered_pings: bool,
170 ) -> bool {
171 match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
172 Ok(_) => {
173 let stores_with_events: Vec<String> = {
174 self.event_stores
175 .read()
176 .unwrap()
177 .keys()
178 .map(|x| x.to_owned())
179 .collect() };
181 let has_events_events = stores_with_events.contains(&"events".to_owned());
184 let glean_restarted_stores = if has_events_events {
185 stores_with_events
186 .into_iter()
187 .filter(|store| store != "events")
188 .collect()
189 } else {
190 stores_with_events
191 };
192 if !glean_restarted_stores.is_empty() {
193 for store_name in glean_restarted_stores.iter() {
194 CounterMetric::new(CommonMetricData {
195 name: "execution_counter".into(),
196 category: store_name.into(),
197 send_in_pings: vec![INTERNAL_STORAGE.into()],
198 lifetime: Lifetime::Ping,
199 ..Default::default()
200 })
201 .add_sync(glean, 1);
202 }
203 let glean_restarted = CommonMetricData {
204 name: "restarted".into(),
205 category: "glean".into(),
206 send_in_pings: glean_restarted_stores,
207 lifetime: Lifetime::Ping,
208 ..Default::default()
209 };
210 let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
211 let mut extra: HashMap<String, String> =
212 [("glean.startup.date".into(), startup)].into();
213 if glean.with_timestamps() {
214 let now = Utc::now();
215 let precise_timestamp = now.timestamp_millis() as u64;
216 extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
217 }
218 self.record(
219 glean,
220 &glean_restarted.into(),
221 crate::get_timestamp_ms(),
222 Some(extra),
223 );
224 }
225 has_events_events && glean.submit_ping_by_name("events", Some("startup"))
226 }
227 Err(err) => {
228 log::warn!("Error loading events from disk: {}", err);
229 false
230 }
231 }
232 }
233
234 fn load_events_from_disk(
235 &self,
236 glean: &Glean,
237 trim_data_to_registered_pings: bool,
238 ) -> Result<()> {
239 let mut db = self.event_stores.write().unwrap(); let _lock = self.file_lock.lock().unwrap(); for entry in fs::read_dir(&self.path)? {
247 let entry = entry?;
248 if entry.file_type()?.is_file() {
249 let store_name = entry.file_name().into_string()?;
250 log::info!("Loading events for {}", store_name);
251 if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
252 log::warn!("Trimming {}'s events", store_name);
253 if let Err(err) = fs::remove_file(entry.path()) {
254 match err.kind() {
255 std::io::ErrorKind::NotFound => {
256 }
258 _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
259 }
260 }
261 continue;
262 }
263 let file = BufReader::new(File::open(entry.path())?);
264 db.insert(
265 store_name,
266 file.lines()
267 .map_while(Result::ok)
268 .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
269 .collect(),
270 );
271 }
272 }
273 Ok(())
274 }
275
276 pub fn record(
293 &self,
294 glean: &Glean,
295 meta: &CommonMetricDataInternal,
296 timestamp: u64,
297 extra: Option<HashMap<String, String>>,
298 ) -> bool {
299 if !glean.is_upload_enabled() {
301 return false;
302 }
303
304 let mut submit_max_capacity_event_ping = false;
305 {
306 let mut db = self.event_stores.write().unwrap(); for store_name in meta.inner.send_in_pings.iter() {
308 if !glean.is_ping_enabled(store_name) {
309 continue;
310 }
311
312 let store = db.entry(store_name.to_string()).or_default();
313 let execution_counter = CounterMetric::new(CommonMetricData {
314 name: "execution_counter".into(),
315 category: store_name.into(),
316 send_in_pings: vec![INTERNAL_STORAGE.into()],
317 lifetime: Lifetime::Ping,
318 ..Default::default()
319 })
320 .get_value(glean, INTERNAL_STORAGE);
321 let event = StoredEvent {
323 event: RecordedEvent {
324 timestamp,
325 category: meta.inner.category.to_string(),
326 name: meta.inner.name.to_string(),
327 extra: extra.clone(),
328 },
329 execution_counter,
330 };
331 let event_json = serde_json::to_string(&event).unwrap(); store.push(event);
333 self.write_event_to_disk(store_name, &event_json);
334 if store_name == "events" && store.len() == glean.get_max_events() {
335 submit_max_capacity_event_ping = true;
336 }
337 }
338 }
339 if submit_max_capacity_event_ping {
340 glean.submit_ping_by_name("events", Some("max_capacity"));
341 true
342 } else {
343 false
344 }
345 }
346
347 fn get_event_store(&self, store_name: &str) -> Arc<File> {
348 Arc::clone(
349 self.event_store_files
350 .write()
351 .unwrap()
352 .entry(store_name.to_string())
353 .or_insert_with(|| {
354 Arc::new(
355 OpenOptions::new()
356 .create(true)
357 .append(true)
358 .open(self.path.join(store_name))
359 .unwrap(),
360 )
361 }),
362 )
363 }
364
365 fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
372 let _lock = self.file_lock.lock().unwrap(); let mut file = self.get_event_store(store_name);
374
375 let write_res = (|| {
376 file.write_all(event_json.as_bytes())?;
377 file.write_all(b"\n")?;
378 file.flush()?;
379 Ok::<(), std::io::Error>(())
380 })();
381
382 if let Err(err) = write_res {
383 log::warn!("IO error writing event to store '{}': {}", store_name, err);
384 }
385 }
386
387 fn normalize_store(
416 &self,
417 glean: &Glean,
418 store_name: &str,
419 store: &mut Vec<StoredEvent>,
420 glean_start_time: DateTime<FixedOffset>,
421 ) {
422 let is_glean_restarted =
423 |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
424 let glean_restarted_meta = |store_name: &str| CommonMetricData {
425 name: "restarted".into(),
426 category: "glean".into(),
427 send_in_pings: vec![store_name.into()],
428 lifetime: Lifetime::Ping,
429 ..Default::default()
430 };
431 store.sort_by(|a, b| {
433 a.execution_counter
434 .cmp(&b.execution_counter)
435 .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
436 .then_with(|| {
437 if is_glean_restarted(&a.event) {
438 Ordering::Less
439 } else {
440 Ordering::Greater
441 }
442 })
443 });
444 let final_event = match store
448 .iter()
449 .rposition(|event| !is_glean_restarted(&event.event))
450 {
451 Some(idx) => idx + 1,
452 _ => 0,
453 };
454 store.drain(final_event..);
455 let first_event = store
456 .iter()
457 .position(|event| !is_glean_restarted(&event.event))
458 .unwrap_or(store.len());
459 store.drain(..first_event);
460 if store.is_empty() {
461 return;
463 }
464 let mut cur_ec = 0;
470 let mut intra_group_offset = store[0].event.timestamp;
472 let mut inter_group_offset = 0;
474 let mut highest_ts = 0;
475 for event in store.iter_mut() {
476 let execution_counter = event.execution_counter.take().unwrap_or(0);
477 if is_glean_restarted(&event.event) {
478 cur_ec = execution_counter;
481 let glean_startup_date = event
482 .event
483 .extra
484 .as_mut()
485 .and_then(|extra| {
486 extra.remove("glean.startup.date").and_then(|date_str| {
487 DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
488 .map_err(|_| {
489 record_error(
490 glean,
491 &glean_restarted_meta(store_name).into(),
492 ErrorType::InvalidState,
493 format!("Unparseable glean.startup.date '{}'", date_str),
494 None,
495 );
496 })
497 .ok()
498 })
499 })
500 .unwrap_or(glean_start_time);
501 if event
502 .event
503 .extra
504 .as_ref()
505 .is_some_and(|extra| extra.is_empty())
506 {
507 event.event.extra = None;
509 }
510 let ping_start = DatetimeMetric::new(
511 CommonMetricData {
512 name: format!("{}#start", store_name),
513 category: "".into(),
514 send_in_pings: vec![INTERNAL_STORAGE.into()],
515 lifetime: Lifetime::User,
516 ..Default::default()
517 },
518 TimeUnit::Minute,
519 );
520 let ping_start = ping_start
521 .get_value(glean, INTERNAL_STORAGE)
522 .unwrap_or(glean_start_time);
523 let time_from_ping_start_to_glean_restarted =
524 (glean_startup_date - ping_start).num_milliseconds();
525 intra_group_offset = event.event.timestamp;
526 inter_group_offset =
527 u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
528 if inter_group_offset < highest_ts {
529 record_error(
530 glean,
531 &glean_restarted_meta(store_name).into(),
532 ErrorType::InvalidValue,
533 format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
534 None,
535 );
536 inter_group_offset = highest_ts + 1;
541 }
542 } else if cur_ec == 0 {
543 cur_ec = execution_counter;
545 }
546 event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
547 if execution_counter != cur_ec {
548 record_error(
549 glean,
550 &glean_restarted_meta(store_name).into(),
551 ErrorType::InvalidState,
552 format!(
553 "Inconsistent execution counter {} (expected {})",
554 execution_counter, cur_ec
555 ),
556 None,
557 );
558 cur_ec = execution_counter;
560 }
561 if highest_ts > event.event.timestamp {
562 record_error(
565 glean,
566 &glean_restarted_meta(store_name).into(),
567 ErrorType::InvalidState,
568 format!(
569 "Inconsistent previous highest timestamp {} (expected <= {})",
570 highest_ts, event.event.timestamp
571 ),
572 None,
573 );
574 }
576 highest_ts = event.event.timestamp
577 }
578 }
579
580 pub fn snapshot_as_json(
592 &self,
593 glean: &Glean,
594 store_name: &str,
595 clear_store: bool,
596 ) -> Option<JsonValue> {
597 let result = {
598 let mut db = self.event_stores.write().unwrap(); db.get_mut(&store_name.to_string()).and_then(|store| {
600 if !store.is_empty() {
601 let mut clone;
604 let store = if clear_store {
605 store
606 } else {
607 clone = store.clone();
608 &mut clone
609 };
610 self.normalize_store(glean, store_name, store, glean.start_time());
612 Some(json!(store))
613 } else {
614 log::warn!("Unexpectly got empty event store for '{}'", store_name);
615 None
616 }
617 })
618 };
619
620 if clear_store {
621 self.event_stores
622 .write()
623 .unwrap() .remove(&store_name.to_string());
625 self.event_store_files
626 .write()
627 .unwrap() .remove(&store_name.to_string());
629
630 let _lock = self.file_lock.lock().unwrap(); if let Err(err) = fs::remove_file(self.path.join(store_name)) {
632 match err.kind() {
633 std::io::ErrorKind::NotFound => {
634 }
636 _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
637 }
638 }
639 }
640
641 result
642 }
643
644 pub fn clear_all(&self) -> Result<()> {
646 self.event_stores.write().unwrap().clear();
648 self.event_store_files.write().unwrap().clear();
649
650 let _lock = self.file_lock.lock().unwrap();
652 std::fs::remove_dir_all(&self.path)?;
653 create_dir_all(&self.path)?;
654
655 Ok(())
656 }
657
658 pub fn test_get_value<'a>(
665 &'a self,
666 meta: &'a CommonMetricDataInternal,
667 store_name: &str,
668 ) -> Option<Vec<RecordedEvent>> {
669 record_coverage(&meta.base_identifier());
670
671 let value: Vec<RecordedEvent> = self
672 .event_stores
673 .read()
674 .unwrap() .get(&store_name.to_string())
676 .into_iter()
677 .flatten()
678 .map(|stored_event| stored_event.event.clone())
679 .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
680 .collect();
681 if !value.is_empty() {
682 Some(value)
683 } else {
684 None
685 }
686 }
687}
688
689#[cfg(test)]
690mod test {
691 use super::*;
692 use crate::test_get_num_recorded_errors;
693 use crate::tests::new_glean;
694 use chrono::{TimeZone, Timelike};
695
/// Truncated lines in an events file are skipped; complete ones still load.
#[test]
fn handle_truncated_events_on_disk() {
    let (glean, t) = new_glean(None);

    // First database instance: two cut-off lines, then one complete event.
    {
        let db = EventDatabase::new(t.path()).unwrap();
        let lines = [
            "{\"timestamp\": 500",
            "{\"timestamp\"",
            "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
        ];
        for line in lines {
            db.write_event_to_disk("events", line);
        }
    }

    // Second instance over the same directory: only the complete event loads.
    {
        let db = EventDatabase::new(t.path()).unwrap();
        db.load_events_from_disk(&glean, false).unwrap();
        let stores = db.event_stores.read().unwrap();
        assert_eq!(1, stores["events"].len());
    }
}
717
/// A serialized `RecordedEvent` reads back as a `StoredEvent` without an
/// execution counter.
#[test]
fn stable_serialization() {
    let event_empty = RecordedEvent {
        timestamp: 2,
        category: "cat".to_string(),
        name: "name".to_string(),
        extra: None,
    };
    let event_data = RecordedEvent {
        extra: Some(HashMap::from([(
            "a key".to_string(),
            "a value".to_string(),
        )])),
        ..event_empty.clone()
    };

    for event in [event_empty, event_data] {
        let json = ::serde_json::to_string_pretty(&event).unwrap();
        let parsed: StoredEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(
            StoredEvent {
                event,
                execution_counter: None
            },
            parsed
        );
    }
}
754
/// JSON written without an execution counter still deserializes.
#[test]
fn deserialize_existing_data() {
    let event_empty_json = r#"
{
  "timestamp": 2,
  "category": "cat",
  "name": "name"
}
            "#;

    let event_data_json = r#"
{
  "timestamp": 2,
  "category": "cat",
  "name": "name",
  "extra": {
    "a key": "a value"
  }
}
            "#;

    let event_empty = RecordedEvent {
        timestamp: 2,
        category: "cat".to_string(),
        name: "name".to_string(),
        extra: None,
    };
    let event_data = RecordedEvent {
        extra: Some(HashMap::from([(
            "a key".to_string(),
            "a value".to_string(),
        )])),
        ..event_empty.clone()
    };

    let cases = [
        (event_empty_json, event_empty),
        (event_data_json, event_data),
    ];
    for (json, event) in cases {
        let parsed: StoredEvent = serde_json::from_str(json).unwrap();
        assert_eq!(
            StoredEvent {
                event,
                execution_counter: None
            },
            parsed
        );
    }
}
807
/// `record` stores an event while upload is enabled and ignores it once
/// upload has been disabled.
#[test]
fn doesnt_record_when_upload_is_disabled() {
    let (mut glean, dir) = new_glean(None);
    let db = EventDatabase::new(dir.path()).unwrap();

    let test_storage = "store1";
    let test_category = "category";
    let test_name = "name";
    let test_timestamp = 2;
    let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
    let expected = StoredEvent {
        event: RecordedEvent {
            timestamp: test_timestamp,
            category: test_category.to_string(),
            name: test_name.to_string(),
            extra: None,
        },
        execution_counter: None,
    };

    // Upload enabled: the event lands in the store.
    db.record(&glean, &test_meta, test_timestamp, None);
    {
        let event_stores = db.event_stores.read().unwrap();
        let recorded = event_stores.get(test_storage).unwrap();
        assert_eq!(&expected, &recorded[0]);
        assert_eq!(recorded.len(), 1);
    }

    glean.set_upload_enabled(false);

    // Upload disabled: the store does not grow.
    db.record(&glean, &test_meta, test_timestamp, None);
    {
        let event_stores = db.event_stores.read().unwrap();
        let recorded = event_stores.get(test_storage).unwrap();
        assert_eq!(recorded.len(), 1);
    }
}
849
/// A store holding nothing but `glean.restarted` events normalizes to empty.
#[test]
fn normalize_store_of_glean_restarted() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let glean_restarted = StoredEvent {
        event: RecordedEvent {
            timestamp: 2,
            category: "glean".into(),
            name: "restarted".into(),
            extra: None,
        },
        execution_counter: None,
    };
    let glean_start_time = glean.start_time();

    // One, two and three restart markers in a row.
    for count in 1..=3 {
        let mut store = vec![glean_restarted.clone(); count];
        glean
            .event_storage()
            .normalize_store(&glean, store_name, &mut store, glean_start_time);
        assert!(store.is_empty());
    }
}
889
/// Restart markers around a single event are dropped and the event ends up
/// at timestamp 0.
#[test]
fn normalize_store_of_glean_restarted_on_both_ends() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let make_event = |timestamp: u64, category: &str, name: &str| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: category.into(),
            name: name.into(),
            extra: None,
        },
        execution_counter: None,
    };
    let glean_restarted = make_event(2, "glean", "restarted");
    let not_glean_restarted = make_event(20, "category", "name");

    let mut store = vec![
        glean_restarted.clone(),
        not_glean_restarted.clone(),
        glean_restarted,
    ];
    let glean_start_time = glean.start_time();

    glean
        .event_storage()
        .normalize_store(&glean, store_name, &mut store, glean_start_time);

    let mut expected = not_glean_restarted;
    expected.event.timestamp = 0;
    assert_eq!(1, store.len());
    assert_eq!(expected, store[0]);
}
936
/// Within a single run, timestamps become offsets from the first event.
#[test]
fn normalize_store_single_run_timestamp_math() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let glean_restarted = StoredEvent {
        event: RecordedEvent {
            timestamp: 2,
            category: "glean".into(),
            name: "restarted".into(),
            extra: None,
        },
        execution_counter: None,
    };
    let timestamps = [20, 40, 200];
    let not_glean_restarted = StoredEvent {
        event: RecordedEvent {
            timestamp: timestamps[0],
            category: "category".into(),
            name: "name".into(),
            extra: None,
        },
        execution_counter: None,
    };
    let event_at = |timestamp: u64| StoredEvent {
        event: RecordedEvent {
            timestamp,
            ..not_glean_restarted.event.clone()
        },
        execution_counter: None,
    };

    // restart marker, the three events, restart marker
    let mut store: Vec<StoredEvent> = std::iter::once(glean_restarted.clone())
        .chain(timestamps.iter().map(|&timestamp| event_at(timestamp)))
        .chain(std::iter::once(glean_restarted))
        .collect();

    glean
        .event_storage()
        .normalize_store(&glean, store_name, &mut store, glean.start_time());

    assert_eq!(3, store.len());
    let expected_offsets = timestamps.iter().map(|timestamp| timestamp - timestamps[0]);
    for (offset, event) in expected_offsets.zip(store.iter()) {
        assert_eq!(&event_at(offset), event);
    }
}
1001
/// Across two runs, the second run's events are shifted by the time between
/// ping start and the restart (one hour here).
#[test]
fn normalize_store_multi_run_timestamp_math() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let glean_restarted = StoredEvent {
        event: RecordedEvent {
            category: "glean".into(),
            name: "restarted".into(),
            ..Default::default()
        },
        execution_counter: None,
    };
    let not_glean_restarted = StoredEvent {
        event: RecordedEvent {
            category: "category".into(),
            name: "name".into(),
            ..Default::default()
        },
        execution_counter: None,
    };
    // Ordinary event with the given timestamp and execution counter.
    let event_at = |timestamp: u64, execution_counter: Option<i32>| StoredEvent {
        event: RecordedEvent {
            timestamp,
            ..not_glean_restarted.event.clone()
        },
        execution_counter,
    };

    let timestamps = [20, 40, 200, 12];
    let ecs = [0, 1];
    let some_hour = 16;
    let startup_date = FixedOffset::east_opt(0)
        .unwrap()
        .with_ymd_and_hms(2022, 11, 24, some_hour, 29, 0)
        .unwrap();
    // Glean (and so the ping) started one hour before the restart.
    let glean_start_time = startup_date.with_hour(some_hour - 1);
    let restarted_ts = 2;

    let restart_marker = StoredEvent {
        event: RecordedEvent {
            extra: Some(
                [(
                    "glean.startup.date".into(),
                    get_iso_time_string(startup_date, TimeUnit::Minute),
                )]
                .into(),
            ),
            timestamp: restarted_ts,
            ..glean_restarted.event.clone()
        },
        execution_counter: Some(ecs[1]),
    };
    let mut store = vec![
        event_at(timestamps[0], Some(ecs[0])),
        event_at(timestamps[1], Some(ecs[0])),
        event_at(timestamps[2], Some(ecs[0])),
        restart_marker,
        event_at(timestamps[3], Some(ecs[1])),
    ];

    glean.event_storage().normalize_store(
        &glean,
        store_name,
        &mut store,
        glean_start_time.unwrap(),
    );
    assert_eq!(5, store.len());

    // First run: offsets from the first event.
    let first_run = &timestamps[..timestamps.len() - 1];
    for (timestamp, event) in first_run.iter().zip(store.iter()) {
        assert_eq!(&event_at(timestamp - timestamps[0], None), event);
    }

    // The restart marker sits exactly one hour after ping start.
    let hour_in_millis = 3600000;
    assert_eq!(
        store[3],
        StoredEvent {
            event: RecordedEvent {
                timestamp: hour_in_millis,
                ..glean_restarted.event
            },
            execution_counter: None,
        }
    );
    // Second run: offset from the restart marker, shifted by the hour.
    assert_eq!(
        store[4],
        event_at(hour_in_millis + timestamps[3] - restarted_ts, None)
    );
}
1129
/// A restart that appears to predate ping start is placed right after the
/// previous run's last event, and one `InvalidValue` error is recorded.
#[test]
fn normalize_store_multi_run_client_clocks() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let glean_restarted = StoredEvent {
        event: RecordedEvent {
            category: "glean".into(),
            name: "restarted".into(),
            ..Default::default()
        },
        execution_counter: None,
    };
    let not_glean_restarted = StoredEvent {
        event: RecordedEvent {
            category: "category".into(),
            name: "name".into(),
            ..Default::default()
        },
        execution_counter: None,
    };
    // Ordinary event with the given timestamp and execution counter.
    let event_at = |timestamp: u64, execution_counter: Option<i32>| StoredEvent {
        event: RecordedEvent {
            timestamp,
            ..not_glean_restarted.event.clone()
        },
        execution_counter,
    };

    let timestamps = [20, 40, 12, 200];
    let ecs = [0, 1];
    let some_hour = 10;
    let startup_date = FixedOffset::east_opt(0)
        .unwrap()
        .with_ymd_and_hms(2022, 11, 25, some_hour, 37, 0)
        .unwrap();
    // Ping start is one hour *after* the recorded restart.
    let glean_start_time = startup_date.with_hour(some_hour + 1);
    let restarted_ts = 2;

    let restart_marker = StoredEvent {
        event: RecordedEvent {
            extra: Some(
                [(
                    "glean.startup.date".into(),
                    get_iso_time_string(startup_date, TimeUnit::Minute),
                )]
                .into(),
            ),
            timestamp: restarted_ts,
            ..glean_restarted.event.clone()
        },
        execution_counter: Some(ecs[1]),
    };
    let mut store = vec![
        event_at(timestamps[0], Some(ecs[0])),
        event_at(timestamps[1], Some(ecs[0])),
        restart_marker,
        event_at(timestamps[2], Some(ecs[1])),
        event_at(timestamps[3], Some(ecs[1])),
    ];

    glean.event_storage().normalize_store(
        &glean,
        store_name,
        &mut store,
        glean_start_time.unwrap(),
    );
    assert_eq!(5, store.len());

    // First run: offsets from the first event.
    let first_run = &timestamps[..timestamps.len() - 2];
    for (timestamp, event) in first_run.iter().zip(store.iter()) {
        assert_eq!(&event_at(timestamp - timestamps[0], None), event);
    }

    // The restart marker lands one past the previous run's last event.
    assert_eq!(
        store[2],
        StoredEvent {
            event: RecordedEvent {
                timestamp: store[1].event.timestamp + 1,
                ..glean_restarted.event
            },
            execution_counter: None,
        }
    );
    // Later events are offset from the restart marker.
    assert_eq!(
        store[3],
        event_at(
            timestamps[2] - restarted_ts + store[2].event.timestamp,
            None
        )
    );

    let restarted_meta = CommonMetricData {
        name: "restarted".into(),
        category: "glean".into(),
        send_in_pings: vec![store_name.into()],
        lifetime: Lifetime::Ping,
        ..Default::default()
    };
    assert_eq!(
        Ok(1),
        test_get_num_recorded_errors(&glean, &restarted_meta.into(), ErrorType::InvalidValue)
    );
}
1272
/// A store whose first execution counter is not zero normalizes without
/// recording any error.
#[test]
fn normalize_store_non_zero_ec() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let make_event = |timestamp: u64, category: &str, name: &str, ec: i32| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: category.into(),
            name: name.into(),
            extra: None,
        },
        execution_counter: Some(ec),
    };
    let glean_restarted = make_event(2, "glean", "restarted", 2);
    let not_glean_restarted = make_event(20, "category", "name", 2);
    let glean_restarted_2 = make_event(2, "glean", "restarted", 3);

    let mut store = vec![
        glean_restarted,
        not_glean_restarted.clone(),
        glean_restarted_2,
    ];
    let glean_start_time = glean.start_time();

    glean
        .event_storage()
        .normalize_store(&glean, store_name, &mut store, glean_start_time);

    assert_eq!(1, store.len());
    assert_eq!(
        StoredEvent {
            event: RecordedEvent {
                timestamp: 0,
                ..not_glean_restarted.event
            },
            execution_counter: None
        },
        store[0]
    );

    // Neither error type was recorded: the lookup itself reports an error.
    for error_type in [ErrorType::InvalidState, ErrorType::InvalidValue] {
        let restarted_meta = CommonMetricData {
            name: "restarted".into(),
            category: "glean".into(),
            send_in_pings: vec![store_name.into()],
            lifetime: Lifetime::Ping,
            ..Default::default()
        };
        assert!(
            test_get_num_recorded_errors(&glean, &restarted_meta.into(), error_type).is_err()
        );
    }
}
1358}