1use crate::metrics::{DatetimeMetric, StringMetric, TimeUnit};
12use crate::storage::INTERNAL_STORAGE;
13use crate::util::local_now_with_offset;
14use crate::{CommonMetricData, Glean, Lifetime};
15use chrono::prelude::*;
16use chrono::Days;
17use once_cell::sync::Lazy;
18use std::sync::{Arc, Condvar, Mutex};
19use std::thread::JoinHandle;
20
21const SCHEDULED_HOUR: u32 = 4;
22
23#[allow(clippy::mutex_atomic)]
26static TASK_CONDVAR: Lazy<Arc<(Mutex<bool>, Condvar)>> =
27 Lazy::new(|| Arc::new((Mutex::new(false), Condvar::new())));
28
29trait MetricsPingSubmitter {
32 fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime<FixedOffset>);
35}
36
37trait MetricsPingScheduler {
40 fn start_scheduler(
44 &self,
45 submitter: impl MetricsPingSubmitter + Send + 'static,
46 now: DateTime<FixedOffset>,
47 when: When,
48 );
49}
50
51struct GleanMetricsPingSubmitter {}
53impl MetricsPingSubmitter for GleanMetricsPingSubmitter {
54 fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime<FixedOffset>) {
55 glean.submit_ping_by_name("metrics", reason);
56 get_last_sent_time_metric().set_sync_chrono(glean, now);
58 }
59}
60
61struct GleanMetricsPingScheduler {}
63impl MetricsPingScheduler for GleanMetricsPingScheduler {
64 fn start_scheduler(
65 &self,
66 submitter: impl MetricsPingSubmitter + Send + 'static,
67 now: DateTime<FixedOffset>,
68 when: When,
69 ) {
70 start_scheduler(submitter, now, when);
71 }
72}
73
74pub fn schedule(glean: &Glean) {
78 let now = local_now_with_offset();
79
80 let (cancelled_lock, _condvar) = &**TASK_CONDVAR;
81 if *cancelled_lock.lock().unwrap() {
82 log::debug!("Told to schedule, but already cancelled. Are we in a test?");
83 }
84 *cancelled_lock.lock().unwrap() = false; let submitter = GleanMetricsPingSubmitter {};
87 let scheduler = GleanMetricsPingScheduler {};
88
89 schedule_internal(glean, submitter, scheduler, now)
90}
91
92pub fn cancel() {
94 let (cancelled_lock, condvar) = &**TASK_CONDVAR; *cancelled_lock.lock().unwrap() = true; condvar.notify_all(); }
98
99fn schedule_internal(
100 glean: &Glean,
101 submitter: impl MetricsPingSubmitter + Send + 'static,
102 scheduler: impl MetricsPingScheduler,
103 now: DateTime<FixedOffset>,
104) {
105 let last_sent_build_metric = get_last_sent_build_metric();
106 if let Some(last_sent_build) = last_sent_build_metric.get_value(glean, Some(INTERNAL_STORAGE)) {
107 if last_sent_build != glean.app_build {
112 last_sent_build_metric.set_sync(glean, &glean.app_build);
113 log::info!("App build changed. Sending 'metrics' ping");
114 submitter.submit_metrics_ping(glean, Some("upgrade"), now);
115 scheduler.start_scheduler(submitter, now, When::Reschedule);
116 return;
117 }
118 } else {
119 last_sent_build_metric.set_sync(glean, &glean.app_build);
121 }
122
123 let last_sent_time = get_last_sent_time_metric().get_value(glean, INTERNAL_STORAGE);
124 if let Some(last_sent) = last_sent_time {
125 log::info!("The 'metrics' ping was last sent on {}", last_sent);
126 }
127
128 let already_sent_today = last_sent_time.is_some_and(|d| d.date_naive() == now.date_naive());
138 let cutoff_time = now
140 .naive_local()
141 .date()
142 .and_hms_opt(SCHEDULED_HOUR, 0, 0)
143 .unwrap()
144 .and_local_timezone(now.timezone())
145 .unwrap();
146
147 if already_sent_today {
148 log::info!("The 'metrics' ping was already sent today, {}", now);
150 scheduler.start_scheduler(submitter, now, When::Tomorrow);
151 } else if now > cutoff_time {
152 log::info!("Sending the 'metrics' ping immediately, {}", now);
154 submitter.submit_metrics_ping(glean, Some("overdue"), now);
155 scheduler.start_scheduler(submitter, now, When::Reschedule);
156 } else {
157 log::info!("The 'metrics' collection is scheduled for today, {}", now);
159 scheduler.start_scheduler(submitter, now, When::Today);
160 }
161}
162
163#[derive(Debug, PartialEq)]
165enum When {
166 Today,
167 Tomorrow,
168 Reschedule,
169}
170
171impl When {
172 fn until(&self, now: DateTime<FixedOffset>) -> std::time::Duration {
176 let now_local = now.naive_local();
177
178 let fire_date = match self {
179 Self::Today => now_local.date().and_hms_opt(SCHEDULED_HOUR, 0, 0).unwrap(),
180 Self::Tomorrow | Self::Reschedule => {
183 let next_day = now_local.checked_add_days(Days::new(1)).unwrap();
184 let next_day_date = next_day.date();
185 next_day_date.and_hms_opt(SCHEDULED_HOUR, 0, 0).unwrap()
186 }
187 };
188
189 (fire_date - now_local).to_std().unwrap_or_else(|_| {
190 std::time::Duration::from_secs(24 * 60 * 60)
193 })
194 }
195
196 fn reason(&self) -> &'static str {
198 match self {
199 Self::Today => "today",
200 Self::Tomorrow => "tomorrow",
201 Self::Reschedule => "reschedule",
202 }
203 }
204}
205
206fn start_scheduler(
207 submitter: impl MetricsPingSubmitter + Send + 'static,
208 now: DateTime<FixedOffset>,
209 when: When,
210) -> JoinHandle<()> {
211 let pair = Arc::clone(&TASK_CONDVAR);
212 crate::thread::spawn("glean.mps", move || {
213 let (cancelled_lock, condvar) = &*pair;
214 let mut when = when;
215 let mut now = now;
216 loop {
217 let dur = when.until(now);
218 log::info!("Scheduling for {} after {:?}, reason {:?}", now, dur, when);
219 let mut timed_out = false;
220 {
221 match condvar.wait_timeout_while(cancelled_lock.lock().unwrap(), dur, |cancelled| {
222 !*cancelled
223 }) {
224 Err(err) => {
225 log::warn!("Condvar wait failure. MPS exiting. {}", err);
226 break;
227 }
228 Ok((cancelled, wait_result)) => {
229 if *cancelled {
230 log::info!("Metrics Ping Scheduler cancelled. Exiting.");
231 break;
232 } else if wait_result.timed_out() {
233 timed_out = true;
235 } else {
236 log::warn!("Spurious wakeup of the MPS condvar should be impossible.");
241 }
242 }
243 }
244 }
245 if timed_out {
251 log::info!("Time to submit our metrics ping, {:?}", when);
252 let glean = crate::core::global_glean()
253 .expect(
254 "Global Glean not present when trying to send scheduled 'metrics' ping?!",
255 )
256 .lock()
257 .unwrap();
258 submitter.submit_metrics_ping(&glean, Some(when.reason()), now);
259 when = When::Reschedule;
260 }
261 now = local_now_with_offset();
262 }
263 })
264 .expect("Unable to spawn Metrics Ping Scheduler thread.")
265}
266
267fn get_last_sent_time_metric() -> DatetimeMetric {
268 DatetimeMetric::new(
269 CommonMetricData {
270 name: "last_sent_time".into(),
271 category: "mps".into(),
272 send_in_pings: vec![INTERNAL_STORAGE.into()],
273 lifetime: Lifetime::User,
274 ..Default::default()
275 },
276 TimeUnit::Minute,
277 )
278}
279
280fn get_last_sent_build_metric() -> StringMetric {
281 StringMetric::new(CommonMetricData {
282 name: "last_sent_build".into(),
283 category: "mps".into(),
284 send_in_pings: vec![INTERNAL_STORAGE.into()],
285 lifetime: Lifetime::User,
286 ..Default::default()
287 })
288}
289
#[cfg(test)]
mod test {
    use super::*;
    use crate::tests::new_glean;
    use std::sync::atomic::{AtomicU32, Ordering};

