Skip to main content

wavekat_sip/
session_timer.rs

1//! RFC 4028 session timers — keep long calls from outliving a dead dialog.
2//!
3//! Without session timers, a call whose dialog silently died (peer
4//! crashed, NAT binding dropped, proxy lost state) lives forever:
5//! nothing on the signaling path re-validates the dialog, so an
6//! unattended consumer (voice bot, AI agent) keeps streaming RTP into
7//! the void. RFC 4028 bounds that window: one side periodically
8//! refreshes the session with a re-INVITE, and the other side tears the
9//! call down with BYE if no refresh arrives before the negotiated
10//! session interval lapses.
11//!
12//! This module has three layers:
13//!
14//! 1. **Pure header logic** — [`SessionExpires`] parse/build for the
15//!    `Session-Expires` header (with its `;refresher=uac|uas` param),
16//!    [`min_se_in`] for `Min-SE`, and [`supports_timer`] for the
17//!    `Supported: timer` option tag. rsip 0.4 has no typed variants for
18//!    these, so they are parsed manually from
19//!    [`rsip::Header::Other`] — same style as the crate's SDP and RTP
20//!    parsing.
21//! 2. **Negotiation** — [`negotiate_uac`] (from a 2xx response, caller
22//!    side) and [`negotiate_uas`] (from an INVITE, callee side) decide
23//!    the [`SessionTimer`]: the negotiated interval and whether *we*
24//!    are the refresher. [`SessionTimer::refresh_after`] /
25//!    [`SessionTimer::expiry_after`] give the RFC 4028 §10 schedule.
26//! 3. **Runtime** — [`session_timer_loop`], shaped like
27//!    [`crate::Registrar::keepalive_loop`]: a `select!` over sleeps and
28//!    a [`CancellationToken`] that either sends the periodic refresh
29//!    re-INVITE (when we are the refresher) or watches for the peer's
30//!    refreshes and sends BYE when the session lapses.
31//!
32//! # Wiring it up
33//!
34//! [`crate::Caller`] and [`crate::IncomingCall`] negotiate for you:
35//! [`crate::Call::session_timer`] carries the result. Drive the loop
36//! against the call's shareable dialog handle
37//! ([`crate::Call::session_handle`]) so it runs alongside the audio path:
38//!
39//! ```ignore
40//! if let Some(timer) = call.session_timer() {
41//!     let handle = call.session_handle();
42//!     let refreshed = std::sync::Arc::new(tokio::sync::Notify::new());
43//!     let sdp = build_sdp(local_ip, rtp_port); // repeat our offer
44//!     tokio::spawn(async move {
45//!         let outcome =
46//!             session_timer_loop(&handle, timer, Some(sdp), refreshed, cancel).await;
47//!         tracing::info!(?outcome, "session timer finished");
48//!     });
49//! }
50//! ```
51//!
52//! When the **peer** is the refresher, its refresh re-INVITEs arrive as
53//! inbound in-dialog requests. The consumer answers them with an SDP
54//! answer (echoing `Session-Expires`) and pings `refreshed` so the
55//! watchdog deadline resets.
56
57use std::future::Future;
58use std::sync::Arc;
59use std::time::Duration;
60
61use rsip::prelude::UntypedHeader;
62use rsip::Header;
63use tokio::select;
64use tokio::sync::Notify;
65use tokio_util::sync::CancellationToken;
66use tracing::{debug, info, warn};
67
/// Boxed, thread-safe error type returned by the dialog operations in
/// this module.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Floor, in seconds, applied to every session interval this module
/// negotiates or runs. RFC 4028 §4 sets 90 s as the absolute minimum
/// (it is also the default `Min-SE`); shorter intervals would churn
/// re-INVITEs.
pub const MIN_SESSION_EXPIRES_SECS: u32 = 90;

/// Session interval, in seconds, requested on outbound INVITEs: the
/// 30-minute default recommended by RFC 4028.
pub const DEFAULT_SESSION_EXPIRES_SECS: u32 = 30 * 60;

/// Upper bound, in seconds, on how long *before* session expiry the
/// non-refresher fires its BYE watchdog (RFC 4028 §10:
/// `min(32 s, interval / 3)`).
const MAX_EXPIRY_HEADROOM_SECS: u32 = 32;
82
/// Value space of the `Session-Expires` `refresher` parameter: which
/// side of the original INVITE transaction sends the refreshes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Refresher {
    /// Refreshes come from the party that sent the request.
    Uac,
    /// Refreshes come from the party that answered the request.
    Uas,
}

