Skip to main content

duroxide_pg_opt/
notifier.rs

1//! Long-polling notifier for PostgreSQL provider.
2//!
3//! The notifier thread listens for PostgreSQL NOTIFY events and manages timer heaps
4//! to wake dispatchers at the right time, reducing idle database polling.
5
6use sqlx::postgres::PgListener;
7use sqlx::PgPool;
8use std::cmp::Reverse;
9use std::collections::BinaryHeap;
10use std::sync::Arc;
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12use tokio::sync::{oneshot, Notify};
13use tokio::time::sleep_until;
14use tracing::{debug, error, info, warn};
15
16#[cfg(feature = "test-fault-injection")]
17use crate::fault_injection::FaultInjector;
18
/// Tunables governing the LISTEN/NOTIFY based long-polling behavior.
#[derive(Debug, Clone)]
pub struct LongPollConfig {
    /// Whether long-polling (LISTEN/NOTIFY based) is turned on.
    ///
    /// Default: `true`
    pub enabled: bool,

    /// How often the notifier queries the database for upcoming timers.
    ///
    /// Each query looks for work whose `visible_at` falls within this window, and the
    /// periodic query doubles as a safety net for any NOTIFY that was missed.
    ///
    /// Default: 60 seconds
    pub notifier_poll_interval: Duration,

    /// Extra slack added to every timer delay so that we never wake early.
    ///
    /// Absorbs tokio timer jitter and processing overhead:
    /// `delay = (visible_at - now) + timer_grace_period`.
    ///
    /// Default: 100ms
    pub timer_grace_period: Duration,
}

impl Default for LongPollConfig {
    /// Long-polling enabled, a 60 second poll interval and a 100ms grace period.
    fn default() -> Self {
        let notifier_poll_interval = Duration::from_secs(60);
        let timer_grace_period = Duration::from_millis(100);

        LongPollConfig {
            enabled: true,
            notifier_poll_interval,
            timer_grace_period,
        }
    }
}
48
/// Result of a refresh query containing upcoming timer timestamps.
///
/// Sent from the spawned refresh task back to the notifier loop over a oneshot
/// channel; the loop turns each timestamp into a timer-heap entry.
struct RefreshResult {
    /// `visible_at` values of upcoming orchestrator-queue items.
    orch_timers: Vec<i64>,   // visible_at as epoch ms
    /// `visible_at` values of upcoming worker-queue items.
    worker_timers: Vec<i64>, // visible_at as epoch ms
}
54
/// The notifier thread that manages LISTEN/NOTIFY and timer heaps.
///
/// It receives PostgreSQL notifications, keeps one min-heap of pending fire times
/// per queue, and signals the matching `Notify` handle when work becomes visible.
pub struct Notifier {
    /// PostgreSQL connection for LISTEN
    pg_listener: PgListener,
    /// Pool used to open listener connections and to run the refresh query.
    pool: PgPool,
    /// Schema name; prefixes the NOTIFY channel names and qualifies the queue
    /// table in the refresh query.
    schema_name: String,

    /// Timer heaps (min-heap by fire time)
    orch_heap: BinaryHeap<Reverse<Instant>>,
    worker_heap: BinaryHeap<Reverse<Instant>>,

    /// Dispatcher wake channels
    orch_notify: Arc<Notify>,
    worker_notify: Arc<Notify>,

    /// Refresh scheduling: earliest instant at which the next refresh query may start.
    next_refresh: Instant,

    /// Active refresh task (if any): receiving end of the channel the spawned
    /// refresh task reports its result on.
    pending_refresh: Option<oneshot::Receiver<RefreshResult>>,

    /// Configuration
    config: LongPollConfig,

