1use std::cmp::Ordering;
6use std::collections::HashMap;
7use std::fs;
8use std::fs::{create_dir_all, File, OpenOptions};
9use std::io::BufRead;
10use std::io::BufReader;
11use std::io::Write;
12use std::path::{Path, PathBuf};
13use std::sync::{Mutex, RwLock};
14
15use chrono::{DateTime, FixedOffset, Utc};
16
17use malloc_size_of::MallocSizeOf;
18use malloc_size_of_derive::MallocSizeOf;
19use serde::{Deserialize, Serialize};
20use serde_json::{json, Value as JsonValue};
21
22use crate::common_metric_data::CommonMetricDataInternal;
23use crate::coverage::record_coverage;
24use crate::error_recording::{record_error, ErrorType};
25use crate::metrics::{DatetimeMetric, TimeUnit};
26use crate::storage::INTERNAL_STORAGE;
27use crate::util::get_iso_time_string;
28use crate::Glean;
29use crate::Result;
30use crate::{CommonMetricData, CounterMetric, Lifetime};
31
32#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, MallocSizeOf)]
34#[cfg_attr(test, derive(Default))]
35pub struct RecordedEvent {
36 pub timestamp: u64,
40
41 pub category: String,
45
46 pub name: String,
50
51 #[serde(skip_serializing_if = "Option::is_none")]
55 pub extra: Option<HashMap<String, String>>,
56}
57
58#[derive(
60 Debug, Clone, Deserialize, Serialize, PartialEq, Eq, malloc_size_of_derive::MallocSizeOf,
61)]
62struct StoredEvent {
63 #[serde(flatten)]
64 event: RecordedEvent,
65
66 #[serde(default)]
71 #[serde(skip_serializing_if = "Option::is_none")]
72 pub execution_counter: Option<i32>,
73}
74
75#[derive(Debug)]
97pub struct EventDatabase {
98 pub path: PathBuf,
100 event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
102 file_lock: Mutex<()>,
104}
105
106impl MallocSizeOf for EventDatabase {
107 fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
108 self.event_stores.read().unwrap().size_of(ops)
109 }
110}
111
112impl EventDatabase {
113 pub fn new(data_path: &Path) -> Result<Self> {
120 let path = data_path.join("events");
121 create_dir_all(&path)?;
122
123 Ok(Self {
124 path,
125 event_stores: RwLock::new(HashMap::new()),
126 file_lock: Mutex::new(()),
127 })
128 }
129
130 pub fn flush_pending_events_on_startup(
156 &self,
157 glean: &Glean,
158 trim_data_to_registered_pings: bool,
159 ) -> bool {
160 match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
161 Ok(_) => {
162 let stores_with_events: Vec<String> = {
163 self.event_stores
164 .read()
165 .unwrap()
166 .keys()
167 .map(|x| x.to_owned())
168 .collect() };
170 let has_events_events = stores_with_events.contains(&"events".to_owned());
173 let glean_restarted_stores = if has_events_events {
174 stores_with_events
175 .into_iter()
176 .filter(|store| store != "events")
177 .collect()
178 } else {
179 stores_with_events
180 };
181 if !glean_restarted_stores.is_empty() {
182 for store_name in glean_restarted_stores.iter() {
183 CounterMetric::new(CommonMetricData {
184 name: "execution_counter".into(),
185 category: store_name.into(),
186 send_in_pings: vec![INTERNAL_STORAGE.into()],
187 lifetime: Lifetime::Ping,
188 ..Default::default()
189 })
190 .add_sync(glean, 1);
191 }
192 let glean_restarted = CommonMetricData {
193 name: "restarted".into(),
194 category: "glean".into(),
195 send_in_pings: glean_restarted_stores,
196 lifetime: Lifetime::Ping,
197 ..Default::default()
198 };
199 let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
200 let mut extra: HashMap<String, String> =
201 [("glean.startup.date".into(), startup)].into();
202 if glean.with_timestamps() {
203 let now = Utc::now();
204 let precise_timestamp = now.timestamp_millis() as u64;
205 extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
206 }
207 self.record(
208 glean,
209 &glean_restarted.into(),
210 crate::get_timestamp_ms(),
211 Some(extra),
212 );
213 }
214 has_events_events && glean.submit_ping_by_name("events", Some("startup"))
215 }
216 Err(err) => {
217 log::warn!("Error loading events from disk: {}", err);
218 false
219 }
220 }
221 }
222
223 fn load_events_from_disk(
224 &self,
225 glean: &Glean,
226 trim_data_to_registered_pings: bool,
227 ) -> Result<()> {
228 let mut db = self.event_stores.write().unwrap(); let _lock = self.file_lock.lock().unwrap(); for entry in fs::read_dir(&self.path)? {
236 let entry = entry?;
237 if entry.file_type()?.is_file() {
238 let store_name = entry.file_name().into_string()?;
239 log::info!("Loading events for {}", store_name);
240 if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
241 log::warn!("Trimming {}'s events", store_name);
242 if let Err(err) = fs::remove_file(entry.path()) {
243 match err.kind() {
244 std::io::ErrorKind::NotFound => {
245 }
247 _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
248 }
249 }
250 continue;
251 }
252 let file = BufReader::new(File::open(entry.path())?);
253 db.insert(
254 store_name,
255 file.lines()
256 .map_while(Result::ok)
257 .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
258 .collect(),
259 );
260 }
261 }
262 Ok(())
263 }
264
265 pub fn record(
282 &self,
283 glean: &Glean,
284 meta: &CommonMetricDataInternal,
285 timestamp: u64,
286 extra: Option<HashMap<String, String>>,
287 ) -> bool {
288 if !glean.is_upload_enabled() {
290 return false;
291 }
292
293 let mut submit_max_capacity_event_ping = false;
294 {
295 let mut db = self.event_stores.write().unwrap(); for store_name in meta.inner.send_in_pings.iter() {
297 if !glean.is_ping_enabled(store_name) {
298 continue;
299 }
300
301 let store = db.entry(store_name.to_string()).or_default();
302 let execution_counter = CounterMetric::new(CommonMetricData {
303 name: "execution_counter".into(),
304 category: store_name.into(),
305 send_in_pings: vec![INTERNAL_STORAGE.into()],
306 lifetime: Lifetime::Ping,
307 ..Default::default()
308 })
309 .get_value(glean, INTERNAL_STORAGE);
310 let event = StoredEvent {
312 event: RecordedEvent {
313 timestamp,
314 category: meta.inner.category.to_string(),
315 name: meta.inner.name.to_string(),
316 extra: extra.clone(),
317 },
318 execution_counter,
319 };
320 let event_json = serde_json::to_string(&event).unwrap(); store.push(event);
322 self.write_event_to_disk(store_name, &event_json);
323 if store_name == "events" && store.len() == glean.get_max_events() {
324 submit_max_capacity_event_ping = true;
325 }
326 }
327 }
328 if submit_max_capacity_event_ping {
329 glean.submit_ping_by_name("events", Some("max_capacity"));
330 true
331 } else {
332 false
333 }
334 }
335
336 fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
343 let _lock = self.file_lock.lock().unwrap(); if let Err(err) = OpenOptions::new()
345 .create(true)
346 .append(true)
347 .open(self.path.join(store_name))
348 .and_then(|mut file| writeln!(file, "{}", event_json))
349 {
350 log::warn!("IO error writing event to store '{}': {}", store_name, err);
351 }
352 }
353
354 fn normalize_store(
383 &self,
384 glean: &Glean,
385 store_name: &str,
386 store: &mut Vec<StoredEvent>,
387 glean_start_time: DateTime<FixedOffset>,
388 ) {
389 let is_glean_restarted =
390 |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
391 let glean_restarted_meta = |store_name: &str| CommonMetricData {
392 name: "restarted".into(),
393 category: "glean".into(),
394 send_in_pings: vec![store_name.into()],
395 lifetime: Lifetime::Ping,
396 ..Default::default()
397 };
398 store.sort_by(|a, b| {
400 a.execution_counter
401 .cmp(&b.execution_counter)
402 .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
403 .then_with(|| {
404 if is_glean_restarted(&a.event) {
405 Ordering::Less
406 } else {
407 Ordering::Greater
408 }
409 })
410 });
411 let final_event = match store
415 .iter()
416 .rposition(|event| !is_glean_restarted(&event.event))
417 {
418 Some(idx) => idx + 1,
419 _ => 0,
420 };
421 store.drain(final_event..);
422 let first_event = store
423 .iter()
424 .position(|event| !is_glean_restarted(&event.event))
425 .unwrap_or(store.len());
426 store.drain(..first_event);
427 if store.is_empty() {
428 return;
430 }
431 let mut cur_ec = 0;
437 let mut intra_group_offset = store[0].event.timestamp;
439 let mut inter_group_offset = 0;
441 let mut highest_ts = 0;
442 for event in store.iter_mut() {
443 let execution_counter = event.execution_counter.take().unwrap_or(0);
444 if is_glean_restarted(&event.event) {
445 cur_ec = execution_counter;
448 let glean_startup_date = event
449 .event
450 .extra
451 .as_mut()
452 .and_then(|extra| {
453 extra.remove("glean.startup.date").and_then(|date_str| {
454 DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
455 .map_err(|_| {
456 record_error(
457 glean,
458 &glean_restarted_meta(store_name).into(),
459 ErrorType::InvalidState,
460 format!("Unparseable glean.startup.date '{}'", date_str),
461 None,
462 );
463 })
464 .ok()
465 })
466 })
467 .unwrap_or(glean_start_time);
468 if event
469 .event
470 .extra
471 .as_ref()
472 .is_some_and(|extra| extra.is_empty())
473 {
474 event.event.extra = None;
476 }
477 let ping_start = DatetimeMetric::new(
478 CommonMetricData {
479 name: format!("{}#start", store_name),
480 category: "".into(),
481 send_in_pings: vec![INTERNAL_STORAGE.into()],
482 lifetime: Lifetime::User,
483 ..Default::default()
484 },
485 TimeUnit::Minute,
486 );
487 let ping_start = ping_start
488 .get_value(glean, INTERNAL_STORAGE)
489 .unwrap_or(glean_start_time);
490 let time_from_ping_start_to_glean_restarted =
491 (glean_startup_date - ping_start).num_milliseconds();
492 intra_group_offset = event.event.timestamp;
493 inter_group_offset =
494 u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
495 if inter_group_offset < highest_ts {
496 record_error(
497 glean,
498 &glean_restarted_meta(store_name).into(),
499 ErrorType::InvalidValue,
500 format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
501 None,
502 );
503 inter_group_offset = highest_ts + 1;
508 }
509 } else if cur_ec == 0 {
510 cur_ec = execution_counter;
512 }
513 event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
514 if execution_counter != cur_ec {
515 record_error(
516 glean,
517 &glean_restarted_meta(store_name).into(),
518 ErrorType::InvalidState,
519 format!(
520 "Inconsistent execution counter {} (expected {})",
521 execution_counter, cur_ec
522 ),
523 None,
524 );
525 cur_ec = execution_counter;
527 }
528 if highest_ts > event.event.timestamp {
529 record_error(
532 glean,
533 &glean_restarted_meta(store_name).into(),
534 ErrorType::InvalidState,
535 format!(
536 "Inconsistent previous highest timestamp {} (expected <= {})",
537 highest_ts, event.event.timestamp
538 ),
539 None,
540 );
541 }
543 highest_ts = event.event.timestamp
544 }
545 }
546
547 pub fn snapshot_as_json(
559 &self,
560 glean: &Glean,
561 store_name: &str,
562 clear_store: bool,
563 ) -> Option<JsonValue> {
564 let result = {
565 let mut db = self.event_stores.write().unwrap(); db.get_mut(&store_name.to_string()).and_then(|store| {
567 if !store.is_empty() {
568 let mut clone;
571 let store = if clear_store {
572 store
573 } else {
574 clone = store.clone();
575 &mut clone
576 };
577 self.normalize_store(glean, store_name, store, glean.start_time());
579 Some(json!(store))
580 } else {
581 log::warn!("Unexpectly got empty event store for '{}'", store_name);
582 None
583 }
584 })
585 };
586
587 if clear_store {
588 self.event_stores
589 .write()
590 .unwrap() .remove(&store_name.to_string());
592
593 let _lock = self.file_lock.lock().unwrap(); if let Err(err) = fs::remove_file(self.path.join(store_name)) {
595 match err.kind() {
596 std::io::ErrorKind::NotFound => {
597 }
599 _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
600 }
601 }
602 }
603
604 result
605 }
606
607 pub fn clear_all(&self) -> Result<()> {
609 self.event_stores.write().unwrap().clear();
611
612 let _lock = self.file_lock.lock().unwrap();
614 std::fs::remove_dir_all(&self.path)?;
615 create_dir_all(&self.path)?;
616
617 Ok(())
618 }
619
620 pub fn test_get_value<'a>(
627 &'a self,
628 meta: &'a CommonMetricDataInternal,
629 store_name: &str,
630 ) -> Option<Vec<RecordedEvent>> {
631 record_coverage(&meta.base_identifier());
632
633 let value: Vec<RecordedEvent> = self
634 .event_stores
635 .read()
636 .unwrap() .get(&store_name.to_string())
638 .into_iter()
639 .flatten()
640 .map(|stored_event| stored_event.event.clone())
641 .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
642 .collect();
643 if !value.is_empty() {
644 Some(value)
645 } else {
646 None
647 }
648 }
649}
650
651#[cfg(test)]
652mod test {
653 use super::*;
654 use crate::test_get_num_recorded_errors;
655 use crate::tests::new_glean;
656 use chrono::{TimeZone, Timelike};
657
658 #[test]
659 fn handle_truncated_events_on_disk() {
660 let (glean, t) = new_glean(None);
661
662 {
663 let db = EventDatabase::new(t.path()).unwrap();
664 db.write_event_to_disk("events", "{\"timestamp\": 500");
665 db.write_event_to_disk("events", "{\"timestamp\"");
666 db.write_event_to_disk(
667 "events",
668 "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
669 );
670 }
671
672 {
673 let db = EventDatabase::new(t.path()).unwrap();
674 db.load_events_from_disk(&glean, false).unwrap();
675 let events = &db.event_stores.read().unwrap()["events"];
676 assert_eq!(1, events.len());
677 }
678 }
679
680 #[test]
681 fn stable_serialization() {
682 let event_empty = RecordedEvent {
683 timestamp: 2,
684 category: "cat".to_string(),
685 name: "name".to_string(),
686 extra: None,
687 };
688
689 let mut data = HashMap::new();
690 data.insert("a key".to_string(), "a value".to_string());
691 let event_data = RecordedEvent {
692 timestamp: 2,
693 category: "cat".to_string(),
694 name: "name".to_string(),
695 extra: Some(data),
696 };
697
698 let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap();
699 let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap();
700
701 assert_eq!(
702 StoredEvent {
703 event: event_empty,
704 execution_counter: None
705 },
706 serde_json::from_str(&event_empty_json).unwrap()
707 );
708 assert_eq!(
709 StoredEvent {
710 event: event_data,
711 execution_counter: None
712 },
713 serde_json::from_str(&event_data_json).unwrap()
714 );
715 }
716
717 #[test]
718 fn deserialize_existing_data() {
719 let event_empty_json = r#"
720{
721 "timestamp": 2,
722 "category": "cat",
723 "name": "name"
724}
725 "#;
726
727 let event_data_json = r#"
728{
729 "timestamp": 2,
730 "category": "cat",
731 "name": "name",
732 "extra": {
733 "a key": "a value"
734 }
735}
736 "#;
737
738 let event_empty = RecordedEvent {
739 timestamp: 2,
740 category: "cat".to_string(),
741 name: "name".to_string(),
742 extra: None,
743 };
744
745 let mut data = HashMap::new();
746 data.insert("a key".to_string(), "a value".to_string());
747 let event_data = RecordedEvent {
748 timestamp: 2,
749 category: "cat".to_string(),
750 name: "name".to_string(),
751 extra: Some(data),
752 };
753
754 assert_eq!(
755 StoredEvent {
756 event: event_empty,
757 execution_counter: None
758 },
759 serde_json::from_str(event_empty_json).unwrap()
760 );
761 assert_eq!(
762 StoredEvent {
763 event: event_data,
764 execution_counter: None
765 },
766 serde_json::from_str(event_data_json).unwrap()
767 );
768 }
769
770 #[test]
771 fn doesnt_record_when_upload_is_disabled() {
772 let (mut glean, dir) = new_glean(None);
773 let db = EventDatabase::new(dir.path()).unwrap();
774
775 let test_storage = "store1";
776 let test_category = "category";
777 let test_name = "name";
778 let test_timestamp = 2;
779 let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
780 let event_data = RecordedEvent {
781 timestamp: test_timestamp,
782 category: test_category.to_string(),
783 name: test_name.to_string(),
784 extra: None,
785 };
786
787 db.record(&glean, &test_meta, 2, None);
790 {
791 let event_stores = db.event_stores.read().unwrap();
792 assert_eq!(
793 &StoredEvent {
794 event: event_data,
795 execution_counter: None
796 },
797 &event_stores.get(test_storage).unwrap()[0]
798 );
799 assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
800 }
801
802 glean.set_upload_enabled(false);
803
804 db.record(&glean, &test_meta, 2, None);
806 {
807 let event_stores = db.event_stores.read().unwrap();
808 assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
809 }
810 }
811
812 #[test]
813 fn normalize_store_of_glean_restarted() {
814 let (glean, _dir) = new_glean(None);
816
817 let store_name = "store-name";
818 let glean_restarted = StoredEvent {
819 event: RecordedEvent {
820 timestamp: 2,
821 category: "glean".into(),
822 name: "restarted".into(),
823 extra: None,
824 },
825 execution_counter: None,
826 };
827 let mut store = vec![glean_restarted.clone()];
828 let glean_start_time = glean.start_time();
829
830 glean
831 .event_storage()
832 .normalize_store(&glean, store_name, &mut store, glean_start_time);
833 assert!(store.is_empty());
834
835 let mut store = vec![glean_restarted.clone(), glean_restarted.clone()];
836 glean
837 .event_storage()
838 .normalize_store(&glean, store_name, &mut store, glean_start_time);
839 assert!(store.is_empty());
840
841 let mut store = vec![
842 glean_restarted.clone(),
843 glean_restarted.clone(),
844 glean_restarted,
845 ];
846 glean
847 .event_storage()
848 .normalize_store(&glean, store_name, &mut store, glean_start_time);
849 assert!(store.is_empty());
850 }
851
852 #[test]
853 fn normalize_store_of_glean_restarted_on_both_ends() {
854 let (glean, _dir) = new_glean(None);
856
857 let store_name = "store-name";
858 let glean_restarted = StoredEvent {
859 event: RecordedEvent {
860 timestamp: 2,
861 category: "glean".into(),
862 name: "restarted".into(),
863 extra: None,
864 },
865 execution_counter: None,
866 };
867 let not_glean_restarted = StoredEvent {
868 event: RecordedEvent {
869 timestamp: 20,
870 category: "category".into(),
871 name: "name".into(),
872 extra: None,
873 },
874 execution_counter: None,
875 };
876 let mut store = vec![
877 glean_restarted.clone(),
878 not_glean_restarted.clone(),
879 glean_restarted,
880 ];
881 let glean_start_time = glean.start_time();
882
883 glean
884 .event_storage()
885 .normalize_store(&glean, store_name, &mut store, glean_start_time);
886 assert_eq!(1, store.len());
887 assert_eq!(
888 StoredEvent {
889 event: RecordedEvent {
890 timestamp: 0,
891 ..not_glean_restarted.event
892 },
893 execution_counter: None
894 },
895 store[0]
896 );
897 }
898
899 #[test]
900 fn normalize_store_single_run_timestamp_math() {
901 let (glean, _dir) = new_glean(None);
905
906 let store_name = "store-name";
907 let glean_restarted = StoredEvent {
908 event: RecordedEvent {
909 timestamp: 2,
910 category: "glean".into(),
911 name: "restarted".into(),
912 extra: None,
913 },
914 execution_counter: None,
915 };
916 let timestamps = [20, 40, 200];
917 let not_glean_restarted = StoredEvent {
918 event: RecordedEvent {
919 timestamp: timestamps[0],
920 category: "category".into(),
921 name: "name".into(),
922 extra: None,
923 },
924 execution_counter: None,
925 };
926 let mut store = vec![
927 glean_restarted.clone(),
928 not_glean_restarted.clone(),
929 StoredEvent {
930 event: RecordedEvent {
931 timestamp: timestamps[1],
932 ..not_glean_restarted.event.clone()
933 },
934 execution_counter: None,
935 },
936 StoredEvent {
937 event: RecordedEvent {
938 timestamp: timestamps[2],
939 ..not_glean_restarted.event.clone()
940 },
941 execution_counter: None,
942 },
943 glean_restarted,
944 ];
945
946 glean
947 .event_storage()
948 .normalize_store(&glean, store_name, &mut store, glean.start_time());
949 assert_eq!(3, store.len());
950 for (timestamp, event) in timestamps.iter().zip(store.iter()) {
951 assert_eq!(
952 &StoredEvent {
953 event: RecordedEvent {
954 timestamp: timestamp - timestamps[0],
955 ..not_glean_restarted.clone().event
956 },
957 execution_counter: None
958 },
959 event
960 );
961 }
962 }
963
964 #[test]
965 fn normalize_store_multi_run_timestamp_math() {
966 let (glean, _dir) = new_glean(None);
971
972 let store_name = "store-name";
973 let glean_restarted = StoredEvent {
974 event: RecordedEvent {
975 category: "glean".into(),
976 name: "restarted".into(),
977 ..Default::default()
978 },
979 execution_counter: None,
980 };
981 let not_glean_restarted = StoredEvent {
982 event: RecordedEvent {
983 category: "category".into(),
984 name: "name".into(),
985 ..Default::default()
986 },
987 execution_counter: None,
988 };
989
990 let timestamps = [20, 40, 200, 12];
993 let ecs = [0, 1];
994 let some_hour = 16;
995 let startup_date = FixedOffset::east(0)
996 .ymd(2022, 11, 24)
997 .and_hms(some_hour, 29, 0); let glean_start_time = startup_date.with_hour(some_hour - 1);
999 let restarted_ts = 2;
1000 let mut store = vec![
1001 StoredEvent {
1002 event: RecordedEvent {
1003 timestamp: timestamps[0],
1004 ..not_glean_restarted.event.clone()
1005 },
1006 execution_counter: Some(ecs[0]),
1007 },
1008 StoredEvent {
1009 event: RecordedEvent {
1010 timestamp: timestamps[1],
1011 ..not_glean_restarted.event.clone()
1012 },
1013 execution_counter: Some(ecs[0]),
1014 },
1015 StoredEvent {
1016 event: RecordedEvent {
1017 timestamp: timestamps[2],
1018 ..not_glean_restarted.event.clone()
1019 },
1020 execution_counter: Some(ecs[0]),
1021 },
1022 StoredEvent {
1023 event: RecordedEvent {
1024 extra: Some(
1025 [(
1026 "glean.startup.date".into(),
1027 get_iso_time_string(startup_date, TimeUnit::Minute),
1028 )]
1029 .into(),
1030 ),
1031 timestamp: restarted_ts,
1032 ..glean_restarted.event.clone()
1033 },
1034 execution_counter: Some(ecs[1]),
1035 },
1036 StoredEvent {
1037 event: RecordedEvent {
1038 timestamp: timestamps[3],
1039 ..not_glean_restarted.event.clone()
1040 },
1041 execution_counter: Some(ecs[1]),
1042 },
1043 ];
1044
1045 glean.event_storage().normalize_store(
1046 &glean,
1047 store_name,
1048 &mut store,
1049 glean_start_time.unwrap(),
1050 );
1051 assert_eq!(5, store.len()); for (timestamp, event) in timestamps[..timestamps.len() - 1].iter().zip(store.clone()) {
1055 assert_eq!(
1056 StoredEvent {
1057 event: RecordedEvent {
1058 timestamp: timestamp - timestamps[0],
1059 ..not_glean_restarted.event.clone()
1060 },
1061 execution_counter: None,
1062 },
1063 event
1064 );
1065 }
1066 let hour_in_millis = 3600000;
1068 assert_eq!(
1069 store[3],
1070 StoredEvent {
1071 event: RecordedEvent {
1072 timestamp: hour_in_millis,
1073 ..glean_restarted.event
1074 },
1075 execution_counter: None,
1076 }
1077 );
1078 assert_eq!(
1080 store[4],
1081 StoredEvent {
1082 event: RecordedEvent {
1083 timestamp: hour_in_millis + timestamps[3] - restarted_ts,
1084 ..not_glean_restarted.event
1085 },
1086 execution_counter: None,
1087 }
1088 );
1089 }
1090
1091 #[test]
1092 fn normalize_store_multi_run_client_clocks() {
1093 let (glean, _dir) = new_glean(None);
1096
1097 let store_name = "store-name";
1098 let glean_restarted = StoredEvent {
1099 event: RecordedEvent {
1100 category: "glean".into(),
1101 name: "restarted".into(),
1102 ..Default::default()
1103 },
1104 execution_counter: None,
1105 };
1106 let not_glean_restarted = StoredEvent {
1107 event: RecordedEvent {
1108 category: "category".into(),
1109 name: "name".into(),
1110 ..Default::default()
1111 },
1112 execution_counter: None,
1113 };
1114
1115 let timestamps = [20, 40, 12, 200];
1118 let ecs = [0, 1];
1119 let some_hour = 10;
1120 let startup_date = FixedOffset::east(0)
1121 .ymd(2022, 11, 25)
1122 .and_hms(some_hour, 37, 0); let glean_start_time = startup_date.with_hour(some_hour + 1);
1124 let restarted_ts = 2;
1125 let mut store = vec![
1126 StoredEvent {
1127 event: RecordedEvent {
1128 timestamp: timestamps[0],
1129 ..not_glean_restarted.event.clone()
1130 },
1131 execution_counter: Some(ecs[0]),
1132 },
1133 StoredEvent {
1134 event: RecordedEvent {
1135 timestamp: timestamps[1],
1136 ..not_glean_restarted.event.clone()
1137 },
1138 execution_counter: Some(ecs[0]),
1139 },
1140 StoredEvent {
1141 event: RecordedEvent {
1142 extra: Some(
1143 [(
1144 "glean.startup.date".into(),
1145 get_iso_time_string(startup_date, TimeUnit::Minute),
1146 )]
1147 .into(),
1148 ),
1149 timestamp: restarted_ts,
1150 ..glean_restarted.event.clone()
1151 },
1152 execution_counter: Some(ecs[1]),
1153 },
1154 StoredEvent {
1155 event: RecordedEvent {
1156 timestamp: timestamps[2],
1157 ..not_glean_restarted.event.clone()
1158 },
1159 execution_counter: Some(ecs[1]),
1160 },
1161 StoredEvent {
1162 event: RecordedEvent {
1163 timestamp: timestamps[3],
1164 ..not_glean_restarted.event.clone()
1165 },
1166 execution_counter: Some(ecs[1]),
1167 },
1168 ];
1169
1170 glean.event_storage().normalize_store(
1171 &glean,
1172 store_name,
1173 &mut store,
1174 glean_start_time.unwrap(),
1175 );
1176 assert_eq!(5, store.len()); for (timestamp, event) in timestamps[..timestamps.len() - 2].iter().zip(store.clone()) {
1180 assert_eq!(
1181 StoredEvent {
1182 event: RecordedEvent {
1183 timestamp: timestamp - timestamps[0],
1184 ..not_glean_restarted.event.clone()
1185 },
1186 execution_counter: None,
1187 },
1188 event
1189 );
1190 }
1191 assert_eq!(
1195 store[2],
1196 StoredEvent {
1197 event: RecordedEvent {
1198 timestamp: store[1].event.timestamp + 1,
1199 ..glean_restarted.event
1200 },
1201 execution_counter: None,
1202 }
1203 );
1204 assert_eq!(
1206 store[3],
1207 StoredEvent {
1208 event: RecordedEvent {
1209 timestamp: timestamps[2] - restarted_ts + store[2].event.timestamp,
1210 ..not_glean_restarted.event
1211 },
1212 execution_counter: None,
1213 }
1214 );
1215 assert_eq!(
1217 Ok(1),
1218 test_get_num_recorded_errors(
1219 &glean,
1220 &CommonMetricData {
1221 name: "restarted".into(),
1222 category: "glean".into(),
1223 send_in_pings: vec![store_name.into()],
1224 lifetime: Lifetime::Ping,
1225 ..Default::default()
1226 }
1227 .into(),
1228 ErrorType::InvalidValue
1229 )
1230 );
1231 }
1232
1233 #[test]
1234 fn normalize_store_non_zero_ec() {
1235 let (glean, _dir) = new_glean(None);
1238
1239 let store_name = "store-name";
1240 let glean_restarted = StoredEvent {
1241 event: RecordedEvent {
1242 timestamp: 2,
1243 category: "glean".into(),
1244 name: "restarted".into(),
1245 extra: None,
1246 },
1247 execution_counter: Some(2),
1248 };
1249 let not_glean_restarted = StoredEvent {
1250 event: RecordedEvent {
1251 timestamp: 20,
1252 category: "category".into(),
1253 name: "name".into(),
1254 extra: None,
1255 },
1256 execution_counter: Some(2),
1257 };
1258 let glean_restarted_2 = StoredEvent {
1259 event: RecordedEvent {
1260 timestamp: 2,
1261 category: "glean".into(),
1262 name: "restarted".into(),
1263 extra: None,
1264 },
1265 execution_counter: Some(3),
1266 };
1267 let mut store = vec![
1268 glean_restarted,
1269 not_glean_restarted.clone(),
1270 glean_restarted_2,
1271 ];
1272 let glean_start_time = glean.start_time();
1273
1274 glean
1275 .event_storage()
1276 .normalize_store(&glean, store_name, &mut store, glean_start_time);
1277
1278 assert_eq!(1, store.len());
1279 assert_eq!(
1280 StoredEvent {
1281 event: RecordedEvent {
1282 timestamp: 0,
1283 ..not_glean_restarted.event
1284 },
1285 execution_counter: None
1286 },
1287 store[0]
1288 );
1289 assert!(test_get_num_recorded_errors(
1291 &glean,
1292 &CommonMetricData {
1293 name: "restarted".into(),
1294 category: "glean".into(),
1295 send_in_pings: vec![store_name.into()],
1296 lifetime: Lifetime::Ping,
1297 ..Default::default()
1298 }
1299 .into(),
1300 ErrorType::InvalidState
1301 )
1302 .is_err());
1303 assert!(test_get_num_recorded_errors(
1305 &glean,
1306 &CommonMetricData {
1307 name: "restarted".into(),
1308 category: "glean".into(),
1309 send_in_pings: vec![store_name.into()],
1310 lifetime: Lifetime::Ping,
1311 ..Default::default()
1312 }
1313 .into(),
1314 ErrorType::InvalidValue
1315 )
1316 .is_err());
1317 }
1318}