1use std::cmp::Ordering;
6use std::collections::HashMap;
7use std::fs;
8use std::fs::{create_dir_all, File, OpenOptions};
9use std::io::BufRead;
10use std::io::BufReader;
11use std::io::Write;
12use std::path::{Path, PathBuf};
13use std::sync::{Mutex, RwLock};
14
15use chrono::{DateTime, FixedOffset, Utc};
16
17use serde::{Deserialize, Serialize};
18use serde_json::{json, Value as JsonValue};
19
20use crate::common_metric_data::CommonMetricDataInternal;
21use crate::coverage::record_coverage;
22use crate::error_recording::{record_error, ErrorType};
23use crate::metrics::{DatetimeMetric, TimeUnit};
24use crate::storage::INTERNAL_STORAGE;
25use crate::util::get_iso_time_string;
26use crate::Glean;
27use crate::Result;
28use crate::{CommonMetricData, CounterMetric, Lifetime};
29
/// The payload of a single recorded event.
///
/// This is the part of a stored event that `EventDatabase::test_get_value`
/// hands back to callers; it is embedded (flattened) into `StoredEvent`
/// for persistence.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
// `Default` is only derived in test builds, where fixtures rely on `..Default::default()`.
#[cfg_attr(test, derive(Default))]
pub struct RecordedEvent {
    /// Timestamp supplied by the caller of `EventDatabase::record`
    /// (presumably milliseconds; this file passes `crate::get_timestamp_ms()`).
    /// `EventDatabase::normalize_store` rewrites it relative to the first
    /// event of the store before a snapshot is produced.
    pub timestamp: u64,

    /// The event's category.
    pub category: String,

    /// The event's name.
    pub name: String,

    /// Optional string key/value pairs attached to the event.
    /// Left out of the serialized form entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<HashMap<String, String>>,
}
55
/// An event as kept in memory and written to disk: the `RecordedEvent`
/// payload plus the execution counter it was recorded under.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
struct StoredEvent {
    /// The event payload. `flatten` inlines its fields into this struct's
    /// JSON object instead of nesting them under an `event` key.
    #[serde(flatten)]
    event: RecordedEvent,