    /// Fault injector for testing (only available with test-fault-injection feature)
    #[cfg(feature = "test-fault-injection")]
    fault_injector: Option<Arc<FaultInjector>>,
}
83
84impl Notifier {
85    /// Create a new notifier and subscribe to NOTIFY channels.
86    pub async fn new(
87        pool: PgPool,
88        schema_name: String,
89        orch_notify: Arc<Notify>,
90        worker_notify: Arc<Notify>,
91        config: LongPollConfig,
92    ) -> Result<Self, sqlx::Error> {
93        Self::new_internal(
94            pool,
95            schema_name,
96            orch_notify,
97            worker_notify,
98            config,
99            #[cfg(feature = "test-fault-injection")]
100            None,
101        )
102        .await
103    }
104
105    /// Create a new notifier with fault injection for testing.
106    #[cfg(feature = "test-fault-injection")]
107    pub async fn new_with_fault_injection(
108        pool: PgPool,
109        schema_name: String,
110        orch_notify: Arc<Notify>,
111        worker_notify: Arc<Notify>,
112        config: LongPollConfig,
113        fault_injector: Arc<FaultInjector>,
114    ) -> Result<Self, sqlx::Error> {
115        Self::new_internal(
116            pool,
117            schema_name,
118            orch_notify,
119            worker_notify,
120            config,
121            Some(fault_injector),
122        )
123        .await
124    }
125
126    async fn new_internal(
127        pool: PgPool,
128        schema_name: String,
129        orch_notify: Arc<Notify>,
130        worker_notify: Arc<Notify>,
131        config: LongPollConfig,
132        #[cfg(feature = "test-fault-injection")] fault_injector: Option<Arc<FaultInjector>>,
133    ) -> Result<Self, sqlx::Error> {
134        let pg_listener = PgListener::connect_with(&pool).await?;
135
136        let mut notifier = Self {
137            pg_listener,
138            pool,
139            schema_name,
140            orch_heap: BinaryHeap::new(),
141            worker_heap: BinaryHeap::new(),
142            orch_notify,
143            worker_notify,
144            next_refresh: Instant::now(), // Immediate first refresh
145            pending_refresh: None,
146            config,
147            #[cfg(feature = "test-fault-injection")]
148            fault_injector,
149        };
150
151        notifier.subscribe_channels().await?;
152
153        info!(
154            target = "duroxide::providers::postgres::notifier",
155            schema = %notifier.schema_name,
156            "Notifier started, listening for NOTIFY events"
157        );
158
159        Ok(notifier)
160    }
161
162    /// Subscribe to NOTIFY channels. Used by new() and handle_reconnect().
163    async fn subscribe_channels(&mut self) -> Result<(), sqlx::Error> {
164        let orch_channel = format!("{}_orch_work", self.schema_name);
165        let worker_channel = format!("{}_worker_work", self.schema_name);
166
167        self.pg_listener.listen(&orch_channel).await?;
168        self.pg_listener.listen(&worker_channel).await?;
169
170        debug!(
171            target = "duroxide::providers::postgres::notifier",
172            orch_channel = %orch_channel,
173            worker_channel = %worker_channel,
174            "Subscribed to NOTIFY channels"
175        );
176
177        Ok(())
178    }
179
180    /// Main loop - runs until the notifier is dropped.
181    pub async fn run(&mut self) {
182        loop {
183            // Check for fault injection: should we panic?
184            #[cfg(feature = "test-fault-injection")]
185            if let Some(ref fi) = self.fault_injector {
186                if fi.should_notifier_panic() {
187                    panic!("Fault injection: notifier panic triggered");
188                }
189                if fi.should_reconnect() {
190                    warn!(
191                        target = "duroxide::providers::postgres::notifier",
192                        "Fault injection: forcing reconnect"
193                    );
194                    self.handle_reconnect().await;
195                    continue;
196                }
197            }
198
199            // Calculate next wake time
200            let next_timer = self.earliest_timer();
201            let refresh_in_progress = self.pending_refresh.is_some();
202
203            let next_wake = if refresh_in_progress {
204                // Don't wait for refresh time if query already running
205                next_timer.unwrap_or_else(|| Instant::now() + Duration::from_secs(60))
206            } else {
207                match next_timer {
208                    Some(t) => t.min(self.next_refresh),
209                    None => self.next_refresh,
210                }
211            };
212
213            tokio::select! {
214                // PostgreSQL NOTIFY received
215                result = self.pg_listener.recv() => {
216                    match result {
217                        Ok(notification) => {
218                            self.handle_notify(notification);
219                        }
220                        Err(e) => {
221                            warn!(
222                                target = "duroxide::providers::postgres::notifier",
223                                error = %e,
224                                "LISTEN connection error, reconnecting..."
225                            );
226                            self.handle_reconnect().await;
227                        }
228                    }
229                }
230
231                // Timer or refresh time reached
232                _ = sleep_until(next_wake.into()) => {
233                    self.pop_and_wake_expired_timers();
234                    self.maybe_start_refresh();
235                }
236
237                // Refresh query completed (non-blocking)
238                Some(result) = async {
239                    match &mut self.pending_refresh {
240                        Some(rx) => rx.await.ok(),
241                        None => std::future::pending().await,
242                    }
243                } => {
244                    self.pending_refresh = None;
245                    self.handle_refresh_result(result);
246                }
247            }
248        }
249    }
250
251    /// Find the earliest timer across both heaps.
252    fn earliest_timer(&self) -> Option<Instant> {
253        let orch = self.orch_heap.peek().map(|r| r.0);
254        let worker = self.worker_heap.peek().map(|r| r.0);
255        match (orch, worker) {
256            (Some(a), Some(b)) => Some(a.min(b)),
257            (Some(a), None) => Some(a),
258            (None, Some(b)) => Some(b),
259            (None, None) => None,
260        }
261    }
262
263    /// Handle a NOTIFY from PostgreSQL.
264    fn handle_notify(&mut self, notification: sqlx::postgres::PgNotification) {
265        let now_ms = current_epoch_ms();
266        let window_end_ms = now_ms + self.config.notifier_poll_interval.as_millis() as i64;
267        let now_instant = Instant::now();
268
269        let is_orch = notification.channel().ends_with("_orch_work");
270
271        let action = parse_notify_action(
272            notification.payload(),
273            now_ms,
274            window_end_ms,
275            self.config.timer_grace_period,
276            now_instant,
277        );
278
279        match action {
280            NotifyAction::WakeNow => {
281                debug!(
282                    target = "duroxide::providers::postgres::notifier",
283                    channel = %notification.channel(),
284                    payload = %notification.payload(),
285                    "Immediate work, waking dispatchers"
286                );
287                self.wake_dispatchers(is_orch);
288            }
289            NotifyAction::AddTimer { fire_at } => {
290                debug!(
291                    target = "duroxide::providers::postgres::notifier",
292                    channel = %notification.channel(),
293                    payload = %notification.payload(),
294                    "Future timer, adding to heap"
295                );
296                if is_orch {
297                    self.orch_heap.push(Reverse(fire_at));
298                } else {
299                    self.worker_heap.push(Reverse(fire_at));
300                }
301            }
302            NotifyAction::Ignore => {
303                debug!(
304                    target = "duroxide::providers::postgres::notifier",
305                    channel = %notification.channel(),
306                    payload = %notification.payload(),
307                    "Timer beyond window, ignoring"
308                );
309            }
310        }
311    }
312
313    /// Wake all waiting dispatchers for the given queue type.
314    ///
315    /// Worker uses `notify_waiters()` so that ALL worker slots wake up.
316    /// This is required for session routing: a session-bound item can only
317    /// be served by the slot that owns the session, so waking just one
318    /// slot (which might be the wrong one) would cause a deadlock.
319    fn wake_dispatchers(&self, is_orch: bool) {
320        if is_orch {
321            self.orch_notify.notify_one();
322        } else {
323            self.worker_notify.notify_waiters();
324        }
325    }
326
327    /// Pop and fire any expired timers from both heaps.
328    fn pop_and_wake_expired_timers(&mut self) {
329        let now = Instant::now();
330
331        // Pop expired orchestrator timers
332        while let Some(Reverse(fire_at)) = self.orch_heap.peek() {
333            if *fire_at <= now {
334                self.orch_heap.pop();
335                self.orch_notify.notify_one();
336            } else {
337                break;
338            }
339        }
340
341        // Pop expired worker timers (notify_waiters for session routing correctness)
342        while let Some(Reverse(fire_at)) = self.worker_heap.peek() {
343            if *fire_at <= now {
344                self.worker_heap.pop();
345                self.worker_notify.notify_waiters();
346            } else {
347                break;
348            }
349        }
350    }
351
352    /// Start a refresh query if not already in progress and it's time.
353    fn maybe_start_refresh(&mut self) {
354        if self.pending_refresh.is_some() || Instant::now() < self.next_refresh {
355            return;
356        }
357
358        let (tx, rx) = oneshot::channel();
359        self.pending_refresh = Some(rx);
360
361        let pool = self.pool.clone();
362        let schema = self.schema_name.clone();
363        let now_ms = current_epoch_ms();
364        let window_end_ms = now_ms + self.config.notifier_poll_interval.as_millis() as i64;
365
366        // Get fault injection parameters before spawning
367        #[cfg(feature = "test-fault-injection")]
368        let fault_injector = self.fault_injector.clone();
369
370        debug!(
371            target = "duroxide::providers::postgres::notifier",
372            now_ms = now_ms,
373            window_end_ms = window_end_ms,
374            "Starting refresh query"
375        );
376
377        tokio::spawn(async move {
378            // Check for fault injection: add delay before refresh
379            #[cfg(feature = "test-fault-injection")]
380            if let Some(ref fi) = fault_injector {
381                let delay = fi.get_refresh_delay();
382                if !delay.is_zero() {
383                    tokio::time::sleep(delay).await;
384                }
385            }
386
387            // Check for fault injection: should refresh error?
388            #[cfg(feature = "test-fault-injection")]
389            if let Some(ref fi) = fault_injector {
390                if fi.should_refresh_error() {
391                    warn!(
392                        target = "duroxide::providers::postgres::notifier",
393                        "Fault injection: simulating refresh error"
394                    );
395                    // Send empty result to simulate error recovery
396                    let _ = tx.send(RefreshResult {
397                        orch_timers: Vec::new(),
398                        worker_timers: Vec::new(),
399                    });
400                    return;
401                }
402            }
403
404            // Query for upcoming timers in both queues
405            // Use Rust clock ($1) for "now" comparison, not database NOW()
406            let orch_timers = sqlx::query_scalar::<_, i64>(&format!(
407                "SELECT (EXTRACT(EPOCH FROM visible_at) * 1000)::BIGINT
408                 FROM {schema}.orchestrator_queue
409                 WHERE (EXTRACT(EPOCH FROM visible_at) * 1000)::BIGINT > $1
410                   AND (EXTRACT(EPOCH FROM visible_at) * 1000)::BIGINT <= $2
411                   AND lock_token IS NULL"
412            ))
413            .bind(now_ms)
414            .bind(window_end_ms)
415            .fetch_all(&pool)
416            .await
417            .unwrap_or_default();
418
419            // TODO: After adding visible_at column to worker_queue (see docs/WORKER_VISIBLE_AT_PROPOSAL.md),
420            // query for future worker timers here similar to orch_timers above.
421            // Worker queue items are currently always immediately available (no visible_at column)
422            let worker_timers: Vec<i64> = Vec::new();
423
424            // Send result (ignore error if receiver dropped)
425            let _ = tx.send(RefreshResult {
426                orch_timers,
427                worker_timers,
428            });
429        });
430    }
431
432    /// Process the result of a refresh query.
433    fn handle_refresh_result(&mut self, result: RefreshResult) {
434        let now_ms = current_epoch_ms();
435        let now_instant = Instant::now();
436
437        debug!(
438            target = "duroxide::providers::postgres::notifier",
439            orch_count = result.orch_timers.len(),
440            worker_count = result.worker_timers.len(),
441            "Refresh query completed"
442        );
443
444        // Add orchestrator timers
445        for fire_at in timers_from_refresh(
446            &result.orch_timers,
447            now_ms,
448            self.config.timer_grace_period,
449            now_instant,
450        ) {
451            self.orch_heap.push(Reverse(fire_at));
452        }
453
454        // Add worker timers
455        for fire_at in timers_from_refresh(
456            &result.worker_timers,
457            now_ms,
458            self.config.timer_grace_period,
459            now_instant,
460        ) {
461            self.worker_heap.push(Reverse(fire_at));
462        }
463
464        // Schedule next refresh
465        self.next_refresh = Instant::now() + self.config.notifier_poll_interval;
466    }
467
468    /// Handle reconnection after a LISTEN connection failure.
469    async fn handle_reconnect(&mut self) {
470        // Backoff before reconnect attempt
471        tokio::time::sleep(Duration::from_secs(1)).await;
472
473        // Reconnect and resubscribe
474        match PgListener::connect_with(&self.pool).await {
475            Ok(listener) => {
476                self.pg_listener = listener;
477
478                if self.subscribe_channels().await.is_ok() {
479                    info!(
480                        target = "duroxide::providers::postgres::notifier",
481                        "Reconnected to PostgreSQL LISTEN"
482                    );
483
484                    // Wake all dispatchers to catch any missed NOTIFYs during disconnect
485                    self.orch_notify.notify_one();
486                    self.worker_notify.notify_waiters();
487
488                    // Force immediate refresh to rebuild timer heaps
489                    self.next_refresh = Instant::now();
490                }
491            }
492            Err(e) => {
493                error!(
494                    target = "duroxide::providers::postgres::notifier",
495                    error = %e,
496                    "Failed to reconnect, will retry on next loop iteration"
497                );
498            }
499        }
500        // If reconnect fails, loop will call handle_reconnect again on next recv() error
501    }
502}
503
/// Wall-clock time in milliseconds since the Unix epoch, read from the Rust clock.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_epoch_ms() -> i64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };

    since_epoch.as_millis() as i64
}
511
/// Action determined by parsing a NOTIFY payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NotifyAction {
    /// Work is immediately visible, wake dispatchers now.
    WakeNow,
    /// Work will be visible in the future, add timer.
    AddTimer { fire_at: Instant },
    /// Work is too far in the future, ignore (refresh will catch it).
    Ignore,
}

