1use std::cmp::Ordering;
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::fs::{create_dir_all, File, OpenOptions};
9use std::io::BufReader;
10use std::io::Write;
11use std::io::{self, BufRead};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex, RwLock};
14use std::{fs, mem};
15
16use chrono::{DateTime, FixedOffset, Utc};
17
18use malloc_size_of::MallocSizeOf;
19use malloc_size_of_derive::MallocSizeOf;
20use serde::{Deserialize, Serialize};
21use serde_json::{json, Value as JsonValue};
22
23use crate::common_metric_data::CommonMetricDataInternal;
24use crate::coverage::record_coverage;
25use crate::error_recording::{record_error, ErrorType};
26use crate::metrics::{DatetimeMetric, TimeUnit};
27use crate::storage::INTERNAL_STORAGE;
28use crate::util::get_iso_time_string;
29use crate::Glean;
30use crate::Result;
31use crate::{CommonMetricData, CounterMetric, Lifetime};
32
33#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, MallocSizeOf)]
35#[cfg_attr(test, derive(Default))]
36pub struct RecordedEvent {
37 pub timestamp: u64,
41
42 pub category: String,
46
47 pub name: String,
51
52 #[serde(skip_serializing_if = "Option::is_none")]
56 pub extra: Option<HashMap<String, String>>,
57}
58
59#[derive(
61 Debug, Clone, Deserialize, Serialize, PartialEq, Eq, malloc_size_of_derive::MallocSizeOf,
62)]
63struct StoredEvent {
64 #[serde(flatten)]
65 event: RecordedEvent,
66
67 #[serde(default)]
72 #[serde(skip_serializing_if = "Option::is_none")]
73 pub execution_counter: Option<i32>,
74}
75
76#[derive(Debug)]
98pub struct EventDatabase {
99 pub path: PathBuf,
101 event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
103 event_store_files: RwLock<HashMap<String, Arc<File>>>,
104 file_lock: Mutex<()>,
106}
107
108impl MallocSizeOf for EventDatabase {
109 fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
110 let mut n = 0;
111 n += self.event_stores.read().unwrap().size_of(ops);
112
113 let map = self.event_store_files.read().unwrap();
114 for store_name in map.keys() {
115 n += store_name.size_of(ops);
116 n += mem::size_of::<File>();
118 }
119 n
120 }
121}
122
123impl EventDatabase {
124 pub fn new(data_path: &Path) -> Result<Self> {
131 let path = data_path.join("events");
132 create_dir_all(&path)?;
133
134 Ok(Self {
135 path,
136 event_stores: RwLock::new(HashMap::new()),
137 event_store_files: RwLock::new(HashMap::new()),
138 file_lock: Mutex::new(()),
139 })
140 }
141
142 pub fn flush_pending_events_on_startup(
168 &self,
169 glean: &Glean,
170 trim_data_to_registered_pings: bool,
171 ) -> bool {
172 match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
173 Ok(_) => {
174 let stores_with_events: Vec<String> = {
175 self.event_stores
176 .read()
177 .unwrap()
178 .keys()
179 .map(|x| x.to_owned())
180 .collect() };
182 let has_events_events = stores_with_events.contains(&"events".to_owned());
185 let glean_restarted_stores = if has_events_events {
186 stores_with_events
187 .into_iter()
188 .filter(|store| store != "events")
189 .collect()
190 } else {
191 stores_with_events
192 };
193 if !glean_restarted_stores.is_empty() {
194 for store_name in glean_restarted_stores.iter() {
195 CounterMetric::new(CommonMetricData {
196 name: "execution_counter".into(),
197 category: store_name.into(),
198 send_in_pings: vec![INTERNAL_STORAGE.into()],
199 lifetime: Lifetime::Ping,
200 ..Default::default()
201 })
202 .add_sync(glean, 1);
203 }
204 let glean_restarted = CommonMetricData {
205 name: "restarted".into(),
206 category: "glean".into(),
207 send_in_pings: glean_restarted_stores,
208 lifetime: Lifetime::Ping,
209 ..Default::default()
210 };
211 let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
212 let mut extra: HashMap<String, String> =
213 [("glean.startup.date".into(), startup)].into();
214 if glean.with_timestamps() {
215 let now = Utc::now();
216 let precise_timestamp = now.timestamp_millis() as u64;
217 extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
218 }
219 self.record(
220 glean,
221 &glean_restarted.into(),
222 crate::get_timestamp_ms(),
223 Some(extra),
224 );
225 }
226 has_events_events && glean.submit_ping_by_name("events", Some("startup"))
227 }
228 Err(err) => {
229 log::warn!("Error loading events from disk: {}", err);
230 false
231 }
232 }
233 }
234
235 fn load_events_from_disk(
236 &self,
237 glean: &Glean,
238 trim_data_to_registered_pings: bool,
239 ) -> Result<()> {
240 let mut db = self.event_stores.write().unwrap(); let _lock = self.file_lock.lock().unwrap(); for entry in fs::read_dir(&self.path)? {
248 let entry = entry?;
249 if entry.file_type()?.is_file() {
250 let store_name = entry.file_name().into_string()?;
251 log::info!("Loading events for {}", store_name);
252 if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
253 log::warn!("Trimming {}'s events", store_name);
254 if let Err(err) = fs::remove_file(entry.path()) {
255 match err.kind() {
256 std::io::ErrorKind::NotFound => {
257 }
259 _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
260 }
261 }
262 continue;
263 }
264 let file = BufReader::new(File::open(entry.path())?);
265 db.insert(
266 store_name,
267 file.lines()
268 .map_while(Result::ok)
269 .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
270 .collect(),
271 );
272 }
273 }
274 Ok(())
275 }
276
277 pub fn record(
294 &self,
295 glean: &Glean,
296 meta: &CommonMetricDataInternal,
297 timestamp: u64,
298 extra: Option<HashMap<String, String>>,
299 ) -> bool {
300 if !glean.is_upload_enabled() {
302 return false;
303 }
304
305 let mut submit_max_capacity_event_ping = false;
306 {
307 let mut db = self.event_stores.write().unwrap(); for store_name in meta.inner.send_in_pings.iter() {
309 if !glean.is_ping_enabled(store_name) {
310 continue;
311 }
312
313 let store = db.entry(store_name.to_string()).or_default();
314 let execution_counter = CounterMetric::new(CommonMetricData {
315 name: "execution_counter".into(),
316 category: store_name.into(),
317 send_in_pings: vec![INTERNAL_STORAGE.into()],
318 lifetime: Lifetime::Ping,
319 ..Default::default()
320 })
321 .get_value(glean, INTERNAL_STORAGE);
322 let event = StoredEvent {
324 event: RecordedEvent {
325 timestamp,
326 category: meta.inner.category.to_string(),
327 name: meta.inner.name.to_string(),
328 extra: extra.clone(),
329 },
330 execution_counter,
331 };
332 let event_json = serde_json::to_string(&event).unwrap(); store.push(event);
334 self.write_event_to_disk(store_name, &event_json);
335 if store_name == "events" && store.len() == glean.get_max_events() {
336 submit_max_capacity_event_ping = true;
337 }
338 }
339 }
340 if submit_max_capacity_event_ping {
341 glean.submit_ping_by_name("events", Some("max_capacity"));
342 true
343 } else {
344 false
345 }
346 }
347
348 fn get_event_store(&self, store_name: &str) -> Result<Arc<File>, io::Error> {
349 let mut map = self.event_store_files.write().unwrap();
351 let entry = map.entry(store_name.to_string());
352
353 match entry {
354 Entry::Occupied(entry) => Ok(Arc::clone(entry.get())),
355 Entry::Vacant(entry) => {
356 let file = OpenOptions::new()
357 .create(true)
358 .append(true)
359 .open(self.path.join(store_name))?;
360 let file = Arc::new(file);
361 let entry = entry.insert(file);
362 Ok(Arc::clone(entry))
363 }
364 }
365 }
366
367 fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
374 let _lock = self.file_lock.lock().unwrap(); let write_res = (|| {
377 let mut file = self.get_event_store(store_name)?;
378 file.write_all(event_json.as_bytes())?;
379 file.write_all(b"\n")?;
380 file.flush()?;
381 Ok::<(), std::io::Error>(())
382 })();
383
384 if let Err(err) = write_res {
385 log::warn!("IO error writing event to store '{}': {}", store_name, err);
386 }
387 }
388
389 fn normalize_store(
418 &self,
419 glean: &Glean,
420 store_name: &str,
421 store: &mut Vec<StoredEvent>,
422 glean_start_time: DateTime<FixedOffset>,
423 ) {
424 let is_glean_restarted =
425 |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
426 let glean_restarted_meta = |store_name: &str| CommonMetricData {
427 name: "restarted".into(),
428 category: "glean".into(),
429 send_in_pings: vec![store_name.into()],
430 lifetime: Lifetime::Ping,
431 ..Default::default()
432 };
433 store.sort_by(|a, b| {
435 a.execution_counter
436 .cmp(&b.execution_counter)
437 .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
438 .then_with(|| {
439 if is_glean_restarted(&a.event) {
440 Ordering::Less
441 } else {
442 Ordering::Greater
443 }
444 })
445 });
446 let final_event = match store
450 .iter()
451 .rposition(|event| !is_glean_restarted(&event.event))
452 {
453 Some(idx) => idx + 1,
454 _ => 0,
455 };
456 store.drain(final_event..);
457 let first_event = store
458 .iter()
459 .position(|event| !is_glean_restarted(&event.event))
460 .unwrap_or(store.len());
461 store.drain(..first_event);
462 if store.is_empty() {
463 return;
465 }
466 let mut cur_ec = 0;
472 let mut intra_group_offset = store[0].event.timestamp;
474 let mut inter_group_offset = 0;
476 let mut highest_ts = 0;
477 for event in store.iter_mut() {
478 let execution_counter = event.execution_counter.take().unwrap_or(0);
479 if is_glean_restarted(&event.event) {
480 cur_ec = execution_counter;
483 let glean_startup_date = event
484 .event
485 .extra
486 .as_mut()
487 .and_then(|extra| {
488 extra.remove("glean.startup.date").and_then(|date_str| {
489 DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
490 .map_err(|_| {
491 record_error(
492 glean,
493 &glean_restarted_meta(store_name).into(),
494 ErrorType::InvalidState,
495 format!("Unparseable glean.startup.date '{}'", date_str),
496 None,
497 );
498 })
499 .ok()
500 })
501 })
502 .unwrap_or(glean_start_time);
503 if event
504 .event
505 .extra
506 .as_ref()
507 .is_some_and(|extra| extra.is_empty())
508 {
509 event.event.extra = None;
511 }
512 let ping_start = DatetimeMetric::new(
513 CommonMetricData {
514 name: format!("{}#start", store_name),
515 category: "".into(),
516 send_in_pings: vec![INTERNAL_STORAGE.into()],
517 lifetime: Lifetime::User,
518 ..Default::default()
519 },
520 TimeUnit::Minute,
521 );
522 let ping_start = ping_start
523 .get_value(glean, INTERNAL_STORAGE)
524 .unwrap_or(glean_start_time);
525 let time_from_ping_start_to_glean_restarted =
526 (glean_startup_date - ping_start).num_milliseconds();
527 intra_group_offset = event.event.timestamp;
528 inter_group_offset =
529 u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
530 if inter_group_offset < highest_ts {
531 record_error(
532 glean,
533 &glean_restarted_meta(store_name).into(),
534 ErrorType::InvalidValue,
535 format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
536 None,
537 );
538 inter_group_offset = highest_ts + 1;
543 }
544 } else if cur_ec == 0 {
545 cur_ec = execution_counter;
547 }
548 event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
549 if execution_counter != cur_ec {
550 record_error(
551 glean,
552 &glean_restarted_meta(store_name).into(),
553 ErrorType::InvalidState,
554 format!(
555 "Inconsistent execution counter {} (expected {})",
556 execution_counter, cur_ec
557 ),
558 None,
559 );
560 cur_ec = execution_counter;
562 }
563 if highest_ts > event.event.timestamp {
564 record_error(
567 glean,
568 &glean_restarted_meta(store_name).into(),
569 ErrorType::InvalidState,
570 format!(
571 "Inconsistent previous highest timestamp {} (expected <= {})",
572 highest_ts, event.event.timestamp
573 ),
574 None,
575 );
576 }
578 highest_ts = event.event.timestamp
579 }
580 }
581
582 pub fn snapshot_as_json(
594 &self,
595 glean: &Glean,
596 store_name: &str,
597 clear_store: bool,
598 ) -> Option<JsonValue> {
599 let result = {
600 let mut db = self.event_stores.write().unwrap(); db.get_mut(&store_name.to_string()).and_then(|store| {
602 if !store.is_empty() {
603 let mut clone;
606 let store = if clear_store {
607 store
608 } else {
609 clone = store.clone();
610 &mut clone
611 };
612 self.normalize_store(glean, store_name, store, glean.start_time());
614 Some(json!(store))
615 } else {
616 log::warn!("Unexpectly got empty event store for '{}'", store_name);
617 None
618 }
619 })
620 };
621
622 if clear_store {
623 self.event_stores
624 .write()
625 .unwrap() .remove(&store_name.to_string());
627 self.event_store_files
628 .write()
629 .unwrap() .remove(&store_name.to_string());
631
632 let _lock = self.file_lock.lock().unwrap(); if let Err(err) = fs::remove_file(self.path.join(store_name)) {
634 match err.kind() {
635 std::io::ErrorKind::NotFound => {
636 }
638 _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
639 }
640 }
641 }
642
643 result
644 }
645
646 pub fn clear_all(&self) -> Result<()> {
648 self.event_stores.write().unwrap().clear();
650 self.event_store_files.write().unwrap().clear();
651
652 let _lock = self.file_lock.lock().unwrap();
654 std::fs::remove_dir_all(&self.path)?;
655 create_dir_all(&self.path)?;
656
657 Ok(())
658 }
659
660 pub fn test_get_value<'a>(
667 &'a self,
668 meta: &'a CommonMetricDataInternal,
669 store_name: &str,
670 ) -> Option<Vec<RecordedEvent>> {
671 record_coverage(&meta.base_identifier());
672
673 let value: Vec<RecordedEvent> = self
674 .event_stores
675 .read()
676 .unwrap() .get(&store_name.to_string())
678 .into_iter()
679 .flatten()
680 .map(|stored_event| stored_event.event.clone())
681 .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
682 .collect();
683 if !value.is_empty() {
684 Some(value)
685 } else {
686 None
687 }
688 }
689}
690
691#[cfg(test)]
692mod test {
693 use super::*;
694 use crate::test_get_num_recorded_errors;
695 use crate::tests::new_glean;
696 use chrono::{TimeZone, Timelike};
697
698 #[test]
699 fn handle_truncated_events_on_disk() {
700 let (glean, t) = new_glean(None);
701
702 {
703 let db = EventDatabase::new(t.path()).unwrap();
704 db.write_event_to_disk("events", "{\"timestamp\": 500");
705 db.write_event_to_disk("events", "{\"timestamp\"");
706 db.write_event_to_disk(
707 "events",
708 "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
709 );
710 }
711
712 {
713 let db = EventDatabase::new(t.path()).unwrap();
714 db.load_events_from_disk(&glean, false).unwrap();
715 let events = &db.event_stores.read().unwrap()["events"];
716 assert_eq!(1, events.len());
717 }
718 }
719
720 #[test]
721 fn stable_serialization() {
722 let event_empty = RecordedEvent {
723 timestamp: 2,
724 category: "cat".to_string(),
725 name: "name".to_string(),
726 extra: None,
727 };
728
729 let mut data = HashMap::new();
730 data.insert("a key".to_string(), "a value".to_string());
731 let event_data = RecordedEvent {
732 timestamp: 2,
733 category: "cat".to_string(),
734 name: "name".to_string(),
735 extra: Some(data),
736 };
737
738 let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap();
739 let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap();
740
741 assert_eq!(
742 StoredEvent {
743 event: event_empty,
744 execution_counter: None
745 },
746 serde_json::from_str(&event_empty_json).unwrap()
747 );
748 assert_eq!(
749 StoredEvent {
750 event: event_data,
751 execution_counter: None
752 },
753 serde_json::from_str(&event_data_json).unwrap()
754 );
755 }
756
757 #[test]
758 fn deserialize_existing_data() {
759 let event_empty_json = r#"
760{
761 "timestamp": 2,
762 "category": "cat",
763 "name": "name"
764}
765 "#;
766
767 let event_data_json = r#"
768{
769 "timestamp": 2,
770 "category": "cat",
771 "name": "name",
772 "extra": {
773 "a key": "a value"
774 }
775}
776 "#;
777
778 let event_empty = RecordedEvent {
779 timestamp: 2,
780 category: "cat".to_string(),
781 name: "name".to_string(),
782 extra: None,
783 };
784
785 let mut data = HashMap::new();
786 data.insert("a key".to_string(), "a value".to_string());
787 let event_data = RecordedEvent {
788 timestamp: 2,
789 category: "cat".to_string(),
790 name: "name".to_string(),
791 extra: Some(data),
792 };
793
794 assert_eq!(
795 StoredEvent {
796 event: event_empty,
797 execution_counter: None
798 },
799 serde_json::from_str(event_empty_json).unwrap()
800 );
801 assert_eq!(
802 StoredEvent {
803 event: event_data,
804 execution_counter: None
805 },
806 serde_json::from_str(event_data_json).unwrap()
807 );
808 }
809
810 #[test]
811 fn doesnt_record_when_upload_is_disabled() {
812 let (mut glean, dir) = new_glean(None);
813 let db = EventDatabase::new(dir.path()).unwrap();
814
815 let test_storage = "store1";
816 let test_category = "category";
817 let test_name = "name";
818 let test_timestamp = 2;
819 let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
820 let event_data = RecordedEvent {
821 timestamp: test_timestamp,
822 category: test_category.to_string(),
823 name: test_name.to_string(),
824 extra: None,
825 };
826
827 db.record(&glean, &test_meta, 2, None);
830 {
831 let event_stores = db.event_stores.read().unwrap();
832 assert_eq!(
833 &StoredEvent {
834 event: event_data,
835 execution_counter: None
836 },
837 &event_stores.get(test_storage).unwrap()[0]
838 );
839 assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
840 }
841
842 glean.set_upload_enabled(false);
843
844 db.record(&glean, &test_meta, 2, None);
846 {
847 let event_stores = db.event_stores.read().unwrap();
848 assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
849 }
850 }
851
852 #[test]
853 fn normalize_store_of_glean_restarted() {
854 let (glean, _dir) = new_glean(None);
856
857 let store_name = "store-name";
858 let glean_restarted = StoredEvent {
859 event: RecordedEvent {
860 timestamp: 2,
861 category: "glean".into(),
862 name: "restarted".into(),
863 extra: None,
864 },
865 execution_counter: None,
866 };
867 let mut store = vec![glean_restarted.clone()];
868 let glean_start_time = glean.start_time();
869
870 glean
871 .event_storage()
872 .normalize_store(&glean, store_name, &mut store, glean_start_time);
873 assert!(store.is_empty());
874
875 let mut store = vec![glean_restarted.clone(), glean_restarted.clone()];
876 glean
877 .event_storage()
878 .normalize_store(&glean, store_name, &mut store, glean_start_time);
879 assert!(store.is_empty());
880
881 let mut store = vec![
882 glean_restarted.clone(),
883 glean_restarted.clone(),
884 glean_restarted,
885 ];
886 glean
887 .event_storage()
888 .normalize_store(&glean, store_name, &mut store, glean_start_time);
889 assert!(store.is_empty());
890 }
891
892 #[test]
893 fn normalize_store_of_glean_restarted_on_both_ends() {
894 let (glean, _dir) = new_glean(None);
896
897 let store_name = "store-name";
898 let glean_restarted = StoredEvent {
899 event: RecordedEvent {
900 timestamp: 2,
901 category: "glean".into(),
902 name: "restarted".into(),
903 extra: None,
904 },
905 execution_counter: None,
906 };
907 let not_glean_restarted = StoredEvent {
908 event: RecordedEvent {
909 timestamp: 20,
910 category: "category".into(),
911 name: "name".into(),
912 extra: None,
913 },
914 execution_counter: None,
915 };
916 let mut store = vec![
917 glean_restarted.clone(),
918 not_glean_restarted.clone(),
919 glean_restarted,
920 ];
921 let glean_start_time = glean.start_time();
922
923 glean
924 .event_storage()
925 .normalize_store(&glean, store_name, &mut store, glean_start_time);
926 assert_eq!(1, store.len());
927 assert_eq!(
928 StoredEvent {
929 event: RecordedEvent {
930 timestamp: 0,
931 ..not_glean_restarted.event
932 },
933 execution_counter: None
934 },
935 store[0]
936 );
937 }
938
939 #[test]
940 fn normalize_store_single_run_timestamp_math() {
941 let (glean, _dir) = new_glean(None);
945
946 let store_name = "store-name";
947 let glean_restarted = StoredEvent {
948 event: RecordedEvent {
949 timestamp: 2,
950 category: "glean".into(),
951 name: "restarted".into(),
952 extra: None,
953 },
954 execution_counter: None,
955 };
956 let timestamps = [20, 40, 200];
957 let not_glean_restarted = StoredEvent {
958 event: RecordedEvent {
959 timestamp: timestamps[0],
960 category: "category".into(),
961 name: "name".into(),
962 extra: None,
963 },
964 execution_counter: None,
965 };
966 let mut store = vec![
967 glean_restarted.clone(),
968 not_glean_restarted.clone(),
969 StoredEvent {
970 event: RecordedEvent {
971 timestamp: timestamps[1],
972 ..not_glean_restarted.event.clone()
973 },
974 execution_counter: None,
975 },
976 StoredEvent {
977 event: RecordedEvent {
978 timestamp: timestamps[2],
979 ..not_glean_restarted.event.clone()
980 },
981 execution_counter: None,
982 },
983 glean_restarted,
984 ];
985
986 glean
987 .event_storage()
988 .normalize_store(&glean, store_name, &mut store, glean.start_time());
989 assert_eq!(3, store.len());
990 for (timestamp, event) in timestamps.iter().zip(store.iter()) {
991 assert_eq!(
992 &StoredEvent {
993 event: RecordedEvent {
994 timestamp: timestamp - timestamps[0],
995 ..not_glean_restarted.clone().event
996 },
997 execution_counter: None
998 },
999 event
1000 );
1001 }
1002 }
1003
1004 #[test]
1005 fn normalize_store_multi_run_timestamp_math() {
1006 let (glean, _dir) = new_glean(None);
1011
1012 let store_name = "store-name";
1013 let glean_restarted = StoredEvent {
1014 event: RecordedEvent {
1015 category: "glean".into(),
1016 name: "restarted".into(),
1017 ..Default::default()
1018 },
1019 execution_counter: None,
1020 };
1021 let not_glean_restarted = StoredEvent {
1022 event: RecordedEvent {
1023 category: "category".into(),
1024 name: "name".into(),
1025 ..Default::default()
1026 },
1027 execution_counter: None,
1028 };
1029
1030 let timestamps = [20, 40, 200, 12];
1033 let ecs = [0, 1];
1034 let some_hour = 16;
1035 let startup_date = FixedOffset::east_opt(0)
1036 .unwrap()
1037 .with_ymd_and_hms(2022, 11, 24, some_hour, 29, 0) .unwrap();
1039 let glean_start_time = startup_date.with_hour(some_hour - 1);
1040 let restarted_ts = 2;
1041 let mut store = vec![
1042 StoredEvent {
1043 event: RecordedEvent {
1044 timestamp: timestamps[0],
1045 ..not_glean_restarted.event.clone()
1046 },
1047 execution_counter: Some(ecs[0]),
1048 },
1049 StoredEvent {
1050 event: RecordedEvent {
1051 timestamp: timestamps[1],
1052 ..not_glean_restarted.event.clone()
1053 },
1054 execution_counter: Some(ecs[0]),
1055 },
1056 StoredEvent {
1057 event: RecordedEvent {
1058 timestamp: timestamps[2],
1059 ..not_glean_restarted.event.clone()
1060 },
1061 execution_counter: Some(ecs[0]),
1062 },
1063 StoredEvent {
1064 event: RecordedEvent {
1065 extra: Some(
1066 [(
1067 "glean.startup.date".into(),
1068 get_iso_time_string(startup_date, TimeUnit::Minute),
1069 )]
1070 .into(),
1071 ),
1072 timestamp: restarted_ts,
1073 ..glean_restarted.event.clone()
1074 },
1075 execution_counter: Some(ecs[1]),
1076 },
1077 StoredEvent {
1078 event: RecordedEvent {
1079 timestamp: timestamps[3],
1080 ..not_glean_restarted.event.clone()
1081 },
1082 execution_counter: Some(ecs[1]),
1083 },
1084 ];
1085
1086 glean.event_storage().normalize_store(
1087 &glean,
1088 store_name,
1089 &mut store,
1090 glean_start_time.unwrap(),
1091 );
1092 assert_eq!(5, store.len()); for (timestamp, event) in timestamps[..timestamps.len() - 1].iter().zip(store.clone()) {
1096 assert_eq!(
1097 StoredEvent {
1098 event: RecordedEvent {
1099 timestamp: timestamp - timestamps[0],
1100 ..not_glean_restarted.event.clone()
1101 },
1102 execution_counter: None,
1103 },
1104 event
1105 );
1106 }
1107 let hour_in_millis = 3600000;
1109 assert_eq!(
1110 store[3],
1111 StoredEvent {
1112 event: RecordedEvent {
1113 timestamp: hour_in_millis,
1114 ..glean_restarted.event
1115 },
1116 execution_counter: None,
1117 }
1118 );
1119 assert_eq!(
1121 store[4],
1122 StoredEvent {
1123 event: RecordedEvent {
1124 timestamp: hour_in_millis + timestamps[3] - restarted_ts,
1125 ..not_glean_restarted.event
1126 },
1127 execution_counter: None,
1128 }
1129 );
1130 }
1131
1132 #[test]
1133 fn normalize_store_multi_run_client_clocks() {
1134 let (glean, _dir) = new_glean(None);
1137
1138 let store_name = "store-name";
1139 let glean_restarted = StoredEvent {
1140 event: RecordedEvent {
1141 category: "glean".into(),
1142 name: "restarted".into(),
1143 ..Default::default()
1144 },
1145 execution_counter: None,
1146 };
1147 let not_glean_restarted = StoredEvent {
1148 event: RecordedEvent {
1149 category: "category".into(),
1150 name: "name".into(),
1151 ..Default::default()
1152 },
1153 execution_counter: None,
1154 };
1155
1156 let timestamps = [20, 40, 12, 200];
1159 let ecs = [0, 1];
1160 let some_hour = 10;
1161 let startup_date = FixedOffset::east_opt(0)
1162 .unwrap()
1163 .with_ymd_and_hms(2022, 11, 25, some_hour, 37, 0) .unwrap();
1165 let glean_start_time = startup_date.with_hour(some_hour + 1);
1166 let restarted_ts = 2;
1167 let mut store = vec![
1168 StoredEvent {
1169 event: RecordedEvent {
1170 timestamp: timestamps[0],
1171 ..not_glean_restarted.event.clone()
1172 },
1173 execution_counter: Some(ecs[0]),
1174 },
1175 StoredEvent {
1176 event: RecordedEvent {
1177 timestamp: timestamps[1],
1178 ..not_glean_restarted.event.clone()
1179 },
1180 execution_counter: Some(ecs[0]),
1181 },
1182 StoredEvent {
1183 event: RecordedEvent {
1184 extra: Some(
1185 [(
1186 "glean.startup.date".into(),
1187 get_iso_time_string(startup_date, TimeUnit::Minute),
1188 )]
1189 .into(),
1190 ),
1191 timestamp: restarted_ts,
1192 ..glean_restarted.event.clone()
1193 },
1194 execution_counter: Some(ecs[1]),
1195 },
1196 StoredEvent {
1197 event: RecordedEvent {
1198 timestamp: timestamps[2],
1199 ..not_glean_restarted.event.clone()
1200 },
1201 execution_counter: Some(ecs[1]),
1202 },
1203 StoredEvent {
1204 event: RecordedEvent {
1205 timestamp: timestamps[3],
1206 ..not_glean_restarted.event.clone()
1207 },
1208 execution_counter: Some(ecs[1]),
1209 },
1210 ];
1211
1212 glean.event_storage().normalize_store(
1213 &glean,
1214 store_name,
1215 &mut store,
1216 glean_start_time.unwrap(),
1217 );
1218 assert_eq!(5, store.len()); for (timestamp, event) in timestamps[..timestamps.len() - 2].iter().zip(store.clone()) {
1222 assert_eq!(
1223 StoredEvent {
1224 event: RecordedEvent {
1225 timestamp: timestamp - timestamps[0],
1226 ..not_glean_restarted.event.clone()
1227 },
1228 execution_counter: None,
1229 },
1230 event
1231 );
1232 }
1233 assert_eq!(
1237 store[2],
1238 StoredEvent {
1239 event: RecordedEvent {
1240 timestamp: store[1].event.timestamp + 1,
1241 ..glean_restarted.event
1242 },
1243 execution_counter: None,
1244 }
1245 );
1246 assert_eq!(
1248 store[3],
1249 StoredEvent {
1250 event: RecordedEvent {
1251 timestamp: timestamps[2] - restarted_ts + store[2].event.timestamp,
1252 ..not_glean_restarted.event
1253 },
1254 execution_counter: None,
1255 }
1256 );
1257 assert_eq!(
1259 Ok(1),
1260 test_get_num_recorded_errors(
1261 &glean,
1262 &CommonMetricData {
1263 name: "restarted".into(),
1264 category: "glean".into(),
1265 send_in_pings: vec![store_name.into()],
1266 lifetime: Lifetime::Ping,
1267 ..Default::default()
1268 }
1269 .into(),
1270 ErrorType::InvalidValue
1271 )
1272 );
1273 }
1274
1275 #[test]
1276 fn normalize_store_non_zero_ec() {
1277 let (glean, _dir) = new_glean(None);
1280
1281 let store_name = "store-name";
1282 let glean_restarted = StoredEvent {
1283 event: RecordedEvent {
1284 timestamp: 2,
1285 category: "glean".into(),
1286 name: "restarted".into(),
1287 extra: None,
1288 },
1289 execution_counter: Some(2),
1290 };
1291 let not_glean_restarted = StoredEvent {
1292 event: RecordedEvent {
1293 timestamp: 20,
1294 category: "category".into(),
1295 name: "name".into(),
1296 extra: None,
1297 },
1298 execution_counter: Some(2),
1299 };
1300 let glean_restarted_2 = StoredEvent {
1301 event: RecordedEvent {
1302 timestamp: 2,
1303 category: "glean".into(),
1304 name: "restarted".into(),
1305 extra: None,
1306 },
1307 execution_counter: Some(3),
1308 };
1309 let mut store = vec![
1310 glean_restarted,
1311 not_glean_restarted.clone(),
1312 glean_restarted_2,
1313 ];
1314 let glean_start_time = glean.start_time();
1315
1316 glean
1317 .event_storage()
1318 .normalize_store(&glean, store_name, &mut store, glean_start_time);
1319
1320 assert_eq!(1, store.len());
1321 assert_eq!(
1322 StoredEvent {
1323 event: RecordedEvent {
1324 timestamp: 0,
1325 ..not_glean_restarted.event
1326 },
1327 execution_counter: None
1328 },
1329 store[0]
1330 );
1331 assert!(test_get_num_recorded_errors(
1333 &glean,
1334 &CommonMetricData {
1335 name: "restarted".into(),
1336 category: "glean".into(),
1337 send_in_pings: vec![store_name.into()],
1338 lifetime: Lifetime::Ping,
1339 ..Default::default()
1340 }
1341 .into(),
1342 ErrorType::InvalidState
1343 )
1344 .is_err());
1345 assert!(test_get_num_recorded_errors(
1347 &glean,
1348 &CommonMetricData {
1349 name: "restarted".into(),
1350 category: "glean".into(),
1351 send_in_pings: vec![store_name.into()],
1352 lifetime: Lifetime::Ping,
1353 ..Default::default()
1354 }
1355 .into(),
1356 ErrorType::InvalidValue
1357 )
1358 .is_err());
1359 }
1360}