impl Refresher {
    /// Lowercase wire form of the parameter value: `"uac"` or `"uas"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Uac => "uac",
            Self::Uas => "uas",
        }
    }

    /// Case-insensitively match `s` against the two wire forms. The
    /// input is compared as given (no trimming); anything else yields
    /// a descriptive error string.
    fn parse(s: &str) -> Result<Self, String> {
        [Self::Uac, Self::Uas]
            .iter()
            .copied()
            .find(|role| s.eq_ignore_ascii_case(role.as_str()))
            .ok_or_else(|| format!("invalid refresher value {s:?} (want uac|uas)"))
    }
}
112
113/// Parsed `Session-Expires` header value: interval in seconds plus the
114/// optional `refresher` parameter.
115#[derive(Debug, Clone, Copy, PartialEq, Eq)]
116pub struct SessionExpires {
117    /// Session interval in seconds.
118    pub interval_secs: u32,
119    /// Who refreshes, if pinned. `None` means "answerer's choice".
120    pub refresher: Option<Refresher>,
121}
122
123impl SessionExpires {
124    /// Parse a `Session-Expires` header *value* (not the full header
125    /// line), e.g. `"1800"` or `"1800;refresher=uas"`. Parameter names
126    /// and values are case-insensitive; unknown parameters are ignored
127    /// per RFC 4028 §4.
128    pub fn parse(value: &str) -> Result<Self, String> {
129        let mut parts = value.split(';');
130        let interval = parts.next().unwrap_or("").trim();
131        let interval_secs: u32 = interval
132            .parse()
133            .map_err(|e| format!("invalid Session-Expires interval {interval:?}: {e}"))?;
134        let mut refresher = None;
135        for param in parts {
136            if let Some((name, val)) = param.split_once('=') {
137                if name.trim().eq_ignore_ascii_case("refresher") {
138                    refresher = Some(Refresher::parse(val.trim())?);
139                }
140            }
141        }
142        Ok(Self {
143            interval_secs,
144            refresher,
145        })
146    }
147
148    /// Serialize back to a header value, e.g. `"1800;refresher=uac"`.
149    pub fn header_value(&self) -> String {
150        match self.refresher {
151            Some(r) => format!("{};refresher={}", self.interval_secs, r.as_str()),
152            None => self.interval_secs.to_string(),
153        }
154    }
155
156    /// Build the untyped `Session-Expires` header (rsip 0.4 has no
157    /// typed variant).
158    pub fn header(&self) -> Header {
159        Header::Other("Session-Expires".into(), self.header_value())
160    }
161}
162
/// Build the `Supported: timer` header, which advertises RFC 4028
/// session-timer support on a request or response.
///
/// Returns the typed [`Header::Supported`] variant carrying the single
/// option tag `timer`.
pub fn supported_timer_header() -> Header {
    Header::Supported("timer".into())
}
168
/// Build the `Require: timer` header, which the answerer places in a
/// 2xx when the offerer advertised timer support (RFC 4028 §9).
///
/// Returns the typed [`Header::Require`] variant carrying the single
/// option tag `timer`.
pub fn require_timer_header() -> Header {
    Header::Require("timer".into())
}
174
175/// Find the value of the first untyped header matching any of `names`
176/// (case-insensitive).
177fn other_header_value<'a>(headers: &'a rsip::Headers, names: &[&str]) -> Option<&'a str> {
178    headers.iter().find_map(|h| match h {
179        Header::Other(name, value) if names.iter().any(|n| name.eq_ignore_ascii_case(n)) => {
180            Some(value.as_str())
181        }
182        _ => None,
183    })
184}
185
186/// Extract and parse the `Session-Expires` header (long or compact `x`
187/// form) from a header list. Malformed values are logged and treated as
188/// absent — a broken peer header must not kill the call.
189pub fn session_expires_in(headers: &rsip::Headers) -> Option<SessionExpires> {
190    let raw = other_header_value(headers, &["Session-Expires", "x"])?;
191    match SessionExpires::parse(raw) {
192        Ok(se) => Some(se),
193        Err(e) => {
194            warn!("ignoring malformed Session-Expires header: {e}");
195            None
196        }
197    }
198}
199
200/// Extract the `Min-SE` interval (seconds) from a header list, ignoring
201/// any generic parameters. Malformed values are treated as absent.
202pub fn min_se_in(headers: &rsip::Headers) -> Option<u32> {
203    let raw = other_header_value(headers, &["Min-SE"])?;
204    let interval = raw.split(';').next().unwrap_or("").trim();
205    match interval.parse() {
206        Ok(v) => Some(v),
207        Err(e) => {
208            warn!("ignoring malformed Min-SE header {raw:?}: {e}");
209            None
210        }
211    }
212}
213
/// `true` if the comma-separated option-tag list `value` contains the
/// tag `timer` as a whole entry (surrounding whitespace and ASCII case
/// ignored).
fn has_timer_token(value: &str) -> bool {
    for tag in value.split(',') {
        if tag.trim().eq_ignore_ascii_case("timer") {
            return true;
        }
    }
    false
}
219
220/// `true` if any `Supported` header (typed, untyped, or compact `k`
221/// form) carries the `timer` option tag.
222pub fn supports_timer(headers: &rsip::Headers) -> bool {
223    headers.iter().any(|h| match h {
224        Header::Supported(s) => has_timer_token(s.value()),
225        Header::Other(name, value)
226            if name.eq_ignore_ascii_case("Supported") || name.eq_ignore_ascii_case("k") =>
227        {
228            has_timer_token(value)
229        }
230        _ => false,
231    })
232}
233
234/// Negotiated session-timer state for one dialog, from our side's
235/// perspective.
236#[derive(Debug, Clone, Copy, PartialEq, Eq)]
237pub struct SessionTimer {
238    /// Negotiated session interval in seconds.
239    pub interval_secs: u32,
240    /// `true` if we send the periodic refresh re-INVITEs; `false` if we
241    /// only watch for the peer's refreshes and BYE on expiry.
242    pub we_are_refresher: bool,
243}
244
245impl SessionTimer {
246    /// When the refresher sends its refresh: half the session interval
247    /// after the last refresh (RFC 4028 §10).
248    pub fn refresh_after(&self) -> Duration {
249        Duration::from_secs(u64::from(self.interval_secs / 2))
250    }
251
252    /// When the non-refresher gives up and sends BYE: the session
253    /// interval minus `min(32 s, interval / 3)` after the last refresh
254    /// (RFC 4028 §10).
255    pub fn expiry_after(&self) -> Duration {
256        let headroom = (self.interval_secs / 3).min(MAX_EXPIRY_HEADROOM_SECS);
257        Duration::from_secs(u64::from(self.interval_secs.saturating_sub(headroom)))
258    }
259}
260
261/// UAC-side negotiation: decide the [`SessionTimer`] from the 2xx
262/// response to our INVITE.
263///
264/// Returns `None` when the response carries no `Session-Expires` — the
265/// answerer declined (or doesn't support) session timers, so no timer
266/// runs. When the (mandatory per RFC 4028 §9) `refresher` parameter is
267/// missing we defensively take the refresher role ourselves: refreshing
268/// when the peer also refreshes is redundant but harmless, while *not*
269/// refreshing when the peer expects us to would drop the call.
270///
271/// The interval is floored at [`MIN_SESSION_EXPIRES_SECS`] so a bogus
272/// tiny grant can't spin the refresh loop.
273pub fn negotiate_uac(response_headers: &rsip::Headers) -> Option<SessionTimer> {
274    let se = session_expires_in(response_headers)?;
275    Some(SessionTimer {
276        interval_secs: se.interval_secs.max(MIN_SESSION_EXPIRES_SECS),
277        we_are_refresher: !matches!(se.refresher, Some(Refresher::Uas)),
278    })
279}
280
/// Result of UAS-side negotiation ([`negotiate_uas`]): the timer we
/// run locally plus the values to put in our 2xx so the peer agrees on
/// the same interval and refresher.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UasSessionTimer {
    /// Timer state from our (UAS) perspective.
    pub timer: SessionTimer,
    /// `Session-Expires` to echo in the 2xx: the negotiated interval
    /// with the refresher always pinned.
    pub echo: SessionExpires,
    /// `true` if the peer advertised `Supported: timer`, in which case
    /// the 2xx must also carry `Require: timer` (RFC 4028 §9).
    pub require_timer: bool,
}
294
295/// UAS-side negotiation: decide the session timer from an inbound
296/// INVITE's headers.
297///
298/// Returns `None` when the INVITE carries no `Session-Expires` — we do
299/// not insert timers into sessions the caller didn't ask for (allowed
300/// by RFC 4028, deliberately deferred; see
301/// `docs/07-session-timers.md`).
302///
303/// The interval is floored at `max(90, Min-SE)`. The refresher is the
304/// request's `refresher` parameter when the peer advertised timer
305/// support, defaulting to `uac` (peer refreshes). When the peer did
306/// *not* advertise `Supported: timer` — e.g. a proxy inserted the
307/// `Session-Expires` — the peer cannot refresh, so we take the
308/// refresher role and skip `Require: timer`.
309pub fn negotiate_uas(invite_headers: &rsip::Headers) -> Option<UasSessionTimer> {
310    let se = session_expires_in(invite_headers)?;
311    let floor = min_se_in(invite_headers)
312        .unwrap_or(0)
313        .max(MIN_SESSION_EXPIRES_SECS);
314    let interval_secs = se.interval_secs.max(floor);
315    let peer_supports = supports_timer(invite_headers);
316    let refresher = if peer_supports {
317        se.refresher.unwrap_or(Refresher::Uac)
318    } else {
319        Refresher::Uas
320    };
321    Some(UasSessionTimer {
322        timer: SessionTimer {
323            interval_secs,
324            we_are_refresher: refresher == Refresher::Uas,
325        },
326        echo: SessionExpires {
327            interval_secs,
328            refresher: Some(refresher),
329        },
330        require_timer: peer_supports,
331    })
332}
333
/// The two dialog operations [`session_timer_loop`] needs, abstracted
/// behind a trait so the loop's timing logic stays unit-testable
/// without a live dialog.
///
/// Implemented for [`crate::Call`]'s shareable session handle over the
/// engine's in-dialog re-INVITE / BYE.
///
/// Both methods return `Send` futures.
pub trait SessionDialogOps {
    /// Send a session-refresh re-INVITE with the given extra headers
    /// and (typically SDP) body.
    ///
    /// Resolves to `Ok(Some(response))` with the final response,
    /// `Ok(None)` if the dialog is no longer confirmed, or `Err` on
    /// failure to complete the request.
    fn refresh(
        &self,
        headers: Vec<Header>,
        body: Option<Vec<u8>>,
    ) -> impl Future<Output = Result<Option<rsip::Response>, BoxError>> + Send;