/// Parse a NOTIFY payload and determine what action to take.
///
/// This is a pure function for testability.
///
/// * `payload` - `visible_at` as epoch milliseconds in decimal text. Anything that
///   does not parse as an `i64` is treated as `0`, i.e. immediately visible.
/// * `now_ms` - current time as epoch milliseconds.
/// * `window_end_ms` - end of the current refresh window (inclusive), epoch milliseconds.
/// * `grace_period` - slack added to the delay so the timer never fires early.
/// * `now_instant` - monotonic instant corresponding to `now_ms`.
///
/// Returns [`NotifyAction::WakeNow`] when `visible_at <= now_ms`,
/// [`NotifyAction::AddTimer`] with `fire_at = now_instant + (visible_at - now_ms) + grace_period`
/// when `visible_at` lies within the window, and [`NotifyAction::Ignore`] otherwise.
///
/// All arithmetic saturates instead of overflowing or truncating. If the fire time
/// cannot be represented as an `Instant`, the result is `Ignore`.
pub fn parse_notify_action(
    payload: &str,
    now_ms: i64,
    window_end_ms: i64,
    grace_period: Duration,
    now_instant: Instant,
) -> NotifyAction {
    let visible_at_ms: i64 = payload.parse().unwrap_or(0); // 0 = treat as immediate

    if visible_at_ms <= now_ms {
        // Immediately visible (or past) → wake dispatchers now
        return NotifyAction::WakeNow;
    }
    if visible_at_ms > window_end_ms {
        // Beyond window → ignore, refresh will catch it
        return NotifyAction::Ignore;
    }

    // Future timer within current window → schedule a timer.
    // Clamp rather than truncate: a plain `as i64` cast can turn a huge grace
    // period into a negative number and make the timer fire early.
    let grace_ms = grace_period.as_millis().min(i64::MAX as u128) as i64;
    let delay_ms = visible_at_ms
        .saturating_sub(now_ms)
        .saturating_add(grace_ms);

    // `delay_ms` is strictly positive here (visible_at_ms > now_ms, grace_ms >= 0),
    // so the conversion to u64 is lossless.
    match now_instant.checked_add(Duration::from_millis(delay_ms as u64)) {
        Some(fire_at) => NotifyAction::AddTimer { fire_at },
        // Unrepresentable fire time: effectively "too far in the future".
        None => NotifyAction::Ignore,
    }
}
548
/// Calculate timers to add from refresh result.
///
/// Returns Vec of (fire_at Instant) for timers that should be added to the heap,
/// in the same order as `visible_at_times`. Each fire time is
/// `now_instant + (visible_at - now_ms) + grace_period`.
/// Filters out any timers that are already expired (computed delay <= 0).
///
/// The arithmetic saturates instead of overflowing or truncating, and a fire time
/// that cannot be represented as an `Instant` is skipped rather than panicking.
///
/// NOTE: There's an edge case where items could be missed if:
/// 1. Item had visible_at just barely in the future when query started
/// 2. Query takes longer than that margin to complete
/// 3. By the time we process results, the item has expired (delay <= 0)
///
/// This is acceptable because:
/// - The original NOTIFY when the item was inserted should have already woken dispatchers
/// - poll_timeout (default 5s) provides a safety net for any missed items
/// - This edge case is rare (query must be slow AND item must be near-immediate)
/// - Adding an extra wake for expired items isn't worth the overhead
pub fn timers_from_refresh(
    visible_at_times: &[i64],
    now_ms: i64,
    grace_period: Duration,
    now_instant: Instant,
) -> Vec<Instant> {
    // Loop-invariant; clamp instead of truncating so a huge grace period cannot
    // wrap around to a negative value.
    let grace_ms = grace_period.as_millis().min(i64::MAX as u128) as i64;

    visible_at_times
        .iter()
        .filter_map(|&visible_at_ms| {
            let delay_ms = visible_at_ms
                .saturating_sub(now_ms)
                .saturating_add(grace_ms);
            if delay_ms <= 0 {
                return None;
            }
            // delay_ms > 0, so the conversion to u64 is lossless.
            now_instant.checked_add(Duration::from_millis(delay_ms as u64))
        })
        .collect()
}
582
583#[cfg(test)]
584mod tests {
585    use super::*;
586
587    // =======================================================================
588    // Basic Tests
589    // =======================================================================
590
591    #[test]
592    fn test_current_epoch_ms() {
593        let ms = current_epoch_ms();
594        // Should be a reasonable timestamp (after 2020)
595        assert!(ms > 1_577_836_800_000); // 2020-01-01
596        assert!(ms < 2_524_608_000_000); // 2050-01-01
597    }
598
599    #[test]
600    fn test_longpoll_config_default() {
601        let config = LongPollConfig::default();
602        assert!(config.enabled);
603        assert_eq!(config.notifier_poll_interval, Duration::from_secs(60));
604        assert_eq!(config.timer_grace_period, Duration::from_millis(100));
605    }
606
607    // =======================================================================
608    // Category 1: NOTIFY Handling Tests
609    // =======================================================================
610
611    #[test]
612    fn notify_immediate_work_wakes_dispatchers() {
613        // NOTIFY with visible_at = now should wake dispatchers immediately
614        let now_ms = 1_700_000_000_000i64;
615        let window_end_ms = now_ms + 60_000;
616        let grace = Duration::from_millis(100);
617        let now_instant = Instant::now();
618
619        let action = parse_notify_action(
620            &now_ms.to_string(),
621            now_ms,
622            window_end_ms,
623            grace,
624            now_instant,
625        );
626
627        assert_eq!(action, NotifyAction::WakeNow);
628    }
629
630    #[test]
631    fn notify_past_visible_at_wakes_immediately() {
632        // NOTIFY with visible_at in the past should wake immediately
633        let now_ms = 1_700_000_000_000i64;
634        let past_ms = now_ms - 5_000; // 5 seconds ago
635        let window_end_ms = now_ms + 60_000;
636        let grace = Duration::from_millis(100);
637        let now_instant = Instant::now();
638
639        let action = parse_notify_action(
640            &past_ms.to_string(),
641            now_ms,
642            window_end_ms,
643            grace,
644            now_instant,
645        );
646
647        assert_eq!(action, NotifyAction::WakeNow);
648    }
649
650    #[test]
651    fn notify_future_timer_adds_to_heap() {
652        // NOTIFY with visible_at in the future (within window) should add timer
653        let now_ms = 1_700_000_000_000i64;
654        let future_ms = now_ms + 30_000; // 30 seconds from now
655        let window_end_ms = now_ms + 60_000;
656        let grace = Duration::from_millis(100);
657        let now_instant = Instant::now();
658
659        let action = parse_notify_action(
660            &future_ms.to_string(),
661            now_ms,
662            window_end_ms,
663            grace,
664            now_instant,
665        );
666
667        match action {
668            NotifyAction::AddTimer { fire_at } => {
669                // fire_at should be approximately now + 30s + 100ms grace
670                let expected_delay = Duration::from_millis(30_100);
671                let actual_delay = fire_at.duration_since(now_instant);
672                assert!(
673                    actual_delay >= expected_delay - Duration::from_millis(10)
674                        && actual_delay <= expected_delay + Duration::from_millis(10),
675                    "Expected delay ~30.1s, got {actual_delay:?}"
676                );
677            }
678            other => panic!("Expected AddTimer, got {other:?}"),
679        }
680    }
681
682    #[test]
683    fn notify_beyond_window_ignored() {
684        // NOTIFY with visible_at beyond the refresh window should be ignored
685        let now_ms = 1_700_000_000_000i64;
686        let far_future_ms = now_ms + 90_000; // 90 seconds (beyond 60s window)
687        let window_end_ms = now_ms + 60_000;
688        let grace = Duration::from_millis(100);
689        let now_instant = Instant::now();
690
691        let action = parse_notify_action(
692            &far_future_ms.to_string(),
693            now_ms,
694            window_end_ms,
695            grace,
696            now_instant,
697        );
698
699        assert_eq!(action, NotifyAction::Ignore);
700    }
701
702    #[test]
703    fn notify_invalid_payload_treated_as_immediate() {
704        // Invalid payload should be treated as immediate (parsed as 0)
705        let now_ms = 1_700_000_000_000i64;
706        let window_end_ms = now_ms + 60_000;
707        let grace = Duration::from_millis(100);
708        let now_instant = Instant::now();
709
710        let action = parse_notify_action("garbage", now_ms, window_end_ms, grace, now_instant);
711
712        assert_eq!(action, NotifyAction::WakeNow);
713    }
714
715    #[test]
716    fn notify_empty_payload_treated_as_immediate() {
717        // Empty payload should be treated as immediate (parsed as 0)
718        let now_ms = 1_700_000_000_000i64;
719        let window_end_ms = now_ms + 60_000;
720        let grace = Duration::from_millis(100);
721        let now_instant = Instant::now();
722
723        let action = parse_notify_action("", now_ms, window_end_ms, grace, now_instant);
724
725        assert_eq!(action, NotifyAction::WakeNow);
726    }
727
728    // =======================================================================
729    // Category 2: Timer Heap Management Tests
730    // =======================================================================
731
732    #[test]
733    fn timer_fires_at_visible_at_plus_grace() {
734        // Timer should fire at visible_at + grace_period
735        let now_ms = 1_700_000_000_000i64;
736        let visible_at_ms = now_ms + 10_000; // 10 seconds from now
737        let window_end_ms = now_ms + 60_000;
738        let grace = Duration::from_millis(100);
739        let now_instant = Instant::now();
740
741        let action = parse_notify_action(
742            &visible_at_ms.to_string(),
743            now_ms,
744            window_end_ms,
745            grace,
746            now_instant,
747        );
748
749        match action {
750            NotifyAction::AddTimer { fire_at } => {
751                let delay = fire_at.duration_since(now_instant);
752                // Should be 10s + 100ms = 10.1s
753                let expected = Duration::from_millis(10_100);
754                assert!(
755                    delay >= expected - Duration::from_millis(5)
756                        && delay <= expected + Duration::from_millis(5),
757                    "Timer should fire at visible_at + grace, got {delay:?}"
758                );
759            }
760            other => panic!("Expected AddTimer, got {other:?}"),
761        }
762    }
763
764    #[test]
765    fn timer_heap_ordering() {
766        // Min-heap should return timers in order (earliest first)
767        let mut heap: BinaryHeap<Reverse<Instant>> = BinaryHeap::new();
768        let now = Instant::now();
769
770        let t1 = now + Duration::from_secs(10);
771        let t2 = now + Duration::from_secs(5);
772        let t3 = now + Duration::from_secs(15);
773
774        heap.push(Reverse(t1));
775        heap.push(Reverse(t2));
776        heap.push(Reverse(t3));
777
778        // Should pop in order: t2 (5s), t1 (10s), t3 (15s)
779        assert_eq!(heap.pop().unwrap().0, t2);
780        assert_eq!(heap.pop().unwrap().0, t1);
781        assert_eq!(heap.pop().unwrap().0, t3);
782    }
783
784    #[test]
785    fn expired_timers_popped_in_batch() {
786        // Multiple expired timers should all be poppable
787        let mut heap: BinaryHeap<Reverse<Instant>> = BinaryHeap::new();
788        let past = Instant::now() - Duration::from_secs(1);
789
790        // Add 3 timers all in the past
791        heap.push(Reverse(past - Duration::from_millis(100)));
792        heap.push(Reverse(past - Duration::from_millis(200)));
793        heap.push(Reverse(past - Duration::from_millis(300)));
794
795        let now = Instant::now();
796        let mut fired = 0;
797
798        while let Some(Reverse(fire_at)) = heap.peek() {
799            if *fire_at <= now {
800                heap.pop();
801                fired += 1;
802            } else {
803                break;
804            }
805        }
806
807        assert_eq!(fired, 3);
808        assert!(heap.is_empty());
809    }
810
811    #[test]
812    fn timer_does_not_fire_early() {
813        // A timer in the future should not fire yet
814        let mut heap: BinaryHeap<Reverse<Instant>> = BinaryHeap::new();
815        let now = Instant::now();
816        let future = now + Duration::from_secs(10);
817
818        heap.push(Reverse(future));
819
820        // Simulate checking at "now" - timer should not fire
821        if let Some(Reverse(fire_at)) = heap.peek() {
822            assert!(*fire_at > now, "Timer should not fire early");
823        }
824    }
825
826    // =======================================================================
827    // Category 3: Refresh Query Tests
828    // =======================================================================
829
830    #[test]
831    fn refresh_adds_timers_to_heap() {
832        // Refresh with future timers should add them to the heap
833        let now_ms = 1_700_000_000_000i64;
834        let grace = Duration::from_millis(100);
835        let now_instant = Instant::now();
836
837        let timers = vec![
838            now_ms + 10_000, // 10s from now
839            now_ms + 30_000, // 30s from now
840        ];
841
842        let result = timers_from_refresh(&timers, now_ms, grace, now_instant);
843
844        assert_eq!(result.len(), 2);
845        // First timer should fire at ~10.1s
846        let delay1 = result[0].duration_since(now_instant);
847        assert!(delay1 >= Duration::from_millis(10_000));
848        assert!(delay1 <= Duration::from_millis(10_200));
849    }
850
851    #[test]
852    fn refresh_skips_already_passed_timers() {
853        // Timers in the past should not be added
854        let now_ms = 1_700_000_000_000i64;
855        let grace = Duration::from_millis(100);
856        let now_instant = Instant::now();
857
858        let timers = vec![
859            now_ms - 5_000, // 5s ago - should be skipped
860            now_ms + 100,   // 100ms from now - delay would be 200ms, should be added
861        ];
862
863        let result = timers_from_refresh(&timers, now_ms, grace, now_instant);
864
865        // Only the future timer should be added
866        assert_eq!(result.len(), 1);
867    }
868
869    #[test]
870    fn refresh_with_empty_result() {
871        // Empty refresh result should produce no timers
872        let now_ms = 1_700_000_000_000i64;
873        let grace = Duration::from_millis(100);
874        let now_instant = Instant::now();
875
876        let timers: Vec<i64> = vec![];
877        let result = timers_from_refresh(&timers, now_ms, grace, now_instant);
878
879        assert!(result.is_empty());
880    }
881
882    #[test]
883    fn refresh_timer_includes_grace_period() {
884        // Timers from refresh should include grace period
885        let now_ms = 1_700_000_000_000i64;
886        let grace = Duration::from_millis(500); // Larger grace for clearer test
887        let now_instant = Instant::now();
888
889        let timers = vec![now_ms + 10_000]; // 10s from now
890
891        let result = timers_from_refresh(&timers, now_ms, grace, now_instant);
892
893        assert_eq!(result.len(), 1);
894        let delay = result[0].duration_since(now_instant);
895        // Should be 10s + 500ms = 10.5s
896        assert!(delay >= Duration::from_millis(10_400));
897        assert!(delay <= Duration::from_millis(10_600));
898    }
899
900    #[test]
901    fn refresh_boundary_timer_at_exactly_now() {
902        // Timer exactly at now should have delay = grace_period only
903        let now_ms = 1_700_000_000_000i64;
904        let grace = Duration::from_millis(100);
905        let now_instant = Instant::now();
906
907        let timers = vec![now_ms]; // Exactly now
908
909        let result = timers_from_refresh(&timers, now_ms, grace, now_instant);
910
911        // delay = (now_ms - now_ms) + grace = grace = 100ms
912        // This is > 0, so it should be added
913        assert_eq!(result.len(), 1);
914        let delay = result[0].duration_since(now_instant);
915        assert!(delay >= Duration::from_millis(90));
916        assert!(delay <= Duration::from_millis(110));
917    }
918
919    // =======================================================================
920    // Edge Case Tests
921    // =======================================================================
922
923    #[test]
924    fn notify_at_window_boundary_included() {
925        // Timer exactly at window end should be included
926        let now_ms = 1_700_000_000_000i64;
927        let window_end_ms = now_ms + 60_000;
928        let grace = Duration::from_millis(100);
929        let now_instant = Instant::now();
930
931        let action = parse_notify_action(
932            &window_end_ms.to_string(),
933            now_ms,
934            window_end_ms,
935            grace,
936            now_instant,
937        );
938
939        // At window_end should still be included (<=)
940        match action {
941            NotifyAction::AddTimer { .. } => {}
942            other => panic!("Expected AddTimer at window boundary, got {other:?}"),
943        }
944    }
945
946    #[test]
947    fn notify_just_past_window_boundary_ignored() {
948        // Timer just past window end should be ignored
949        let now_ms = 1_700_000_000_000i64;
950        let window_end_ms = now_ms + 60_000;
951        let grace = Duration::from_millis(100);
952        let now_instant = Instant::now();
953
954        let action = parse_notify_action(
955            &(window_end_ms + 1).to_string(),
956            now_ms,
957            window_end_ms,
958            grace,
959            now_instant,
960        );
961
962        assert_eq!(action, NotifyAction::Ignore);
963    }
964
965    #[test]
966    fn notify_negative_timestamp_treated_as_immediate() {
967        // Negative timestamp (way in the past) should wake immediately
968        let now_ms = 1_700_000_000_000i64;
969        let window_end_ms = now_ms + 60_000;
970        let grace = Duration::from_millis(100);
971        let now_instant = Instant::now();
972
973        let action = parse_notify_action("-1000", now_ms, window_end_ms, grace, now_instant);
974
975        assert_eq!(action, NotifyAction::WakeNow);
976    }
977
978    #[test]
979    fn notify_zero_timestamp_treated_as_immediate() {
980        // Zero timestamp (epoch) is way in the past, should wake immediately
981        let now_ms = 1_700_000_000_000i64;
982        let window_end_ms = now_ms + 60_000;
983        let grace = Duration::from_millis(100);
984        let now_instant = Instant::now();
985
986        let action = parse_notify_action("0", now_ms, window_end_ms, grace, now_instant);
987
988        assert_eq!(action, NotifyAction::WakeNow);
989    }
990}