    /// Value of the store's `execution_counter` metric at record time
    /// (see `EventDatabase::record`).
    ///
    /// `default` lets JSON without this field deserialize to `None`;
    /// `None` is omitted again when serializing.
    /// `EventDatabase::normalize_store` takes the value out (leaving `None`)
    /// while renumbering timestamps.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub execution_counter: Option<i32>,
}
70
/// Storage for recorded events.
///
/// Events are kept in memory per store and each one is additionally
/// appended, as one JSON line, to a file named after the store inside `path`.
#[derive(Debug)]
pub struct EventDatabase {
    /// Directory holding the per-store event files (`<data_path>/events`, see `new`).
    pub path: PathBuf,
    /// In-memory events, keyed by store name.
    event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
    /// Held around every file operation below `path` performed by this type.
    file_lock: Mutex<()>,
}
101
102impl EventDatabase {
103 pub fn new(data_path: &Path) -> Result<Self> {
110 let path = data_path.join("events");
111 create_dir_all(&path)?;
112
113 Ok(Self {
114 path,
115 event_stores: RwLock::new(HashMap::new()),
116 file_lock: Mutex::new(()),
117 })
118 }
119
120 pub fn flush_pending_events_on_startup(
146 &self,
147 glean: &Glean,
148 trim_data_to_registered_pings: bool,
149 ) -> bool {
150 match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
151 Ok(_) => {
152 let stores_with_events: Vec<String> = {
153 self.event_stores
154 .read()
155 .unwrap()
156 .keys()
157 .map(|x| x.to_owned())
158 .collect() };
160 let has_events_events = stores_with_events.contains(&"events".to_owned());
163 let glean_restarted_stores = if has_events_events {
164 stores_with_events
165 .into_iter()
166 .filter(|store| store != "events")
167 .collect()
168 } else {
169 stores_with_events
170 };
171 if !glean_restarted_stores.is_empty() {
172 for store_name in glean_restarted_stores.iter() {
173 CounterMetric::new(CommonMetricData {
174 name: "execution_counter".into(),
175 category: store_name.into(),
176 send_in_pings: vec![INTERNAL_STORAGE.into()],
177 lifetime: Lifetime::Ping,
178 ..Default::default()
179 })
180 .add_sync(glean, 1);
181 }
182 let glean_restarted = CommonMetricData {
183 name: "restarted".into(),
184 category: "glean".into(),
185 send_in_pings: glean_restarted_stores,
186 lifetime: Lifetime::Ping,
187 ..Default::default()
188 };
189 let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
190 let mut extra: HashMap<String, String> =
191 [("glean.startup.date".into(), startup)].into();
192 if glean.with_timestamps() {
193 let now = Utc::now();
194 let precise_timestamp = now.timestamp_millis() as u64;
195 extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
196 }
197 self.record(
198 glean,
199 &glean_restarted.into(),
200 crate::get_timestamp_ms(),
201 Some(extra),
202 );
203 }
204 has_events_events && glean.submit_ping_by_name("events", Some("startup"))
205 }
206 Err(err) => {
207 log::warn!("Error loading events from disk: {}", err);
208 false
209 }
210 }
211 }
212
213 fn load_events_from_disk(
214 &self,
215 glean: &Glean,
216 trim_data_to_registered_pings: bool,
217 ) -> Result<()> {
218 let mut db = self.event_stores.write().unwrap(); let _lock = self.file_lock.lock().unwrap(); for entry in fs::read_dir(&self.path)? {
226 let entry = entry?;
227 if entry.file_type()?.is_file() {
228 let store_name = entry.file_name().into_string()?;
229 log::info!("Loading events for {}", store_name);
230 if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
231 log::warn!("Trimming {}'s events", store_name);
232 if let Err(err) = fs::remove_file(entry.path()) {
233 match err.kind() {
234 std::io::ErrorKind::NotFound => {
235 }
237 _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
238 }
239 }
240 continue;
241 }
242 let file = BufReader::new(File::open(entry.path())?);
243 db.insert(
244 store_name,
245 file.lines()
246 .map_while(Result::ok)
247 .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
248 .collect(),
249 );
250 }
251 }
252 Ok(())
253 }
254
255 pub fn record(
272 &self,
273 glean: &Glean,
274 meta: &CommonMetricDataInternal,
275 timestamp: u64,
276 extra: Option<HashMap<String, String>>,
277 ) -> bool {
278 if !glean.is_upload_enabled() {
280 return false;
281 }
282
283 let mut submit_max_capacity_event_ping = false;
284 {
285 let mut db = self.event_stores.write().unwrap(); for store_name in meta.inner.send_in_pings.iter() {
287 if !glean.is_ping_enabled(store_name) {
288 continue;
289 }
290
291 let store = db.entry(store_name.to_string()).or_default();
292 let execution_counter = CounterMetric::new(CommonMetricData {
293 name: "execution_counter".into(),
294 category: store_name.into(),
295 send_in_pings: vec![INTERNAL_STORAGE.into()],
296 lifetime: Lifetime::Ping,
297 ..Default::default()
298 })
299 .get_value(glean, INTERNAL_STORAGE);
300 let event = StoredEvent {
302 event: RecordedEvent {
303 timestamp,
304 category: meta.inner.category.to_string(),
305 name: meta.inner.name.to_string(),
306 extra: extra.clone(),
307 },
308 execution_counter,
309 };
310 let event_json = serde_json::to_string(&event).unwrap(); store.push(event);
312 self.write_event_to_disk(store_name, &event_json);
313 if store_name == "events" && store.len() == glean.get_max_events() {
314 submit_max_capacity_event_ping = true;
315 }
316 }
317 }
318 if submit_max_capacity_event_ping {
319 glean.submit_ping_by_name("events", Some("max_capacity"));
320 true
321 } else {
322 false
323 }
324 }
325
326 fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
333 let _lock = self.file_lock.lock().unwrap(); if let Err(err) = OpenOptions::new()
335 .create(true)
336 .append(true)
337 .open(self.path.join(store_name))
338 .and_then(|mut file| writeln!(file, "{}", event_json))
339 {
340 log::warn!("IO error writing event to store '{}': {}", store_name, err);
341 }
342 }
343
344 fn normalize_store(
373 &self,
374 glean: &Glean,
375 store_name: &str,
376 store: &mut Vec<StoredEvent>,
377 glean_start_time: DateTime<FixedOffset>,
378 ) {
379 let is_glean_restarted =
380 |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
381 let glean_restarted_meta = |store_name: &str| CommonMetricData {
382 name: "restarted".into(),
383 category: "glean".into(),
384 send_in_pings: vec![store_name.into()],
385 lifetime: Lifetime::Ping,
386 ..Default::default()
387 };
388 store.sort_by(|a, b| {
390 a.execution_counter
391 .cmp(&b.execution_counter)
392 .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
393 .then_with(|| {
394 if is_glean_restarted(&a.event) {
395 Ordering::Less
396 } else {
397 Ordering::Greater
398 }
399 })
400 });
401 let final_event = match store
405 .iter()
406 .rposition(|event| !is_glean_restarted(&event.event))
407 {
408 Some(idx) => idx + 1,
409 _ => 0,
410 };
411 store.drain(final_event..);
412 let first_event = store
413 .iter()
414 .position(|event| !is_glean_restarted(&event.event))
415 .unwrap_or(store.len());
416 store.drain(..first_event);
417 if store.is_empty() {
418 return;
420 }
421 let mut cur_ec = 0;
427 let mut intra_group_offset = store[0].event.timestamp;
429 let mut inter_group_offset = 0;
431 let mut highest_ts = 0;
432 for event in store.iter_mut() {
433 let execution_counter = event.execution_counter.take().unwrap_or(0);
434 if is_glean_restarted(&event.event) {
435 cur_ec = execution_counter;
438 let glean_startup_date = event
439 .event
440 .extra
441 .as_mut()
442 .and_then(|extra| {
443 extra.remove("glean.startup.date").and_then(|date_str| {
444 DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
445 .map_err(|_| {
446 record_error(
447 glean,
448 &glean_restarted_meta(store_name).into(),
449 ErrorType::InvalidState,
450 format!("Unparseable glean.startup.date '{}'", date_str),
451 None,
452 );
453 })
454 .ok()
455 })
456 })
457 .unwrap_or(glean_start_time);
458 if event
459 .event
460 .extra
461 .as_ref()
462 .is_some_and(|extra| extra.is_empty())
463 {
464 event.event.extra = None;
466 }
467 let ping_start = DatetimeMetric::new(
468 CommonMetricData {
469 name: format!("{}#start", store_name),
470 category: "".into(),
471 send_in_pings: vec![INTERNAL_STORAGE.into()],
472 lifetime: Lifetime::User,
473 ..Default::default()
474 },
475 TimeUnit::Minute,
476 );
477 let ping_start = ping_start
478 .get_value(glean, INTERNAL_STORAGE)
479 .unwrap_or(glean_start_time);
480 let time_from_ping_start_to_glean_restarted =
481 (glean_startup_date - ping_start).num_milliseconds();
482 intra_group_offset = event.event.timestamp;
483 inter_group_offset =
484 u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
485 if inter_group_offset < highest_ts {
486 record_error(
487 glean,
488 &glean_restarted_meta(store_name).into(),
489 ErrorType::InvalidValue,
490 format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
491 None,
492 );
493 inter_group_offset = highest_ts + 1;
498 }
499 } else if cur_ec == 0 {
500 cur_ec = execution_counter;
502 }
503 event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
504 if execution_counter != cur_ec {
505 record_error(
506 glean,
507 &glean_restarted_meta(store_name).into(),
508 ErrorType::InvalidState,
509 format!(
510 "Inconsistent execution counter {} (expected {})",
511 execution_counter, cur_ec
512 ),
513 None,
514 );
515 cur_ec = execution_counter;
517 }
518 if highest_ts > event.event.timestamp {
519 record_error(
522 glean,
523 &glean_restarted_meta(store_name).into(),
524 ErrorType::InvalidState,
525 format!(
526 "Inconsistent previous highest timestamp {} (expected <= {})",
527 highest_ts, event.event.timestamp
528 ),
529 None,
530 );
531 }
533 highest_ts = event.event.timestamp
534 }
535 }
536
537 pub fn snapshot_as_json(
549 &self,
550 glean: &Glean,
551 store_name: &str,
552 clear_store: bool,
553 ) -> Option<JsonValue> {
554 let result = {
555 let mut db = self.event_stores.write().unwrap(); db.get_mut(&store_name.to_string()).and_then(|store| {
557 if !store.is_empty() {
558 let mut clone;
561 let store = if clear_store {
562 store
563 } else {
564 clone = store.clone();
565 &mut clone
566 };
567 self.normalize_store(glean, store_name, store, glean.start_time());
569 Some(json!(store))
570 } else {
571 log::warn!("Unexpectly got empty event store for '{}'", store_name);
572 None
573 }
574 })
575 };
576
577 if clear_store {
578 self.event_stores
579 .write()
580 .unwrap() .remove(&store_name.to_string());
582
583 let _lock = self.file_lock.lock().unwrap(); if let Err(err) = fs::remove_file(self.path.join(store_name)) {
585 match err.kind() {
586 std::io::ErrorKind::NotFound => {
587 }
589 _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
590 }
591 }
592 }
593
594 result
595 }
596
597 pub fn clear_all(&self) -> Result<()> {
599 self.event_stores.write().unwrap().clear();
601
602 let _lock = self.file_lock.lock().unwrap();
604 std::fs::remove_dir_all(&self.path)?;
605 create_dir_all(&self.path)?;
606
607 Ok(())
608 }
609
610 pub fn test_get_value<'a>(
617 &'a self,
618 meta: &'a CommonMetricDataInternal,
619 store_name: &str,
620 ) -> Option<Vec<RecordedEvent>> {
621 record_coverage(&meta.base_identifier());
622
623 let value: Vec<RecordedEvent> = self
624 .event_stores
625 .read()
626 .unwrap() .get(&store_name.to_string())
628 .into_iter()
629 .flatten()
630 .map(|stored_event| stored_event.event.clone())
631 .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
632 .collect();
633 if !value.is_empty() {
634 Some(value)
635 } else {
636 None
637 }
638 }
639}
640
641#[cfg(test)]
642mod test {
643 use super::*;
644 use crate::test_get_num_recorded_errors;
645 use crate::tests::new_glean;
646 use chrono::{TimeZone, Timelike};
647
648 #[test]
649 fn handle_truncated_events_on_disk() {
650 let (glean, t) = new_glean(None);
651
652 {
653 let db = EventDatabase::new(t.path()).unwrap();
654 db.write_event_to_disk("events", "{\"timestamp\": 500");
655 db.write_event_to_disk("events", "{\"timestamp\"");
656 db.write_event_to_disk(
657 "events",
658 "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
659 );
660 }
661
662 {
663 let db = EventDatabase::new(t.path()).unwrap();
664 db.load_events_from_disk(&glean, false).unwrap();
665 let events = &db.event_stores.read().unwrap()["events"];
666 assert_eq!(1, events.len());
667 }
668 }
669
670 #[test]
671 fn stable_serialization() {
672 let event_empty = RecordedEvent {
673 timestamp: 2,
674 category: "cat".to_string(),
675 name: "name".to_string(),
676 extra: None,
677 };
678
679 let mut data = HashMap::new();
680 data.insert("a key".to_string(), "a value".to_string());
681 let event_data = RecordedEvent {
682 timestamp: 2,
683 category: "cat".to_string(),
684 name: "name".to_string(),
685 extra: Some(data),
686 };
687
688 let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap();
689 let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap();
690
691 assert_eq!(
692 StoredEvent {
693 event: event_empty,
694 execution_counter: None
695 },
696 serde_json::from_str(&event_empty_json).unwrap()
697 );
698 assert_eq!(
699 StoredEvent {
700 event: event_data,
701 execution_counter: None
702 },
703 serde_json::from_str(&event_data_json).unwrap()
704 );
705 }
706
707 #[test]
708 fn deserialize_existing_data() {
709 let event_empty_json = r#"
710{
711 "timestamp": 2,
712 "category": "cat",
713 "name": "name"
714}
715 "#;
716
717 let event_data_json = r#"
718{
719 "timestamp": 2,
720 "category": "cat",
721 "name": "name",
722 "extra": {
723 "a key": "a value"
724 }
725}
726 "#;
727
728 let event_empty = RecordedEvent {
729 timestamp: 2,
730 category: "cat".to_string(),
731 name: "name".to_string(),
732 extra: None,
733 };
734
735 let mut data = HashMap::new();
736 data.insert("a key".to_string(), "a value".to_string());
737 let event_data = RecordedEvent {
738 timestamp: 2,
739 category: "cat".to_string(),
740 name: "name".to_string(),
741 extra: Some(data),
742 };
743
744 assert_eq!(
745 StoredEvent {
746 event: event_empty,
747 execution_counter: None
748 },
749 serde_json::from_str(event_empty_json).unwrap()
750 );
751 assert_eq!(
752 StoredEvent {
753 event: event_data,
754 execution_counter: None
755 },
756 serde_json::from_str(event_data_json).unwrap()
757 );
758 }
759
760 #[test]
761 fn doesnt_record_when_upload_is_disabled() {
762 let (mut glean, dir) = new_glean(None);
763 let db = EventDatabase::new(dir.path()).unwrap();
764
765 let test_storage = "store1";
766 let test_category = "category";
767 let test_name = "name";
768 let test_timestamp = 2;
769 let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
770 let event_data = RecordedEvent {
771 timestamp: test_timestamp,
772 category: test_category.to_string(),
773 name: test_name.to_string(),
774 extra: None,
775 };
776
777 db.record(&glean, &test_meta, 2, None);
780 {
781 let event_stores = db.event_stores.read().unwrap();
782 assert_eq!(
783 &StoredEvent {
784 event: event_data,
785 execution_counter: None
786 },
787 &event_stores.get(test_storage).unwrap()[0]
788 );
789 assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
790 }
791
792 glean.set_upload_enabled(false);
793
794 db.record(&glean, &test_meta, 2, None);
796 {
797 let event_stores = db.event_stores.read().unwrap();
798 assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
799 }
800 }
801
802 #[test]
803 fn normalize_store_of_glean_restarted() {
804 let (glean, _dir) = new_glean(None);
806
807 let store_name = "store-name";
808 let glean_restarted = StoredEvent {
809 event: RecordedEvent {
810 timestamp: 2,
811 category: "glean".into(),
812 name: "restarted".into(),
813 extra: None,
814 },
815 execution_counter: None,
816 };
817 let mut store = vec![glean_restarted.clone()];
818 let glean_start_time = glean.start_time();
819
820 glean
821 .event_storage()
822 .normalize_store(&glean, store_name, &mut store, glean_start_time);
823 assert!(store.is_empty());
824
825 let mut store = vec![glean_restarted.clone(), glean_restarted.clone()];
826 glean
827 .event_storage()
828 .normalize_store(&glean, store_name, &mut store, glean_start_time);
829 assert!(store.is_empty());
830
831 let mut store = vec![
832 glean_restarted.clone(),
833 glean_restarted.clone(),
834 glean_restarted,
835 ];
836 glean
837 .event_storage()
838 .normalize_store(&glean, store_name, &mut store, glean_start_time);
839 assert!(store.is_empty());
840 }
841
842 #[test]
843 fn normalize_store_of_glean_restarted_on_both_ends() {
844 let (glean, _dir) = new_glean(None);
846
847 let store_name = "store-name";
848 let glean_restarted = StoredEvent {
849 event: RecordedEvent {
850 timestamp: 2,
851 category: "glean".into(),
852 name: "restarted".into(),
853 extra: None,
854 },
855 execution_counter: None,
856 };
857 let not_glean_restarted = StoredEvent {
858 event: RecordedEvent {
859 timestamp: 20,
860 category: "category".into(),
861 name: "name".into(),
862 extra: None,
863 },
864 execution_counter: None,
865 };
866 let mut store = vec![
867 glean_restarted.clone(),
868 not_glean_restarted.clone(),
869 glean_restarted,
870 ];
871 let glean_start_time = glean.start_time();
872
873 glean
874 .event_storage()
875 .normalize_store(&glean, store_name, &mut store, glean_start_time);
876 assert_eq!(1, store.len());
877 assert_eq!(
878 StoredEvent {
879 event: RecordedEvent {
880 timestamp: 0,
881 ..not_glean_restarted.event
882 },
883 execution_counter: None
884 },
885 store[0]
886 );
887 }
888
889 #[test]
890 fn normalize_store_single_run_timestamp_math() {
891 let (glean, _dir) = new_glean(None);
895
896 let store_name = "store-name";
897 let glean_restarted = StoredEvent {
898 event: RecordedEvent {
899 timestamp: 2,
900 category: "glean".into(),
901 name: "restarted".into(),
902 extra: None,
903 },
904 execution_counter: None,
905 };
906 let timestamps = [20, 40, 200];
907 let not_glean_restarted = StoredEvent {
908 event: RecordedEvent {
909 timestamp: timestamps[0],
910 category: "category".into(),
911 name: "name".into(),
912 extra: None,
913 },
914 execution_counter: None,
915 };
916 let mut store = vec![
917 glean_restarted.clone(),
918 not_glean_restarted.clone(),
919 StoredEvent {
920 event: RecordedEvent {
921 timestamp: timestamps[1],
922 ..not_glean_restarted.event.clone()
923 },
924 execution_counter: None,
925 },
926 StoredEvent {
927 event: RecordedEvent {
928 timestamp: timestamps[2],
929 ..not_glean_restarted.event.clone()
930 },
931 execution_counter: None,
932 },
933 glean_restarted,
934 ];
935
936 glean
937 .event_storage()
938 .normalize_store(&glean, store_name, &mut store, glean.start_time());
939 assert_eq!(3, store.len());
940 for (timestamp, event) in timestamps.iter().zip(store.iter()) {
941 assert_eq!(
942 &StoredEvent {
943 event: RecordedEvent {
944 timestamp: timestamp - timestamps[0],
945 ..not_glean_restarted.clone().event
946 },
947 execution_counter: None
948 },
949 event
950 );
951 }
952 }
953
    /// Across two runs (execution counters 0 and 1), the second group's
    /// timestamps are offset by the time between the fallback ping start
    /// (`glean_start_time`) and the `glean.startup.date` of the
    /// `glean.restarted` event: exactly one hour in this fixture.
    #[test]
    fn normalize_store_multi_run_timestamp_math() {
        let (glean, _dir) = new_glean(None);

        let store_name = "store-name";
        let glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "glean".into(),
                name: "restarted".into(),
                ..Default::default()
            },
            execution_counter: None,
        };
        let not_glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "category".into(),
                name: "name".into(),
                ..Default::default()
            },
            execution_counter: None,
        };

        // The first three timestamps belong to the first run, the last one to the second.
        let timestamps = [20, 40, 200, 12];
        let ecs = [0, 1];
        let some_hour = 16;
        let startup_date = FixedOffset::east(0)
            .ymd(2022, 11, 24)
            .and_hms(some_hour, 29, 0);
        // Glean "started" one hour before the restart's startup date.
        let glean_start_time = startup_date.with_hour(some_hour - 1);
        let restarted_ts = 2;
        let mut store = vec![
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[0],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[1],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[2],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    extra: Some(
                        [(
                            "glean.startup.date".into(),
                            get_iso_time_string(startup_date, TimeUnit::Minute),
                        )]
                        .into(),
                    ),
                    timestamp: restarted_ts,
                    ..glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[3],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
        ];

        glean.event_storage().normalize_store(
            &glean,
            store_name,
            &mut store,
            glean_start_time.unwrap(),
        );
        // Nothing is trimmed: the only `glean.restarted` sits in the middle.
        assert_eq!(5, store.len());
        // First run: rebased to the first event; execution counters are gone.
        for (timestamp, event) in timestamps[..timestamps.len() - 1].iter().zip(store.clone()) {
            assert_eq!(
                StoredEvent {
                    event: RecordedEvent {
                        timestamp: timestamp - timestamps[0],
                        ..not_glean_restarted.event.clone()
                    },
                    execution_counter: None,
                },
                event
            );
        }
        let hour_in_millis = 3600000;
        // The restart marker lands one hour after ping start; its
        // `glean.startup.date` extra was consumed, leaving `extra: None`.
        assert_eq!(
            store[3],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: hour_in_millis,
                    ..glean_restarted.event
                },
                execution_counter: None,
            }
        );
        // The second run's event keeps its distance to the restart marker.
        assert_eq!(
            store[4],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: hour_in_millis + timestamps[3] - restarted_ts,
                    ..not_glean_restarted.event
                },
                execution_counter: None,
            }
        );
    }
