1use sqlx::postgres::PgListener;
7use sqlx::PgPool;
8use std::cmp::Reverse;
9use std::collections::BinaryHeap;
10use std::sync::Arc;
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12use tokio::sync::{oneshot, Notify};
13use tokio::time::sleep_until;
14use tracing::{debug, error, info, warn};
15
16#[cfg(feature = "test-fault-injection")]
17use crate::fault_injection::FaultInjector;
18
/// Tunables for the long-poll notifier.
#[derive(Debug, Clone)]
pub struct LongPollConfig {
    /// Long-polling on/off switch. Nothing in this file reads it;
    /// presumably the code that decides whether to start a `Notifier` does.
    pub enabled: bool,

    /// Time between refresh queries. Also the size of the look-ahead window:
    /// timers due later than `now + notifier_poll_interval` are not tracked.
    pub notifier_poll_interval: Duration,

    /// Extra delay added on top of a timer's `visible_at` before dispatchers
    /// are woken.
    pub timer_grace_period: Duration,
}

impl Default for LongPollConfig {
    /// Enabled, with a 60 s poll interval and a 100 ms grace period.
    fn default() -> Self {
        let notifier_poll_interval = Duration::from_secs(60);
        let timer_grace_period = Duration::from_millis(100);

        LongPollConfig {
            enabled: true,
            notifier_poll_interval,
            timer_grace_period,
        }
    }
}
48
/// Outcome of one background refresh query, handed back to the notifier loop.
///
/// Entries are `visible_at` timestamps in milliseconds since the Unix epoch.
struct RefreshResult {
    /// Upcoming timers found in the orchestrator queue.
    orch_timers: Vec<i64>,
    /// Upcoming timers for the worker side.
    worker_timers: Vec<i64>,
}
54
/// State of the background notifier: it turns PostgreSQL NOTIFY events and
/// upcoming timers into wake-ups for the orchestrator and worker dispatchers.
pub struct Notifier {
    /// LISTEN connection on which NOTIFY events are received.
    pg_listener: PgListener,
    /// Pool used to (re)create the listener and to run refresh queries.
    pool: PgPool,
    /// Schema name; prefixes the NOTIFY channel names and qualifies the queue
    /// table in the refresh query.
    schema_name: String,

    /// Min-heap (via `Reverse`) of instants at which orchestrator dispatchers
    /// should be woken.
    orch_heap: BinaryHeap<Reverse<Instant>>,
    /// Min-heap (via `Reverse`) of instants at which worker dispatchers
    /// should be woken.
    worker_heap: BinaryHeap<Reverse<Instant>>,

    /// Signalled to wake orchestrator dispatchers.
    orch_notify: Arc<Notify>,
    /// Signalled to wake worker dispatchers.
    worker_notify: Arc<Notify>,

    /// Earliest instant at which the next refresh query may be started.
    next_refresh: Instant,

    /// Receiving end for the refresh task currently in flight, if any.
    pending_refresh: Option<oneshot::Receiver<RefreshResult>>,

    /// Poll interval and grace period used by the loop.
    config: LongPollConfig,