    use chrono::Duration;

    /// Stand-in submitter: runs `submit_validator` on every submission and
    /// counts how many submissions happened.
    struct ValidatingSubmitter<F: Fn(DateTime<FixedOffset>, Option<&str>)> {
        submit_validator: F,
        validator_run_count: Arc<AtomicU32>,
    }

    /// Stand-in scheduler: runs `schedule_validator` on every scheduling
    /// request and counts how many requests happened. No thread is spawned.
    struct ValidatingScheduler<F: Fn(DateTime<FixedOffset>, When)> {
        schedule_validator: F,
        validator_run_count: Arc<AtomicU32>,
    }

    impl<F: Fn(DateTime<FixedOffset>, Option<&str>)> MetricsPingSubmitter for ValidatingSubmitter<F> {
        fn submit_metrics_ping(
            &self,
            _glean: &Glean,
            reason: Option<&str>,
            now: DateTime<FixedOffset>,
        ) {
            (self.submit_validator)(now, reason);
            self.validator_run_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    impl<F: Fn(DateTime<FixedOffset>, When)> MetricsPingScheduler for ValidatingScheduler<F> {
        fn start_scheduler(
            &self,
            _submitter: impl MetricsPingSubmitter + Send + 'static,
            now: DateTime<FixedOffset>,
            when: When,
        ) {
            (self.schedule_validator)(now, when);
            self.validator_run_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Builds a validating submitter and scheduler together with the shared
    /// counters that record how often each one was invoked.
    fn new_proxies<
        F1: Fn(DateTime<FixedOffset>, Option<&str>),
        F2: Fn(DateTime<FixedOffset>, When),
    >(
        submit_validator: F1,
        schedule_validator: F2,
    ) -> (
        ValidatingSubmitter<F1>,
        Arc<AtomicU32>,
        ValidatingScheduler<F2>,
        Arc<AtomicU32>,
    ) {
        let submitter_count = Arc::new(AtomicU32::new(0));
        let submitter = ValidatingSubmitter {
            submit_validator,
            validator_run_count: Arc::clone(&submitter_count),
        };
        let scheduler_count = Arc::new(AtomicU32::new(0));
        let scheduler = ValidatingScheduler {
            schedule_validator,
            validator_run_count: Arc::clone(&scheduler_count),
        };
        (submitter, submitter_count, scheduler, scheduler_count)
    }

    /// With no stored build, the current build gets stored and scheduling
    /// continues with the regular cases (here: overdue, one second past the
    /// scheduled hour).
    #[test]
    fn first_run_last_sent_build() {
        let (mut glean, _t) = new_glean(None);

        glean.app_build = "a build".into();
        let lsb_metric = get_last_sent_build_metric();
        assert_eq!(None, lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE)));

        let fake_now = FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(2022, 11, 15, SCHEDULED_HOUR, 0, 1)
            .unwrap();

        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
            |_, reason| assert_eq!(reason, Some("overdue")),
            |_, when| assert_eq!(when, When::Reschedule),
        );

        schedule_internal(&glean, submitter, scheduler, fake_now);
        assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));