1080
    /// When the restart's `glean.startup.date` lies before the ping start
    /// (here: `glean_start_time` is one hour *after* the startup date), the
    /// second group is placed 1ms after the previous group's last event and
    /// exactly one `InvalidValue` error is recorded.
    #[test]
    fn normalize_store_multi_run_client_clocks() {
        let (glean, _dir) = new_glean(None);

        let store_name = "store-name";
        let glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "glean".into(),
                name: "restarted".into(),
                ..Default::default()
            },
            execution_counter: None,
        };
        let not_glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "category".into(),
                name: "name".into(),
                ..Default::default()
            },
            execution_counter: None,
        };

        // The first two timestamps belong to the first run, the last two to the second.
        let timestamps = [20, 40, 12, 200];
        let ecs = [0, 1];
        let some_hour = 10;
        let startup_date = FixedOffset::east(0)
            .ymd(2022, 11, 25)
            .and_hms(some_hour, 37, 0);
        // Glean "started" one hour after the restart's startup date.
        let glean_start_time = startup_date.with_hour(some_hour + 1);
        let restarted_ts = 2;
        let mut store = vec![
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[0],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[1],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    extra: Some(
                        [(
                            "glean.startup.date".into(),
                            get_iso_time_string(startup_date, TimeUnit::Minute),
                        )]
                        .into(),
                    ),
                    timestamp: restarted_ts,
                    ..glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[2],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[3],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
        ];

        glean.event_storage().normalize_store(
            &glean,
            store_name,
            &mut store,
            glean_start_time.unwrap(),
        );
        // Nothing is trimmed: the only `glean.restarted` sits in the middle.
        assert_eq!(5, store.len());
        // First run: rebased to the first event; execution counters are gone.
        for (timestamp, event) in timestamps[..timestamps.len() - 2].iter().zip(store.clone()) {
            assert_eq!(
                StoredEvent {
                    event: RecordedEvent {
                        timestamp: timestamp - timestamps[0],
                        ..not_glean_restarted.event.clone()
                    },
                    execution_counter: None,
                },
                event
            );
        }
        // The restart marker is forced to 1ms after the previous event.
        assert_eq!(
            store[2],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: store[1].event.timestamp + 1,
                    ..glean_restarted.event
                },
                execution_counter: None,
            }
        );
        // Later events keep their distance to the restart marker.
        assert_eq!(
            store[3],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[2] - restarted_ts + store[2].event.timestamp,
                    ..not_glean_restarted.event
                },
                execution_counter: None,
            }
        );
        // The adjustment is reported exactly once.
        assert_eq!(
            Ok(1),
            test_get_num_recorded_errors(
                &glean,
                &CommonMetricData {
                    name: "restarted".into(),
                    category: "glean".into(),
                    send_in_pings: vec![store_name.into()],
                    lifetime: Lifetime::Ping,
                    ..Default::default()
                }
                .into(),
                ErrorType::InvalidValue
            )
        );
    }