    /// Test-only hook that can force panics, reconnects and refresh faults.
    #[cfg(feature = "test-fault-injection")]
    fault_injector: Option<Arc<FaultInjector>>,
}
83
84impl Notifier {
    /// Creates a notifier without fault injection.
    ///
    /// Delegates to `new_internal`, which opens the LISTEN connection from
    /// `pool` and subscribes to this schema's NOTIFY channels.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` raised while connecting the listener or
    /// subscribing to the channels.
    pub async fn new(
        pool: PgPool,
        schema_name: String,
        orch_notify: Arc<Notify>,
        worker_notify: Arc<Notify>,
        config: LongPollConfig,
    ) -> Result<Self, sqlx::Error> {
        Self::new_internal(
            pool,
            schema_name,
            orch_notify,
            worker_notify,
            config,
            // This argument only exists when the feature is compiled in.
            #[cfg(feature = "test-fault-injection")]
            None,
        )
        .await
    }
104
    /// Creates a notifier wired to a `FaultInjector` (test builds only).
    ///
    /// Identical to `new` except that `fault_injector` is stored and consulted
    /// by the run loop and by refresh tasks.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` raised while connecting the listener or
    /// subscribing to the channels.
    #[cfg(feature = "test-fault-injection")]
    pub async fn new_with_fault_injection(
        pool: PgPool,
        schema_name: String,
        orch_notify: Arc<Notify>,
        worker_notify: Arc<Notify>,
        config: LongPollConfig,
        fault_injector: Arc<FaultInjector>,
    ) -> Result<Self, sqlx::Error> {
        Self::new_internal(
            pool,
            schema_name,
            orch_notify,
            worker_notify,
            config,
            Some(fault_injector),
        )
        .await
    }
125
    /// Shared constructor behind `new` and `new_with_fault_injection`.
    ///
    /// Connects a `PgListener` through `pool`, builds the notifier with empty
    /// timer heaps, subscribes to the NOTIFY channels and logs the start-up.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` from the listener connection or from the
    /// channel subscription.
    async fn new_internal(
        pool: PgPool,
        schema_name: String,
        orch_notify: Arc<Notify>,
        worker_notify: Arc<Notify>,
        config: LongPollConfig,
        #[cfg(feature = "test-fault-injection")] fault_injector: Option<Arc<FaultInjector>>,
    ) -> Result<Self, sqlx::Error> {
        let pg_listener = PgListener::connect_with(&pool).await?;

        let mut notifier = Self {
            pg_listener,
            pool,
            schema_name,
            orch_heap: BinaryHeap::new(),
            worker_heap: BinaryHeap::new(),
            orch_notify,
            worker_notify,
            // "Now" makes the run loop start a refresh on its first wake-up.
            next_refresh: Instant::now(),
            pending_refresh: None,
            config,
            #[cfg(feature = "test-fault-injection")]
            fault_injector,
        };

        notifier.subscribe_channels().await?;

        // NOTE(review): `target = ...` records a *field* named `target`;
        // tracing's event-target override is spelled `target: ...`. Confirm
        // which one is intended before relying on target-based log filters.
        info!(
            target = "duroxide::providers::postgres::notifier",
            schema = %notifier.schema_name,
            "Notifier started, listening for NOTIFY events"
        );

        Ok(notifier)
    }
161
162 async fn subscribe_channels(&mut self) -> Result<(), sqlx::Error> {
164 let orch_channel = format!("{}_orch_work", self.schema_name);
165 let worker_channel = format!("{}_worker_work", self.schema_name);
166
167 self.pg_listener.listen(&orch_channel).await?;
168 self.pg_listener.listen(&worker_channel).await?;
169
170 debug!(
171 target = "duroxide::providers::postgres::notifier",
172 orch_channel = %orch_channel,
173 worker_channel = %worker_channel,
174 "Subscribed to NOTIFY channels"
175 );
176
177 Ok(())
178 }
179
180 pub async fn run(&mut self) {
182 loop {
183 #[cfg(feature = "test-fault-injection")]
185 if let Some(ref fi) = self.fault_injector {
186 if fi.should_notifier_panic() {
187 panic!("Fault injection: notifier panic triggered");
188 }
189 if fi.should_reconnect() {
190 warn!(
191 target = "duroxide::providers::postgres::notifier",
192 "Fault injection: forcing reconnect"
193 );
194 self.handle_reconnect().await;
195 continue;
196 }
197 }
198
199 let next_timer = self.earliest_timer();
201 let refresh_in_progress = self.pending_refresh.is_some();
202
203 let next_wake = if refresh_in_progress {
204 next_timer.unwrap_or_else(|| Instant::now() + Duration::from_secs(60))
206 } else {
207 match next_timer {
208 Some(t) => t.min(self.next_refresh),
209 None => self.next_refresh,
210 }
211 };
212
213 tokio::select! {
214 result = self.pg_listener.recv() => {
216 match result {
217 Ok(notification) => {
218 self.handle_notify(notification);
219 }
220 Err(e) => {
221 warn!(
222 target = "duroxide::providers::postgres::notifier",
223 error = %e,
224 "LISTEN connection error, reconnecting..."
225 );
226 self.handle_reconnect().await;
227 }
228 }
229 }
230
231 _ = sleep_until(next_wake.into()) => {
233 self.pop_and_wake_expired_timers();
234 self.maybe_start_refresh();
235 }
236
237 Some(result) = async {
239 match &mut self.pending_refresh {
240 Some(rx) => rx.await.ok(),
241 None => std::future::pending().await,
242 }
243 } => {
244 self.pending_refresh = None;
245 self.handle_refresh_result(result);
246 }
247 }
248 }
249 }
250
251 fn earliest_timer(&self) -> Option<Instant> {
253 let orch = self.orch_heap.peek().map(|r| r.0);
254 let worker = self.worker_heap.peek().map(|r| r.0);
255 match (orch, worker) {
256 (Some(a), Some(b)) => Some(a.min(b)),
257 (Some(a), None) => Some(a),
258 (None, Some(b)) => Some(b),
259 (None, None) => None,
260 }
261 }
262
263 fn handle_notify(&mut self, notification: sqlx::postgres::PgNotification) {
265 let now_ms = current_epoch_ms();
266 let window_end_ms = now_ms + self.config.notifier_poll_interval.as_millis() as i64;
267 let now_instant = Instant::now();
268
269 let is_orch = notification.channel().ends_with("_orch_work");
270
271 let action = parse_notify_action(
272 notification.payload(),
273 now_ms,
274 window_end_ms,
275 self.config.timer_grace_period,
276 now_instant,
277 );
278
279 match action {
280 NotifyAction::WakeNow => {
281 debug!(
282 target = "duroxide::providers::postgres::notifier",
283 channel = %notification.channel(),
284 payload = %notification.payload(),
285 "Immediate work, waking dispatchers"
286 );
287 self.wake_dispatchers(is_orch);
288 }
289 NotifyAction::AddTimer { fire_at } => {
290 debug!(
291 target = "duroxide::providers::postgres::notifier",
292 channel = %notification.channel(),
293 payload = %notification.payload(),
294 "Future timer, adding to heap"
295 );
296 if is_orch {
297 self.orch_heap.push(Reverse(fire_at));
298 } else {
299 self.worker_heap.push(Reverse(fire_at));
300 }
301 }
302 NotifyAction::Ignore => {
303 debug!(
304 target = "duroxide::providers::postgres::notifier",
305 channel = %notification.channel(),
306 payload = %notification.payload(),
307 "Timer beyond window, ignoring"
308 );
309 }
310 }
311 }
312
313 fn wake_dispatchers(&self, is_orch: bool) {
320 if is_orch {
321 self.orch_notify.notify_one();
322 } else {
323 self.worker_notify.notify_waiters();
324 }
325 }
326
327 fn pop_and_wake_expired_timers(&mut self) {
329 let now = Instant::now();
330
331 while let Some(Reverse(fire_at)) = self.orch_heap.peek() {
333 if *fire_at <= now {
334 self.orch_heap.pop();
335 self.orch_notify.notify_one();
336 } else {
337 break;
338 }
339 }
340
341 while let Some(Reverse(fire_at)) = self.worker_heap.peek() {
343 if *fire_at <= now {
344 self.worker_heap.pop();
345 self.worker_notify.notify_waiters();
346 } else {
347 break;
348 }
349 }
350 }
351
352 fn maybe_start_refresh(&mut self) {
354 if self.pending_refresh.is_some() || Instant::now() < self.next_refresh {
355 return;
356 }
357
358 let (tx, rx) = oneshot::channel();
359 self.pending_refresh = Some(rx);
360
361 let pool = self.pool.clone();
362 let schema = self.schema_name.clone();
363 let now_ms = current_epoch_ms();
364 let window_end_ms = now_ms + self.config.notifier_poll_interval.as_millis() as i64;
365
366 #[cfg(feature = "test-fault-injection")]
368 let fault_injector = self.fault_injector.clone();
369
370 debug!(
371 target = "duroxide::providers::postgres::notifier",
372 now_ms = now_ms,
373 window_end_ms = window_end_ms,
374 "Starting refresh query"
375 );
376
377 tokio::spawn(async move {
378 #[cfg(feature = "test-fault-injection")]
380 if let Some(ref fi) = fault_injector {
381 let delay = fi.get_refresh_delay();
382 if !delay.is_zero() {
383 tokio::time::sleep(delay).await;
384 }
385 }
386
387 #[cfg(feature = "test-fault-injection")]
389 if let Some(ref fi) = fault_injector {
390 if fi.should_refresh_error() {
391 warn!(
392 target = "duroxide::providers::postgres::notifier",
393 "Fault injection: simulating refresh error"
394 );
395 let _ = tx.send(RefreshResult {
397 orch_timers: Vec::new(),
398 worker_timers: Vec::new(),
399 });
400 return;
401 }
402 }
403
404 let orch_timers = sqlx::query_scalar::<_, i64>(&format!(
407 "SELECT (EXTRACT(EPOCH FROM visible_at) * 1000)::BIGINT
408 FROM {schema}.orchestrator_queue
409 WHERE (EXTRACT(EPOCH FROM visible_at) * 1000)::BIGINT > $1
410 AND (EXTRACT(EPOCH FROM visible_at) * 1000)::BIGINT <= $2
411 AND lock_token IS NULL"
412 ))
413 .bind(now_ms)
414 .bind(window_end_ms)
415 .fetch_all(&pool)
416 .await
417 .unwrap_or_default();
418
419 let worker_timers: Vec<i64> = Vec::new();
423
424 let _ = tx.send(RefreshResult {
426 orch_timers,
427 worker_timers,
428 });
429 });
430 }
431
432 fn handle_refresh_result(&mut self, result: RefreshResult) {
434 let now_ms = current_epoch_ms();
435 let now_instant = Instant::now();
436
437 debug!(
438 target = "duroxide::providers::postgres::notifier",
439 orch_count = result.orch_timers.len(),
440 worker_count = result.worker_timers.len(),
441 "Refresh query completed"
442 );
443
444 for fire_at in timers_from_refresh(
446 &result.orch_timers,
447 now_ms,
448 self.config.timer_grace_period,
449 now_instant,
450 ) {
451 self.orch_heap.push(Reverse(fire_at));
452 }
453
454 for fire_at in timers_from_refresh(
456 &result.worker_timers,
457 now_ms,
458 self.config.timer_grace_period,
459 now_instant,
460 ) {
461 self.worker_heap.push(Reverse(fire_at));
462 }
463
464 self.next_refresh = Instant::now() + self.config.notifier_poll_interval;
466 }
467
468 async fn handle_reconnect(&mut self) {
470 tokio::time::sleep(Duration::from_secs(1)).await;
472
473 match PgListener::connect_with(&self.pool).await {
475 Ok(listener) => {
476 self.pg_listener = listener;
477
478 if self.subscribe_channels().await.is_ok() {
479 info!(
480 target = "duroxide::providers::postgres::notifier",
481 "Reconnected to PostgreSQL LISTEN"
482 );
483
484 self.orch_notify.notify_one();
486 self.worker_notify.notify_waiters();
487
488 self.next_refresh = Instant::now();
490 }
491 }
492 Err(e) => {
493 error!(
494 target = "duroxide::providers::postgres::notifier",
495 error = %e,
496 "Failed to reconnect, will retry on next loop iteration"
497 );
498 }
499 }
500 }
502}
503
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch.
fn current_epoch_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
511
/// What the notifier should do in response to one NOTIFY payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NotifyAction {
    /// The work is already visible: wake dispatchers immediately.
    WakeNow,
    /// The work becomes visible within the window: wake at `fire_at`.
    AddTimer { fire_at: Instant },
    /// The work is outside the tracked window: do nothing.
    Ignore,
}