    /// Hang up the dialog with BYE.
    fn send_bye(&self) -> impl Future<Output = Result<(), BoxError>> + Send;
}
352
/// Reason [`session_timer_loop`] returned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionTimerOutcome {
    /// The [`CancellationToken`] fired — the call ended through the
    /// normal path (local/remote BYE) and the loop just stood down.
    /// The loop sends nothing in this case.
    Cancelled,
    /// We were the watchdog and no refresh arrived before the session
    /// interval lapsed. A BYE was sent (best effort: a BYE failure is
    /// logged, not reported).
    Expired,
    /// We were the refresher and a refresh re-INVITE failed (non-2xx or
    /// transport error). A BYE was sent (best effort: a BYE failure is
    /// logged, not reported).
    RefreshFailed,
    /// The dialog was no longer confirmed when we tried to refresh —
    /// it already terminated through another path. No BYE needed.
    DialogGone,
}
369
370/// Drive RFC 4028 session keepalive for one confirmed dialog. Runs
371/// until cancelled or the session dies; same shape as
372/// [`crate::Registrar::keepalive_loop`].
373///
374/// - When `timer.we_are_refresher`, sends a refresh re-INVITE every
375///   `interval / 2` carrying `refresh_body` (repeat the SDP you sent
376///   when the call was set up — an identical offer is a no-op per
377///   RFC 3264). A 2xx resets the clock (adopting any `Session-Expires`
378///   the peer granted); failure tears the call down with BYE.
379/// - Otherwise runs the expiry watchdog: every
380///   `peer_refreshed.notify_one()` resets the deadline, and if the
381///   deadline lapses the call is torn down with BYE. The consumer pings
382///   `peer_refreshed` after answering the peer's refresh re-INVITE.
383///
384/// All failures are folded into the returned [`SessionTimerOutcome`];
385/// the loop never panics and never returns early without standing the
386/// session down.
387pub async fn session_timer_loop<D: SessionDialogOps>(
388    dialog: &D,
389    timer: SessionTimer,
390    refresh_body: Option<Vec<u8>>,
391    peer_refreshed: Arc<Notify>,
392    cancel: CancellationToken,
393) -> SessionTimerOutcome {
394    let mut interval_secs = timer.interval_secs.max(MIN_SESSION_EXPIRES_SECS);
395    if timer.we_are_refresher {
396        loop {
397            let current = SessionTimer {
398                interval_secs,
399                we_are_refresher: true,
400            };
401            select! {
402                _ = tokio::time::sleep(current.refresh_after()) => {}
403                _ = cancel.cancelled() => return SessionTimerOutcome::Cancelled,
404            }
405
406            // In this refresh re-INVITE we are the UAC of the new
407            // transaction, so the refresher param says `uac`.
408            let headers = vec![
409                supported_timer_header(),
410                SessionExpires {
411                    interval_secs,
412                    refresher: Some(Refresher::Uac),
413                }
414                .header(),
415            ];
416            match dialog.refresh(headers, refresh_body.clone()).await {
417                Ok(Some(resp)) if resp.status_code.kind() == rsip::StatusCodeKind::Successful => {
418                    // Adopt a re-granted interval (a peer may shorten or
419                    // lengthen mid-call), floored to avoid a hot loop.
420                    if let Some(granted) = session_expires_in(&resp.headers) {
421                        interval_secs = granted.interval_secs.max(MIN_SESSION_EXPIRES_SECS);
422                    }
423                    debug!(interval_secs, "session refresh accepted");
424                }
425                Ok(Some(resp)) => {
426                    warn!(
427                        status = %resp.status_code,
428                        "session refresh rejected; hanging up"
429                    );
430                    if let Err(e) = dialog.send_bye().await {
431                        warn!("BYE after rejected refresh failed: {e}");
432                    }
433                    return SessionTimerOutcome::RefreshFailed;
434                }
435                Ok(None) => {
436                    debug!("dialog no longer confirmed; session timer standing down");
437                    return SessionTimerOutcome::DialogGone;
438                }
439                Err(e) => {
440                    warn!("session refresh error: {e}; hanging up");
441                    if let Err(e) = dialog.send_bye().await {
442                        warn!("BYE after failed refresh failed: {e}");
443                    }
444                    return SessionTimerOutcome::RefreshFailed;
445                }
446            }
447        }
448    } else {
449        let current = SessionTimer {
450            interval_secs,
451            we_are_refresher: false,
452        };
453        loop {
454            select! {
455                _ = tokio::time::sleep(current.expiry_after()) => {
456                    info!(
457                        interval_secs,
458                        "session lapsed without refresh; sending BYE"
459                    );
460                    if let Err(e) = dialog.send_bye().await {
461                        warn!("BYE after session expiry failed: {e}");
462                    }
463                    return SessionTimerOutcome::Expired;
464                }
465                _ = peer_refreshed.notified() => {
466                    debug!("peer refreshed session; watchdog deadline reset");
467                }
468                _ = cancel.cancelled() => return SessionTimerOutcome::Cancelled,
469            }
470        }
471    }
472}
473
474#[cfg(test)]
475mod tests {
476    use super::*;
477    use std::sync::Mutex;
478
479    // ---- SessionExpires parse / build ----
480
481    #[test]
482    fn parse_bare_interval() {
483        let se = SessionExpires::parse("1800").unwrap();
484        assert_eq!(se.interval_secs, 1800);
485        assert_eq!(se.refresher, None);
486    }
487
488    #[test]
489    fn parse_with_refresher_param() {
490        let se = SessionExpires::parse("1800;refresher=uas").unwrap();
491        assert_eq!(se.interval_secs, 1800);
492        assert_eq!(se.refresher, Some(Refresher::Uas));
493        let se = SessionExpires::parse("90;refresher=uac").unwrap();
494        assert_eq!(se.refresher, Some(Refresher::Uac));
495    }
496
497    #[test]
498    fn parse_is_case_insensitive_and_whitespace_tolerant() {
499        let se = SessionExpires::parse(" 600 ; Refresher = UAS ").unwrap();
500        assert_eq!(se.interval_secs, 600);
501        assert_eq!(se.refresher, Some(Refresher::Uas));
502    }
503
504    #[test]
505    fn parse_ignores_unknown_params() {
506        let se = SessionExpires::parse("1800;foo=bar;refresher=uac;baz").unwrap();
507        assert_eq!(se.refresher, Some(Refresher::Uac));
508    }
509
510    #[test]
511    fn parse_rejects_garbage() {
512        assert!(SessionExpires::parse("").is_err());
513        assert!(SessionExpires::parse("soon").is_err());
514        assert!(SessionExpires::parse("1800;refresher=bogus").is_err());
515        assert!(SessionExpires::parse("-5").is_err());
516    }
517
518    #[test]
519    fn header_value_round_trips() {
520        for se in [
521            SessionExpires {
522                interval_secs: 1800,
523                refresher: None,
524            },
525            SessionExpires {
526                interval_secs: 90,
527                refresher: Some(Refresher::Uac),
528            },
529            SessionExpires {
530                interval_secs: 7200,
531                refresher: Some(Refresher::Uas),
532            },
533        ] {
534            let parsed = SessionExpires::parse(&se.header_value()).unwrap();
535            assert_eq!(parsed, se, "round-trip via {:?}", se.header_value());
536        }
537    }
538
539    #[test]
540    fn header_builds_untyped_session_expires() {
541        let h = SessionExpires {
542            interval_secs: 1800,
543            refresher: Some(Refresher::Uac),
544        }
545        .header();
546        assert_eq!(h.to_string(), "Session-Expires: 1800;refresher=uac");
547    }
548
549    // ---- header extraction ----
550
551    fn headers(items: Vec<Header>) -> rsip::Headers {
552        let mut h = rsip::Headers::default();
553        for item in items {
554            h.push(item);
555        }
556        h
557    }
558
559    #[test]
560    fn session_expires_in_finds_header_case_insensitively() {
561        let h = headers(vec![Header::Other(
562            "session-expires".into(),
563            "600;refresher=uas".into(),
564        )]);
565        let se = session_expires_in(&h).unwrap();
566        assert_eq!(se.interval_secs, 600);
567        assert_eq!(se.refresher, Some(Refresher::Uas));
568    }
569
570    #[test]
571    fn session_expires_in_accepts_compact_form() {
572        // RFC 4028 §4 defines `x` as the compact form of Session-Expires.
573        let h = headers(vec![Header::Other("x".into(), "300".into())]);
574        assert_eq!(
575            session_expires_in(&h),
576            Some(SessionExpires {
577                interval_secs: 300,
578                refresher: None
579            })
580        );
581    }
582
583    #[test]
584    fn session_expires_in_absent_or_malformed_is_none() {
585        assert_eq!(session_expires_in(&headers(vec![])), None);
586        let h = headers(vec![Header::Other("Session-Expires".into(), "soon".into())]);
587        assert_eq!(session_expires_in(&h), None);
588    }
589
590    #[test]
591    fn min_se_in_parses_and_ignores_params() {
592        let h = headers(vec![Header::Other("Min-SE".into(), "120".into())]);
593        assert_eq!(min_se_in(&h), Some(120));
594        let h = headers(vec![Header::Other("min-se".into(), "240;lr".into())]);
595        assert_eq!(min_se_in(&h), Some(240));
596        assert_eq!(min_se_in(&headers(vec![])), None);
597        let h = headers(vec![Header::Other("Min-SE".into(), "never".into())]);
598        assert_eq!(min_se_in(&h), None);
599    }
600
601    #[test]
602    fn supports_timer_scans_typed_untyped_and_compact() {
603        assert!(supports_timer(&headers(vec![supported_timer_header()])));
604        assert!(supports_timer(&headers(vec![Header::Supported(
605            "100rel, timer".into()
606        )])));
607        assert!(supports_timer(&headers(vec![Header::Other(
608            "k".into(),
609            "timer".into()
610        )])));
611        assert!(supports_timer(&headers(vec![Header::Other(
612            "Supported".into(),
613            "TIMER".into()
614        )])));
615        assert!(!supports_timer(&headers(vec![])));
616        assert!(!supports_timer(&headers(vec![Header::Supported(
617            "100rel".into()
618        )])));
619        // `timer` must be a whole option tag, not a substring.
620        assert!(!supports_timer(&headers(vec![Header::Supported(
621            "timers".into()
622        )])));
623    }
624
625    // ---- interval math (RFC 4028 §10) ----
626
627    #[test]
628    fn refresh_fires_at_half_the_interval() {
629        let t = SessionTimer {
630            interval_secs: 1800,
631            we_are_refresher: true,
632        };
633        assert_eq!(t.refresh_after(), Duration::from_secs(900));
634        let t = SessionTimer {
635            interval_secs: 90,
636            we_are_refresher: true,
637        };
638        assert_eq!(t.refresh_after(), Duration::from_secs(45));
639    }
640
641    #[test]
642    fn expiry_keeps_min_of_32s_or_a_third_headroom() {
643        // Long interval: headroom caps at 32 s.
644        let t = SessionTimer {
645            interval_secs: 1800,
646            we_are_refresher: false,
647        };
648        assert_eq!(t.expiry_after(), Duration::from_secs(1768));
649        // Short interval: a third of 90 s = 30 s < 32 s.
650        let t = SessionTimer {
651            interval_secs: 90,
652            we_are_refresher: false,
653        };
654        assert_eq!(t.expiry_after(), Duration::from_secs(60));
655    }
656
657    // ---- negotiation: UAC side ----
658
659    #[test]
660    fn uac_no_session_expires_means_no_timer() {
661        assert_eq!(negotiate_uac(&headers(vec![])), None);
662    }
663
664    #[test]
665    fn uac_refresher_uas_means_peer_refreshes() {
666        let h = headers(vec![Header::Other(
667            "Session-Expires".into(),
668            "1800;refresher=uas".into(),
669        )]);
670        assert_eq!(
671            negotiate_uac(&h),
672            Some(SessionTimer {
673                interval_secs: 1800,
674                we_are_refresher: false
675            })
676        );
677    }
678
679    #[test]
680    fn uac_refresher_uac_or_missing_means_we_refresh() {
681        let h = headers(vec![Header::Other(
682            "Session-Expires".into(),
683            "600;refresher=uac".into(),
684        )]);
685        assert!(negotiate_uac(&h).unwrap().we_are_refresher);
686        // RFC 4028 §9 says the 2xx MUST pin the refresher; a peer that
687        // omits it gets the defensive default: we refresh.
688        let h = headers(vec![Header::Other("Session-Expires".into(), "600".into())]);
689        assert!(negotiate_uac(&h).unwrap().we_are_refresher);
690    }
691
692    #[test]
693    fn uac_floors_tiny_grants_at_90s() {
694        let h = headers(vec![Header::Other(
695            "Session-Expires".into(),
696            "20;refresher=uac".into(),
697        )]);
698        assert_eq!(negotiate_uac(&h).unwrap().interval_secs, 90);
699    }
700
701    // ---- negotiation: UAS side ----
702
703    fn invite_headers(session_expires: &str, min_se: Option<&str>, timer: bool) -> rsip::Headers {
704        let mut items = vec![Header::Other(
705            "Session-Expires".into(),
706            session_expires.into(),
707        )];
708        if let Some(m) = min_se {
709            items.push(Header::Other("Min-SE".into(), m.into()));
710        }
711        if timer {
712            items.push(supported_timer_header());
713        }
714        headers(items)
715    }
716
717    #[test]
718    fn uas_no_session_expires_means_no_timer() {
719        assert_eq!(
720            negotiate_uas(&headers(vec![supported_timer_header()])),
721            None
722        );
723    }
724
725    #[test]
726    fn uas_default_makes_supporting_peer_the_refresher() {
727        let uas = negotiate_uas(&invite_headers("1800", None, true)).unwrap();
728        assert_eq!(uas.timer.interval_secs, 1800);
729        assert!(!uas.timer.we_are_refresher, "peer (UAC) should refresh");
730        assert_eq!(
731            uas.echo,
732            SessionExpires {
733                interval_secs: 1800,
734                refresher: Some(Refresher::Uac)
735            }
736        );
737        assert!(uas.require_timer);
738    }
739
740    #[test]
741    fn uas_honors_requested_refresher_uas() {
742        let uas = negotiate_uas(&invite_headers("1800;refresher=uas", None, true)).unwrap();
743        assert!(uas.timer.we_are_refresher, "we (UAS) were asked to refresh");
744        assert_eq!(uas.echo.refresher, Some(Refresher::Uas));
745    }
746
747    #[test]
748    fn uas_without_peer_support_takes_refresher_role() {
749        // A proxy inserted Session-Expires but the UAC itself never
750        // advertised `Supported: timer` — it can't refresh, so we must,
751        // and we must not Require: timer in the 2xx.
752        let uas = negotiate_uas(&invite_headers("1800;refresher=uac", None, false)).unwrap();
753        assert!(uas.timer.we_are_refresher);
754        assert_eq!(uas.echo.refresher, Some(Refresher::Uas));
755        assert!(!uas.require_timer);
756    }
757
758    #[test]
759    fn uas_floors_interval_at_min_se_and_90s() {
760        // Requested 30 s with Min-SE 120 → 120 s.
761        let uas = negotiate_uas(&invite_headers("30", Some("120"), true)).unwrap();
762        assert_eq!(uas.timer.interval_secs, 120);
763        assert_eq!(uas.echo.interval_secs, 120);
764        // Requested 30 s without Min-SE → RFC floor of 90 s.
765        let uas = negotiate_uas(&invite_headers("30", None, true)).unwrap();
766        assert_eq!(uas.timer.interval_secs, 90);
767    }
768
769    // ---- session_timer_loop against a mock dialog ----
770
771    #[derive(Debug, Clone, PartialEq, Eq)]
772    enum Event {
773        Refresh { session_expires: String },
774        Bye,
775    }
776
777    /// Scripted mock: pops one canned reply per refresh; records every
778    /// call with a timestamp readable under tokio's paused clock.
779    struct MockDialog {
780        events: Mutex<Vec<(Duration, Event)>>,
781        refresh_replies: Mutex<Vec<Result<Option<rsip::Response>, String>>>,
782        started: tokio::time::Instant,
783    }
784
785    impl MockDialog {
786        fn new(refresh_replies: Vec<Result<Option<rsip::Response>, String>>) -> Self {
787            Self {
788                events: Mutex::new(Vec::new()),
789                refresh_replies: Mutex::new(refresh_replies),
790                started: tokio::time::Instant::now(),
791            }
792        }
793
794        fn events(&self) -> Vec<(Duration, Event)> {
795            self.events.lock().unwrap().clone()
796        }
797    }
798
799    fn response(code: u16, extra: Vec<Header>) -> rsip::Response {
800        rsip::Response {
801            status_code: rsip::StatusCode::from(code),
802            version: rsip::Version::V2,
803            headers: headers(extra),
804            body: Vec::new(),
805        }
806    }
807
808    impl SessionDialogOps for MockDialog {
809        async fn refresh(
810            &self,
811            hdrs: Vec<Header>,
812            _body: Option<Vec<u8>>,
813        ) -> Result<Option<rsip::Response>, BoxError> {
814            let se = hdrs
815                .iter()
816                .find_map(|h| match h {
817                    Header::Other(name, value) if name == "Session-Expires" => Some(value.clone()),
818                    _ => None,
819                })
820                .unwrap_or_default();
821            self.events.lock().unwrap().push((
822                self.started.elapsed(),
823                Event::Refresh {
824                    session_expires: se,
825                },
826            ));
827            let reply = self.refresh_replies.lock().unwrap().remove(0);
828            reply.map_err(Into::into)
829        }
830
831        async fn send_bye(&self) -> Result<(), BoxError> {
832            self.events
833                .lock()
834                .unwrap()
835                .push((self.started.elapsed(), Event::Bye));
836            Ok(())
837        }
838    }
839
840    fn timer(interval_secs: u32, we_are_refresher: bool) -> SessionTimer {
841        SessionTimer {
842            interval_secs,
843            we_are_refresher,
844        }
845    }
846
847    #[tokio::test(start_paused = true)]
848    async fn refresher_sends_refresh_every_half_interval() {
849        let dialog = Arc::new(MockDialog::new(vec![
850            Ok(Some(response(200, vec![]))),
851            Ok(Some(response(200, vec![]))),
852            Ok(None), // third tick: dialog gone, loop exits
853        ]));
854        let cancel = CancellationToken::new();
855        let outcome = session_timer_loop(
856            &*dialog,
857            timer(180, true),
858            None,
859            Arc::new(Notify::new()),
860            cancel,
861        )
862        .await;
863        assert_eq!(outcome, SessionTimerOutcome::DialogGone);
864
865        let events = dialog.events();
866        assert_eq!(events.len(), 3);
867        assert_eq!(events[0].0, Duration::from_secs(90));
868        assert_eq!(events[1].0, Duration::from_secs(180));
869        assert_eq!(events[2].0, Duration::from_secs(270));
870        for (_, e) in &events {
871            assert_eq!(
872                e,
873                &Event::Refresh {
874                    session_expires: "180;refresher=uac".into()
875                }
876            );
877        }
878    }
879
880    #[tokio::test(start_paused = true)]
881    async fn refresher_adopts_interval_granted_in_refresh_response() {
882        // First 200 re-grants a longer interval; the second refresh must
883        // fire at the *new* half-interval.
884        let regrant = response(
885            200,
886            vec![Header::Other(
887                "Session-Expires".into(),
888                "360;refresher=uac".into(),
889            )],
890        );
891        let dialog = Arc::new(MockDialog::new(vec![Ok(Some(regrant)), Ok(None)]));
892        let cancel = CancellationToken::new();
893        let outcome = session_timer_loop(
894            &*dialog,
895            timer(180, true),
896            None,
897            Arc::new(Notify::new()),
898            cancel,
899        )
900        .await;
901        assert_eq!(outcome, SessionTimerOutcome::DialogGone);
902
903        let events = dialog.events();
904        assert_eq!(events[0].0, Duration::from_secs(90), "first at 180/2");
905        assert_eq!(
906            events[1].0,
907            Duration::from_secs(90 + 180),
908            "second at 90 + 360/2 after the re-grant"
909        );
910    }
911
912    #[tokio::test(start_paused = true)]
913    async fn refresher_rejected_refresh_sends_bye() {
914        let dialog = Arc::new(MockDialog::new(vec![Ok(Some(response(481, vec![])))]));
915        let cancel = CancellationToken::new();
916        let outcome = session_timer_loop(
917            &*dialog,
918            timer(180, true),
919            None,
920            Arc::new(Notify::new()),
921            cancel,
922        )
923        .await;
924        assert_eq!(outcome, SessionTimerOutcome::RefreshFailed);
925        let events = dialog.events();
926        assert!(matches!(events[0].1, Event::Refresh { .. }));
927        assert_eq!(events[1].1, Event::Bye);
928    }
929
930    #[tokio::test(start_paused = true)]
931    async fn refresher_transport_error_sends_bye() {
932        let dialog = Arc::new(MockDialog::new(vec![Err("socket closed".into())]));
933        let cancel = CancellationToken::new();
934        let outcome = session_timer_loop(
935            &*dialog,
936            timer(180, true),
937            None,
938            Arc::new(Notify::new()),
939            cancel,
940        )
941        .await;
942        assert_eq!(outcome, SessionTimerOutcome::RefreshFailed);
943        assert_eq!(dialog.events().last().unwrap().1, Event::Bye);
944    }
945
946    #[tokio::test(start_paused = true)]
947    async fn refresher_cancellation_wins_before_first_refresh() {
948        let dialog = Arc::new(MockDialog::new(vec![]));
949        let cancel = CancellationToken::new();
950        cancel.cancel();
951        let outcome = session_timer_loop(
952            &*dialog,
953            timer(180, true),
954            None,
955            Arc::new(Notify::new()),
956            cancel,
957        )
958        .await;
959        assert_eq!(outcome, SessionTimerOutcome::Cancelled);
960        assert!(dialog.events().is_empty(), "no refresh, no BYE");
961    }
962
963    #[tokio::test(start_paused = true)]
964    async fn watchdog_sends_bye_when_session_lapses() {
965        let dialog = Arc::new(MockDialog::new(vec![]));
966        let cancel = CancellationToken::new();
967        let outcome = session_timer_loop(
968            &*dialog,
969            timer(90, false),
970            None,
971            Arc::new(Notify::new()),
972            cancel,
973        )
974        .await;
975        assert_eq!(outcome, SessionTimerOutcome::Expired);
976        let events = dialog.events();
977        assert_eq!(events.len(), 1);
978        // 90 - min(32, 90/3) = 60 s.
979        assert_eq!(events[0], (Duration::from_secs(60), Event::Bye));
980    }
981
982    #[tokio::test(start_paused = true)]
983    async fn watchdog_resets_deadline_on_peer_refresh() {
984        let dialog = Arc::new(MockDialog::new(vec![]));
985        let cancel = CancellationToken::new();
986        let refreshed = Arc::new(Notify::new());
987
988        let loop_task = tokio::spawn({
989            let dialog = dialog.clone();
990            let refreshed = refreshed.clone();
991            let cancel = cancel.clone();
992            async move { session_timer_loop(&*dialog, timer(90, false), None, refreshed, cancel).await }
993        });
994
995        // Just before the 60 s deadline, the peer refreshes.
996        tokio::time::sleep(Duration::from_secs(59)).await;
997        refreshed.notify_one();
998        tokio::task::yield_now().await;
999        // Crossing the original deadline must NOT fire the BYE...
1000        tokio::time::sleep(Duration::from_secs(30)).await;
1001        assert!(dialog.events().is_empty(), "deadline should have reset");
1002        // ...but the rearmed deadline (59 + 60 = 119 s) must.
1003        let outcome = loop_task.await.unwrap();
1004        assert_eq!(outcome, SessionTimerOutcome::Expired);
1005        assert_eq!(
1006            dialog.events(),
1007            vec![(Duration::from_secs(119), Event::Bye)]
1008        );
1009    }
1010
1011    #[tokio::test(start_paused = true)]
1012    async fn watchdog_cancellation_stands_down_without_bye() {
1013        let dialog = Arc::new(MockDialog::new(vec![]));
1014        let cancel = CancellationToken::new();
1015        let loop_task = tokio::spawn({
1016            let dialog = dialog.clone();
1017            let cancel = cancel.clone();
1018            async move {
1019                session_timer_loop(
1020                    &*dialog,
1021                    timer(90, false),
1022                    None,
1023                    Arc::new(Notify::new()),
1024                    cancel,
1025                )
1026                .await
1027            }
1028        });
1029        tokio::time::sleep(Duration::from_secs(10)).await;
1030        cancel.cancel();
1031        assert_eq!(loop_task.await.unwrap(), SessionTimerOutcome::Cancelled);
1032        assert!(dialog.events().is_empty());
1033    }
1034}