1222
1223 #[test]
1224 fn normalize_store_non_zero_ec() {
1225 let (glean, _dir) = new_glean(None);
1228
1229 let store_name = "store-name";
1230 let glean_restarted = StoredEvent {
1231 event: RecordedEvent {
1232 timestamp: 2,
1233 category: "glean".into(),
1234 name: "restarted".into(),
1235 extra: None,
1236 },
1237 execution_counter: Some(2),
1238 };
1239 let not_glean_restarted = StoredEvent {
1240 event: RecordedEvent {
1241 timestamp: 20,
1242 category: "category".into(),
1243 name: "name".into(),
1244 extra: None,
1245 },
1246 execution_counter: Some(2),
1247 };
1248 let glean_restarted_2 = StoredEvent {
1249 event: RecordedEvent {
1250 timestamp: 2,
1251 category: "glean".into(),
1252 name: "restarted".into(),
1253 extra: None,
1254 },
1255 execution_counter: Some(3),
1256 };
1257 let mut store = vec![
1258 glean_restarted,
1259 not_glean_restarted.clone(),
1260 glean_restarted_2,
1261 ];
1262 let glean_start_time = glean.start_time();
1263
1264 glean
1265 .event_storage()
1266 .normalize_store(&glean, store_name, &mut store, glean_start_time);
1267
1268 assert_eq!(1, store.len());
1269 assert_eq!(
1270 StoredEvent {
1271 event: RecordedEvent {
1272 timestamp: 0,
1273 ..not_glean_restarted.event
1274 },
1275 execution_counter: None
1276 },
1277 store[0]
1278 );
1279 assert!(test_get_num_recorded_errors(
1281 &glean,
1282 &CommonMetricData {
1283 name: "restarted".into(),
1284 category: "glean".into(),
1285 send_in_pings: vec![store_name.into()],
1286 lifetime: Lifetime::Ping,
1287 ..Default::default()
1288 }
1289 .into(),
1290 ErrorType::InvalidState
1291 )
1292 .is_err());
1293 assert!(test_get_num_recorded_errors(
1295 &glean,
1296 &CommonMetricData {
1297 name: "restarted".into(),
1298 category: "glean".into(),
1299 send_in_pings: vec![store_name.into()],
1300 lifetime: Lifetime::Ping,
1301 ..Default::default()
1302 }
1303 .into(),
1304 ErrorType::InvalidValue
1305 )
1306 .is_err());
1307 }
1308}