/// Classifies a NOTIFY payload.
///
/// `payload` is expected to be a `visible_at` timestamp in milliseconds
/// since the Unix epoch, as a decimal integer.
///
/// * Not parseable, or `visible_at <= now_ms`: [`NotifyAction::WakeNow`].
/// * `now_ms < visible_at <= window_end_ms`: [`NotifyAction::AddTimer`],
///   firing at `now_instant + (visible_at - now_ms) + grace_period`, with the
///   grace period counted in whole milliseconds.
/// * `visible_at > window_end_ms`: [`NotifyAction::Ignore`].
///
/// If the fire time cannot be represented (the delay does not fit in `u64`
/// milliseconds, or `Instant` would overflow) the result is `Ignore` rather
/// than a wrapped-around delay or a panic.
pub fn parse_notify_action(
    payload: &str,
    now_ms: i64,
    window_end_ms: i64,
    grace_period: Duration,
    now_instant: Instant,
) -> NotifyAction {
    // An unparseable payload falls back to 0, which is treated as
    // "already visible" below.
    let visible_at_ms: i64 = payload.parse().unwrap_or(0);

    if visible_at_ms <= now_ms {
        return NotifyAction::WakeNow;
    }
    if visible_at_ms > window_end_ms {
        return NotifyAction::Ignore;
    }

    // Widen before subtracting so extreme inputs cannot overflow `i64`; the
    // difference is strictly positive here, so the cast to u128 is lossless.
    let ahead_ms = (i128::from(visible_at_ms) - i128::from(now_ms)) as u128;
    let total_ms = ahead_ms + grace_period.as_millis();

    let fire_at = if total_ms <= u128::from(u64::MAX) {
        now_instant.checked_add(Duration::from_millis(total_ms as u64))
    } else {
        None
    };

    match fire_at {
        Some(fire_at) => NotifyAction::AddTimer { fire_at },
        None => NotifyAction::Ignore,
    }
}
548
/// Converts `visible_at` timestamps from a refresh query into fire instants.
///
/// Each timestamp (ms since the Unix epoch) becomes
/// `now_instant + (visible_at - now_ms) + grace_period`, with the grace
/// period counted in whole milliseconds. Entries whose resulting delay is
/// zero or negative are dropped, as are entries whose fire time cannot be
/// represented (delay beyond `u64` milliseconds, or `Instant` overflow).
/// Input order is preserved.
pub fn timers_from_refresh(
    visible_at_times: &[i64],
    now_ms: i64,
    grace_period: Duration,
    now_instant: Instant,
) -> Vec<Instant> {
    // Lossless: the largest possible `Duration` in milliseconds is far
    // below `i128::MAX`.
    let grace_ms = grace_period.as_millis() as i128;

    visible_at_times
        .iter()
        .filter_map(|&visible_at_ms| {
            // 128-bit arithmetic so extreme timestamps cannot overflow.
            let delay_ms = i128::from(visible_at_ms) - i128::from(now_ms) + grace_ms;
            if delay_ms <= 0 || delay_ms > i128::from(u64::MAX) {
                return None;
            }
            now_instant.checked_add(Duration::from_millis(delay_ms as u64))
        })
        .collect()
}
582
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed wall-clock "now" (ms since the epoch) for the pure-function tests.
    const NOW_MS: i64 = 1_700_000_000_000;
    /// End of the look-ahead window: 60 s after `NOW_MS`.
    const WINDOW_END_MS: i64 = NOW_MS + 60_000;
    /// Grace period used unless a test overrides it.
    const GRACE: Duration = Duration::from_millis(100);

    /// Classifies `payload` against the shared fixture.
    fn classify(payload: &str, anchor: Instant) -> NotifyAction {
        parse_notify_action(payload, NOW_MS, WINDOW_END_MS, GRACE, anchor)
    }

    /// Unwraps the fire time of an `AddTimer`, panicking on anything else.
    fn expect_timer(action: NotifyAction) -> Instant {
        match action {
            NotifyAction::AddTimer { fire_at } => fire_at,
            other => panic!("Expected AddTimer, got {other:?}"),
        }
    }

    // ----- helpers and config -----

    #[test]
    fn test_current_epoch_ms() {
        let ms = current_epoch_ms();
        // Between 2020-01-01 and 2050-01-01.
        assert!(ms > 1_577_836_800_000);
        assert!(ms < 2_524_608_000_000);
    }

    #[test]
    fn test_longpoll_config_default() {
        let LongPollConfig {
            enabled,
            notifier_poll_interval,
            timer_grace_period,
        } = LongPollConfig::default();

        assert!(enabled);
        assert_eq!(notifier_poll_interval, Duration::from_secs(60));
        assert_eq!(timer_grace_period, Duration::from_millis(100));
    }

    // ----- NOTIFY classification -----

    #[test]
    fn notify_immediate_work_wakes_dispatchers() {
        let outcome = classify(&NOW_MS.to_string(), Instant::now());
        assert_eq!(outcome, NotifyAction::WakeNow);
    }

    #[test]
    fn notify_past_visible_at_wakes_immediately() {
        // Became visible five seconds ago.
        let payload = (NOW_MS - 5_000).to_string();
        assert_eq!(classify(&payload, Instant::now()), NotifyAction::WakeNow);
    }

    #[test]
    fn notify_future_timer_adds_to_heap() {
        let anchor = Instant::now();
        // Visible in 30 s, well inside the 60 s window.
        let payload = (NOW_MS + 30_000).to_string();

        let fire_at = expect_timer(classify(&payload, anchor));

        // 30 s until visible plus the 100 ms grace period.
        let expected_delay = Duration::from_millis(30_100);
        let actual_delay = fire_at.duration_since(anchor);
        let slack = Duration::from_millis(10);
        assert!(
            actual_delay >= expected_delay - slack && actual_delay <= expected_delay + slack,
            "Expected delay ~30.1s, got {actual_delay:?}"
        );
    }

    #[test]
    fn notify_beyond_window_ignored() {
        // 90 s ahead, past the 60 s window.
        let payload = (NOW_MS + 90_000).to_string();
        assert_eq!(classify(&payload, Instant::now()), NotifyAction::Ignore);
    }

    #[test]
    fn notify_invalid_payload_treated_as_immediate() {
        assert_eq!(classify("garbage", Instant::now()), NotifyAction::WakeNow);
    }

    #[test]
    fn notify_empty_payload_treated_as_immediate() {
        assert_eq!(classify("", Instant::now()), NotifyAction::WakeNow);
    }

    // ----- timer behaviour -----

    #[test]
    fn timer_fires_at_visible_at_plus_grace() {
        let anchor = Instant::now();
        // Visible in 10 s.
        let payload = (NOW_MS + 10_000).to_string();

        let fire_at = expect_timer(classify(&payload, anchor));

        let delay = fire_at.duration_since(anchor);
        let expected = Duration::from_millis(10_100);
        let slack = Duration::from_millis(5);
        assert!(
            delay >= expected - slack && delay <= expected + slack,
            "Timer should fire at visible_at + grace, got {delay:?}"
        );
    }

    #[test]
    fn timer_heap_ordering() {
        let base = Instant::now();
        let t1 = base + Duration::from_secs(10);
        let t2 = base + Duration::from_secs(5);
        let t3 = base + Duration::from_secs(15);

        let mut heap: BinaryHeap<Reverse<Instant>> =
            [t1, t2, t3].iter().copied().map(Reverse).collect();

        // `Reverse` turns the max-heap into a min-heap: earliest first.
        assert_eq!(heap.pop().unwrap().0, t2);
        assert_eq!(heap.pop().unwrap().0, t1);
        assert_eq!(heap.pop().unwrap().0, t3);
    }

    #[test]
    fn expired_timers_popped_in_batch() {
        let past = Instant::now() - Duration::from_secs(1);

        // Three timers, all already expired.
        let mut heap: BinaryHeap<Reverse<Instant>> = [100u64, 200, 300]
            .iter()
            .map(|&ms| Reverse(past - Duration::from_millis(ms)))
            .collect();

        let now = Instant::now();
        let mut fired = 0;
        while matches!(heap.peek(), Some(Reverse(due)) if *due <= now) {
            heap.pop();
            fired += 1;
        }

        assert_eq!(fired, 3);
        assert!(heap.is_empty());
    }

    #[test]
    fn timer_does_not_fire_early() {
        let now = Instant::now();
        let mut heap: BinaryHeap<Reverse<Instant>> = BinaryHeap::new();
        heap.push(Reverse(now + Duration::from_secs(10)));

        if let Some(Reverse(due)) = heap.peek() {
            assert!(*due > now, "Timer should not fire early");
        }
    }

    // ----- refresh conversion -----

    #[test]
    fn refresh_adds_timers_to_heap() {
        let anchor = Instant::now();
        // Due in 10 s and in 30 s.
        let upcoming = vec![NOW_MS + 10_000, NOW_MS + 30_000];

        let fire_times = timers_from_refresh(&upcoming, NOW_MS, GRACE, anchor);

        assert_eq!(fire_times.len(), 2);
        let first_delay = fire_times[0].duration_since(anchor);
        assert!(first_delay >= Duration::from_millis(10_000));
        assert!(first_delay <= Duration::from_millis(10_200));
    }

    #[test]
    fn refresh_skips_already_passed_timers() {
        // One timer 5 s in the past, one 100 ms in the future.
        let mixed = vec![NOW_MS - 5_000, NOW_MS + 100];

        let fire_times = timers_from_refresh(&mixed, NOW_MS, GRACE, Instant::now());

        assert_eq!(fire_times.len(), 1);
    }

    #[test]
    fn refresh_with_empty_result() {
        let none: Vec<i64> = Vec::new();
        let fire_times = timers_from_refresh(&none, NOW_MS, GRACE, Instant::now());
        assert!(fire_times.is_empty());
    }

    #[test]
    fn refresh_timer_includes_grace_period() {
        let anchor = Instant::now();
        // Larger grace period than the shared fixture.
        let grace = Duration::from_millis(500);

        let fire_times = timers_from_refresh(&[NOW_MS + 10_000], NOW_MS, grace, anchor);

        assert_eq!(fire_times.len(), 1);
        let delay = fire_times[0].duration_since(anchor);
        assert!(delay >= Duration::from_millis(10_400));
        assert!(delay <= Duration::from_millis(10_600));
    }

    #[test]
    fn refresh_boundary_timer_at_exactly_now() {
        let anchor = Instant::now();

        let fire_times = timers_from_refresh(&[NOW_MS], NOW_MS, GRACE, anchor);

        // Still scheduled: the grace period makes the delay positive.
        assert_eq!(fire_times.len(), 1);
        let delay = fire_times[0].duration_since(anchor);
        assert!(delay >= Duration::from_millis(90));
        assert!(delay <= Duration::from_millis(110));
    }

    // ----- window and payload edge cases -----

    #[test]
    fn notify_at_window_boundary_included() {
        let action = classify(&WINDOW_END_MS.to_string(), Instant::now());
        assert!(
            matches!(action, NotifyAction::AddTimer { .. }),
            "Expected AddTimer at window boundary, got {action:?}"
        );
    }

    #[test]
    fn notify_just_past_window_boundary_ignored() {
        let payload = (WINDOW_END_MS + 1).to_string();
        assert_eq!(classify(&payload, Instant::now()), NotifyAction::Ignore);
    }

    #[test]
    fn notify_negative_timestamp_treated_as_immediate() {
        assert_eq!(classify("-1000", Instant::now()), NotifyAction::WakeNow);
    }

    #[test]
    fn notify_zero_timestamp_treated_as_immediate() {
        assert_eq!(classify("0", Instant::now()), NotifyAction::WakeNow);
    }
}