        assert_eq!(
            Some(glean.app_build.to_string()),
            lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE))
        );
    }

    /// A stored build that differs from the current one triggers an
    /// "upgrade" ping followed by a reschedule.
    #[test]
    fn different_app_builds_submit_and_reschedule() {
        let (mut glean, _t) = new_glean(None);

        glean.app_build = "a build".into();
        get_last_sent_build_metric().set_sync(&glean, "a different build");

        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
            |_, reason| assert_eq!(reason, Some("upgrade")),
            |_, when| assert_eq!(when, When::Reschedule),
        );

        schedule_internal(&glean, submitter, scheduler, local_now_with_offset());
        assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
    }

    /// Case #1: a ping was already sent today, so nothing is submitted and the
    /// next one is scheduled for tomorrow.
    #[test]
    fn case_1_no_submit_but_schedule_tomorrow() {
        let (glean, _t) = new_glean(None);

        let fake_now = FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(2021, 4, 30, 14, 36, 14)
            .unwrap();
        get_last_sent_time_metric().set_sync_chrono(&glean, fake_now);

        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
            |_, reason| panic!("Case #1 shouldn't submit a ping! reason: {:?}", reason),
            |_, when| assert_eq!(when, When::Tomorrow),
        );
        schedule_internal(&glean, submitter, scheduler, fake_now);
        assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
    }

    /// Case #2: the last ping was sent yesterday and we are past today's
    /// scheduled hour, so an "overdue" ping is submitted and rescheduled.
    #[test]
    fn case_2_submit_ping_and_reschedule() {
        let (glean, _t) = new_glean(None);

        let fake_yesterday = FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(2021, 4, 29, SCHEDULED_HOUR, 0, 1)
            .unwrap();
        get_last_sent_time_metric().set_sync_chrono(&glean, fake_yesterday);
        let fake_now = fake_yesterday + Duration::days(1);

        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
            |_, reason| assert_eq!(reason, Some("overdue")),
            |_, when| assert_eq!(when, When::Reschedule),
        );
        schedule_internal(&glean, submitter, scheduler, fake_now);
        assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
    }

    /// Case #3: the last ping was sent yesterday and today's scheduled hour
    /// has not passed yet, so nothing is submitted and today is scheduled.
    #[test]
    fn case_3_no_submit_but_schedule_today() {
        let (glean, _t) = new_glean(None);

        let fake_yesterday = FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(2021, 4, 29, SCHEDULED_HOUR - 1, 0, 1)
            .unwrap();
        get_last_sent_time_metric().set_sync_chrono(&glean, fake_yesterday);
        let fake_now = fake_yesterday + Duration::days(1);

        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
            |_, reason| panic!("Case #3 shouldn't submit a ping! reason: {:?}", reason),
            |_, when| assert_eq!(when, When::Today),
        );
        schedule_internal(&glean, submitter, scheduler, fake_now);
        assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
    }

    /// Spot checks for `When::until`.
    #[test]
    fn when_gets_at_least_some_date_math_correct() {
        let now = FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(2021, 4, 30, 15, 2, 10)
            .unwrap();
        // `Today` after the scheduled hour must not produce a zero wait.
        assert_ne!(std::time::Duration::from_secs(0), When::Today.until(now));

        // One hour before the scheduled hour.
        let earlier = now
            .date_naive()
            .and_hms_opt(SCHEDULED_HOUR - 1, 0, 0)
            .unwrap();
        assert_eq!(
            std::time::Duration::from_secs(3600),
            When::Today.until(Utc.from_utc_datetime(&earlier).into())
        );

        // From 15:02:10 to 04:00:00 the next day is 12h 57m 50s = 46670s.
        assert_eq!(
            std::time::Duration::from_secs(46670),
            When::Tomorrow.until(now)
        );
        assert_eq!(
            std::time::Duration::from_secs(46670),
            When::Reschedule.until(now)
        );
        assert_eq!(When::Tomorrow.until(now), When::Reschedule.until(now));
        assert_ne!(When::Tomorrow.reason(), When::Reschedule.reason());
    }

    /// With a non-zero UTC offset late in the local evening, the reschedule
    /// target is the next local day's scheduled hour.
    #[test]
    fn datetime_offset_doesnt_cause_rapid_rescheduling() {
        let now = FixedOffset::west_opt(3600 * 7)
            .unwrap()
            .with_ymd_and_hms(2025, 7, 27, 22, 27, 59)
            .unwrap();

        let next_schedule = When::Reschedule.until(now);

        // From 22:27:59 to 04:00:00 the next day is 5h 32m 1s = 19921s.
        let expected_duration = std::time::Duration::from_secs(19921);
        assert_eq!(expected_duration, next_schedule);
    }

    /// `When::Today` is computed against the local clock, not UTC.
    #[test]
    fn todays_scheduling_is_in_localtime() {
        let now = FixedOffset::west_opt(3600 * 7)
            .unwrap()
            .with_ymd_and_hms(2025, 7, 27, 3, 30, 0)
            .unwrap();

        let next_schedule = When::Today.until(now);

        let expected_duration = std::time::Duration::from_secs(30 * 60);
        assert_eq!(expected_duration, next_schedule);
    }

    /// Serializes the tests that spawn real scheduler threads, since they all
    /// share the global `TASK_CONDVAR` cancellation flag.
    static SCHEDULER_TEST_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));

    /// Takes `SCHEDULER_TEST_MUTEX`, recovering from poisoning.
    ///
    /// The mutex protects no data, so a poisoned lock is still usable. Without
    /// the recovery, one failing scheduler test would make the other fail with
    /// an unrelated `PoisonError`, hiding the real failure.
    fn lock_scheduler_tests() -> std::sync::MutexGuard<'static, ()> {
        SCHEDULER_TEST_MUTEX
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Clears the shared "cancelled" flag so a scheduler thread can start.
    fn reset_cancelled() {
        let (cancelled_lock, _condvar) = &**TASK_CONDVAR;
        *cancelled_lock.lock().unwrap() = false;
    }

    /// A scheduler thread started for any `When` exits once `cancel` is
    /// called, without submitting anything.
    #[test]
    fn cancellable_tasks_can_be_cancelled() {
        let _test_lock = lock_scheduler_tests();

        // Two hours before the scheduled hour: if cancellation did not work,
        // `join` below would block for hours instead of returning promptly.
        let now = FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(2021, 4, 30, SCHEDULED_HOUR - 2, 0, 0)
            .unwrap();

        let proxy_factory = || {
            new_proxies(
                |_, reason| {
                    panic!(
                        "Shouldn't submit when testing scheduler. reason: {:?}",
                        reason
                    )
                },
                |_, _| panic!("Not even using the scheduler this time."),
            )
        };

        for when in [When::Today, When::Tomorrow, When::Reschedule] {
            // Each round starts from an un-cancelled state.
            reset_cancelled();

            let (submitter, submitter_count, _, _) = proxy_factory();
            let handle = start_scheduler(submitter, now, when);
            super::cancel();
            handle.join().unwrap();
            assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
        }
    }

    /// A scheduler started exactly at the scheduled hour with `When::Today`
    /// has a zero wait and submits a "today" ping right away.
    #[test]
    fn immediate_task_runs_immediately() {
        let _ = env_logger::builder().try_init();
        let _test_lock = lock_scheduler_tests();
        reset_cancelled();

        // The scheduler thread fetches the global Glean when it fires, so one
        // has to be installed.
        let (glean, _t) = new_glean(None);
        assert!(
            !glean.schedule_metrics_pings,
            "Real schedulers not allowed in tests!"
        );
        assert!(crate::core::setup_glean(glean).is_ok());

        // Use the constant rather than a literal hour: the wait is only zero
        // when `now` sits exactly on the scheduled hour.
        let now = FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(2021, 4, 21, SCHEDULED_HOUR, 0, 0)
            .unwrap();

        let (submitter, submitter_count, _, _) = new_proxies(
            move |_, reason| {
                assert_eq!(reason, Some("today"));
                // Once the expected ping arrived, cancel from another thread
                // so the scheduler thread exits and `join` returns.
                std::thread::spawn(super::cancel);
            },
            |_, _| panic!("Not using the scheduler this time."),
        );

        let handle = start_scheduler(submitter, now, When::Today);
        handle.join().unwrap();
        assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
    }
}