Skip to main content

wavekat_sip/
session_timer.rs

1//! RFC 4028 session timers — keep long calls from outliving a dead dialog.
2//!
3//! Without session timers, a call whose dialog silently died (peer
4//! crashed, NAT binding dropped, proxy lost state) lives forever:
5//! nothing on the signaling path re-validates the dialog, so an
6//! unattended consumer (voice bot, AI agent) keeps streaming RTP into
7//! the void. RFC 4028 bounds that window: one side periodically
8//! refreshes the session with a re-INVITE, and the other side tears the
9//! call down with BYE if no refresh arrives before the negotiated
10//! session interval lapses.
11//!
12//! This module has three layers:
13//!
14//! 1. **Pure header logic** — [`SessionExpires`] parse/build for the
15//!    `Session-Expires` header (with its `;refresher=uac|uas` param),
16//!    [`min_se_in`] for `Min-SE`, and [`supports_timer`] for the
17//!    `Supported: timer` option tag. rsip 0.4 has no typed variants for
18//!    these, so they are parsed manually from
19//!    [`rsip::Header::Other`] — same style as the crate's SDP and RTP
20//!    parsing.
21//! 2. **Negotiation** — [`negotiate_uac`] (from a 2xx response, caller
22//!    side) and [`negotiate_uas`] (from an INVITE, callee side) decide
23//!    the [`SessionTimer`]: the negotiated interval and whether *we*
24//!    are the refresher. [`SessionTimer::refresh_after`] /
25//!    [`SessionTimer::expiry_after`] give the RFC 4028 §10 schedule.
26//! 3. **Runtime** — [`session_timer_loop`], shaped like
27//!    [`crate::Registrar::keepalive_loop`]: a `select!` over sleeps and
28//!    a [`CancellationToken`] that either sends the periodic refresh
29//!    re-INVITE (when we are the refresher) or watches for the peer's
30//!    refreshes and sends BYE when the session lapses.
31//!
32//! # Wiring it up
33//!
34//! [`crate::Caller`] and [`crate::Callee`] already negotiate for you —
35//! [`crate::AcceptedDial::session_timer`] /
36//! [`crate::AcceptedCall::session_timer`] carry the result. Spawn the
37//! loop after the call confirms:
38//!
39//! ```ignore
40//! let accepted = pending.on_confirmed().await?;
41//! if let Some(timer) = accepted.session_timer {
42//!     let dialog = accepted.dialog.clone();
43//!     let refreshed = Arc::new(Notify::new());
44//!     let sdp = build_sdp(local_ip, rtp_port); // repeat our offer
45//!     tokio::spawn({
46//!         let refreshed = refreshed.clone();
47//!         let cancel = cancel.clone();
48//!         async move {
49//!             let outcome =
50//!                 session_timer_loop(&dialog, timer, Some(sdp), refreshed, cancel).await;
51//!             tracing::info!(?outcome, "session timer finished");
52//!         }
53//!     });
54//! }
55//! ```
56//!
57//! When the **peer** is the refresher, its refresh re-INVITEs arrive on
58//! the dialog state stream as `DialogState::Updated(_, request, handle)`
59//! (after routing the transaction through
60//! [`crate::SipEndpoint::dispatch_in_dialog`]). rsipstack does not
61//! auto-answer them — your state pump must reply and then ping the
62//! watchdog:
63//!
64//! ```ignore
65//! DialogState::Updated(_, _request, handle) => {
66//!     let echo = SessionExpires {
67//!         interval_secs: timer.interval_secs,
68//!         refresher: Some(Refresher::Uac), // the peer (that request's UAC) refreshes
69//!     };
70//!     handle
71//!         .respond(
72//!             StatusCode::OK,
73//!             Some(vec![
74//!                 rsip::Header::ContentType("application/sdp".into()),
75//!                 supported_timer_header(),
76//!                 echo.header(),
77//!             ]),
78//!             Some(sdp_answer.clone()),
79//!         )
80//!         .await
81//!         .ok();
82//!     refreshed.notify_one();
83//! }
84//! ```
85
86use std::future::Future;
87use std::sync::Arc;
88use std::time::Duration;
89
90use rsip::prelude::UntypedHeader;
91use rsip::Header;
92use rsipstack::dialog::client_dialog::ClientInviteDialog;
93use rsipstack::dialog::server_dialog::ServerInviteDialog;
94use tokio::select;
95use tokio::sync::Notify;
96use tokio_util::sync::CancellationToken;
97use tracing::{debug, info, warn};
98
/// Boxed, thread-safe error used as the failure type of the
/// [`SessionDialogOps`] operations.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Lower bound, in seconds, applied to every session interval this
/// module runs a timer at. RFC 4028 §4 fixes 90 s as the absolute
/// minimum (and the default `Min-SE`); anything shorter would churn
/// re-INVITEs.
pub const MIN_SESSION_EXPIRES_SECS: u32 = 90;

/// Session interval, in seconds, requested on outbound INVITEs — the
/// RFC 4028 recommended default of 30 minutes.
pub const DEFAULT_SESSION_EXPIRES_SECS: u32 = 30 * 60;

/// Upper bound, in seconds, on how far *before* the session expiry the
/// non-refresher fires its BYE watchdog (RFC 4028 §10:
/// `min(32 s, interval / 3)`).
const MAX_EXPIRY_HEADROOM_SECS: u32 = 32;
113
/// The two roles a `Session-Expires` `refresher` parameter can name:
/// which side of the INVITE transaction carrying the header performs
/// the session refreshes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Refresher {
    /// The sender of the request refreshes.
    Uac,
    /// The party answering the request refreshes.
    Uas,
}

impl Refresher {
    /// The lowercase token written on the wire: `"uac"` or `"uas"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Refresher::Uas => "uas",
            Refresher::Uac => "uac",
        }
    }

    /// Case-insensitively map a parameter value onto a role. Anything
    /// other than `uac` / `uas` yields a descriptive error string.
    fn parse(s: &str) -> Result<Self, String> {
        [Refresher::Uac, Refresher::Uas]
            .iter()
            .copied()
            .find(|role| s.eq_ignore_ascii_case(role.as_str()))
            .ok_or_else(|| format!("invalid refresher value {s:?} (want uac|uas)"))
    }
}
143
144/// Parsed `Session-Expires` header value: interval in seconds plus the
145/// optional `refresher` parameter.
146#[derive(Debug, Clone, Copy, PartialEq, Eq)]
147pub struct SessionExpires {
148    /// Session interval in seconds.
149    pub interval_secs: u32,
150    /// Who refreshes, if pinned. `None` means "answerer's choice".
151    pub refresher: Option<Refresher>,
152}
153
154impl SessionExpires {
155    /// Parse a `Session-Expires` header *value* (not the full header
156    /// line), e.g. `"1800"` or `"1800;refresher=uas"`. Parameter names
157    /// and values are case-insensitive; unknown parameters are ignored
158    /// per RFC 4028 §4.
159    pub fn parse(value: &str) -> Result<Self, String> {
160        let mut parts = value.split(';');
161        let interval = parts.next().unwrap_or("").trim();
162        let interval_secs: u32 = interval
163            .parse()
164            .map_err(|e| format!("invalid Session-Expires interval {interval:?}: {e}"))?;
165        let mut refresher = None;
166        for param in parts {
167            if let Some((name, val)) = param.split_once('=') {
168                if name.trim().eq_ignore_ascii_case("refresher") {
169                    refresher = Some(Refresher::parse(val.trim())?);
170                }
171            }
172        }
173        Ok(Self {
174            interval_secs,
175            refresher,
176        })
177    }
178
179    /// Serialize back to a header value, e.g. `"1800;refresher=uac"`.
180    pub fn header_value(&self) -> String {
181        match self.refresher {
182            Some(r) => format!("{};refresher={}", self.interval_secs, r.as_str()),
183            None => self.interval_secs.to_string(),
184        }
185    }
186
187    /// Build the untyped `Session-Expires` header (rsip 0.4 has no
188    /// typed variant).
189    pub fn header(&self) -> Header {
190        Header::Other("Session-Expires".into(), self.header_value())
191    }
192}
193
194/// `Supported: timer` — advertise RFC 4028 support on a request or
195/// response.
196pub fn supported_timer_header() -> Header {
197    Header::Supported("timer".into())
198}
199
200/// `Require: timer` — placed in a 2xx by the answerer when the offerer
201/// advertised timer support (RFC 4028 §9).
202pub fn require_timer_header() -> Header {
203    Header::Require("timer".into())
204}
205
206/// Find the value of the first untyped header matching any of `names`
207/// (case-insensitive).
208fn other_header_value<'a>(headers: &'a rsip::Headers, names: &[&str]) -> Option<&'a str> {
209    headers.iter().find_map(|h| match h {
210        Header::Other(name, value) if names.iter().any(|n| name.eq_ignore_ascii_case(n)) => {
211            Some(value.as_str())
212        }
213        _ => None,
214    })
215}
216
217/// Extract and parse the `Session-Expires` header (long or compact `x`
218/// form) from a header list. Malformed values are logged and treated as
219/// absent — a broken peer header must not kill the call.
220pub fn session_expires_in(headers: &rsip::Headers) -> Option<SessionExpires> {
221    let raw = other_header_value(headers, &["Session-Expires", "x"])?;
222    match SessionExpires::parse(raw) {
223        Ok(se) => Some(se),
224        Err(e) => {
225            warn!("ignoring malformed Session-Expires header: {e}");
226            None
227        }
228    }
229}
230
231/// Extract the `Min-SE` interval (seconds) from a header list, ignoring
232/// any generic parameters. Malformed values are treated as absent.
233pub fn min_se_in(headers: &rsip::Headers) -> Option<u32> {
234    let raw = other_header_value(headers, &["Min-SE"])?;
235    let interval = raw.split(';').next().unwrap_or("").trim();
236    match interval.parse() {
237        Ok(v) => Some(v),
238        Err(e) => {
239            warn!("ignoring malformed Min-SE header {raw:?}: {e}");
240            None
241        }
242    }
243}
244
/// `true` if the comma-separated option-tag list `value` contains
/// `timer` as a whole tag (whitespace-trimmed, ASCII
/// case-insensitive). Substrings such as `timers` do not match.
fn has_timer_token(value: &str) -> bool {
    for tag in value.split(',') {
        if tag.trim().eq_ignore_ascii_case("timer") {
            return true;
        }
    }
    false
}
250
251/// `true` if any `Supported` header (typed, untyped, or compact `k`
252/// form) carries the `timer` option tag.
253pub fn supports_timer(headers: &rsip::Headers) -> bool {
254    headers.iter().any(|h| match h {
255        Header::Supported(s) => has_timer_token(s.value()),
256        Header::Other(name, value)
257            if name.eq_ignore_ascii_case("Supported") || name.eq_ignore_ascii_case("k") =>
258        {
259            has_timer_token(value)
260        }
261        _ => false,
262    })
263}
264
265/// Negotiated session-timer state for one dialog, from our side's
266/// perspective.
267#[derive(Debug, Clone, Copy, PartialEq, Eq)]
268pub struct SessionTimer {
269    /// Negotiated session interval in seconds.
270    pub interval_secs: u32,
271    /// `true` if we send the periodic refresh re-INVITEs; `false` if we
272    /// only watch for the peer's refreshes and BYE on expiry.
273    pub we_are_refresher: bool,
274}
275
276impl SessionTimer {
277    /// When the refresher sends its refresh: half the session interval
278    /// after the last refresh (RFC 4028 §10).
279    pub fn refresh_after(&self) -> Duration {
280        Duration::from_secs(u64::from(self.interval_secs / 2))
281    }
282
283    /// When the non-refresher gives up and sends BYE: the session
284    /// interval minus `min(32 s, interval / 3)` after the last refresh
285    /// (RFC 4028 §10).
286    pub fn expiry_after(&self) -> Duration {
287        let headroom = (self.interval_secs / 3).min(MAX_EXPIRY_HEADROOM_SECS);
288        Duration::from_secs(u64::from(self.interval_secs.saturating_sub(headroom)))
289    }
290}
291
292/// UAC-side negotiation: decide the [`SessionTimer`] from the 2xx
293/// response to our INVITE.
294///
295/// Returns `None` when the response carries no `Session-Expires` — the
296/// answerer declined (or doesn't support) session timers, so no timer
297/// runs. When the (mandatory per RFC 4028 §9) `refresher` parameter is
298/// missing we defensively take the refresher role ourselves: refreshing
299/// when the peer also refreshes is redundant but harmless, while *not*
300/// refreshing when the peer expects us to would drop the call.
301///
302/// The interval is floored at [`MIN_SESSION_EXPIRES_SECS`] so a bogus
303/// tiny grant can't spin the refresh loop.
304pub fn negotiate_uac(response_headers: &rsip::Headers) -> Option<SessionTimer> {
305    let se = session_expires_in(response_headers)?;
306    Some(SessionTimer {
307        interval_secs: se.interval_secs.max(MIN_SESSION_EXPIRES_SECS),
308        we_are_refresher: !matches!(se.refresher, Some(Refresher::Uas)),
309    })
310}
311
/// UAS-side negotiation result: the timer to run plus what to put in
/// our 2xx so the peer agrees on it.
///
/// Produced by [`negotiate_uas`], which fills `timer` and `echo` with
/// the same interval and always pins a refresher in `echo`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UasSessionTimer {
    /// Timer state from our (UAS) perspective. As built by
    /// [`negotiate_uas`], `we_are_refresher` is `true` exactly when
    /// `echo.refresher` is [`Refresher::Uas`].
    pub timer: SessionTimer,
    /// `Session-Expires` to echo in the 2xx (interval + pinned
    /// refresher).
    pub echo: SessionExpires,
    /// `true` if the peer advertised `Supported: timer`, in which case
    /// the 2xx must also carry `Require: timer` (RFC 4028 §9).
    pub require_timer: bool,
}
325
326/// UAS-side negotiation: decide the session timer from an inbound
327/// INVITE's headers.
328///
329/// Returns `None` when the INVITE carries no `Session-Expires` — we do
330/// not insert timers into sessions the caller didn't ask for (allowed
331/// by RFC 4028, deliberately deferred; see
332/// `docs/07-session-timers.md`).
333///
334/// The interval is floored at `max(90, Min-SE)`. The refresher is the
335/// request's `refresher` parameter when the peer advertised timer
336/// support, defaulting to `uac` (peer refreshes). When the peer did
337/// *not* advertise `Supported: timer` — e.g. a proxy inserted the
338/// `Session-Expires` — the peer cannot refresh, so we take the
339/// refresher role and skip `Require: timer`.
340pub fn negotiate_uas(invite_headers: &rsip::Headers) -> Option<UasSessionTimer> {
341    let se = session_expires_in(invite_headers)?;
342    let floor = min_se_in(invite_headers)
343        .unwrap_or(0)
344        .max(MIN_SESSION_EXPIRES_SECS);
345    let interval_secs = se.interval_secs.max(floor);
346    let peer_supports = supports_timer(invite_headers);
347    let refresher = if peer_supports {
348        se.refresher.unwrap_or(Refresher::Uac)
349    } else {
350        Refresher::Uas
351    };
352    Some(UasSessionTimer {
353        timer: SessionTimer {
354            interval_secs,
355            we_are_refresher: refresher == Refresher::Uas,
356        },
357        echo: SessionExpires {
358            interval_secs,
359            refresher: Some(refresher),
360        },
361        require_timer: peer_supports,
362    })
363}
364
/// The dialog operations [`session_timer_loop`] needs, abstracted so
/// the loop's timing logic stays unit-testable without a live dialog.
///
/// Implemented for [`ClientInviteDialog`] and [`ServerInviteDialog`]
/// over their `reinvite` / `bye` methods.
///
/// Both methods are declared as returning `impl Future + Send`, so
/// every implementation's futures must be `Send`.
pub trait SessionDialogOps {
    /// Send a session-refresh re-INVITE with the given extra headers
    /// and (typically SDP) body. Returns the final response, or
    /// `Ok(None)` if the dialog is no longer confirmed.
    ///
    /// [`session_timer_loop`] treats a 2xx response as a successful
    /// refresh, any other response or an `Err` as a failed refresh
    /// (followed by a BYE), and `Ok(None)` as the dialog being gone.
    fn refresh(
        &self,
        headers: Vec<Header>,
        body: Option<Vec<u8>>,
    ) -> impl Future<Output = Result<Option<rsip::Response>, BoxError>> + Send;

    /// Hang up the dialog with BYE.
    ///
    /// [`session_timer_loop`] only logs an `Err` from this call; it
    /// does not retry.
    fn send_bye(&self) -> impl Future<Output = Result<(), BoxError>> + Send;
}
383
384impl SessionDialogOps for ClientInviteDialog {
385    async fn refresh(
386        &self,
387        headers: Vec<Header>,
388        body: Option<Vec<u8>>,
389    ) -> Result<Option<rsip::Response>, BoxError> {
390        Ok(self.reinvite(Some(headers), body).await?)
391    }
392
393    async fn send_bye(&self) -> Result<(), BoxError> {
394        Ok(self.bye().await?)
395    }
396}
397
398impl SessionDialogOps for ServerInviteDialog {
399    async fn refresh(
400        &self,
401        headers: Vec<Header>,
402        body: Option<Vec<u8>>,
403    ) -> Result<Option<rsip::Response>, BoxError> {
404        Ok(self.reinvite(Some(headers), body).await?)
405    }
406
407    async fn send_bye(&self) -> Result<(), BoxError> {
408        Ok(self.bye().await?)
409    }
410}
411
/// How [`session_timer_loop`] ended.
///
/// Every exit path of the loop maps to exactly one of these variants;
/// the loop has no other return value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionTimerOutcome {
    /// The [`CancellationToken`] fired — the call ended through the
    /// normal path (local/remote BYE) and the loop just stood down.
    /// Cancellation is observed while the loop is waiting (between
    /// refreshes, or for the watchdog deadline), not while a refresh
    /// re-INVITE is in flight.
    Cancelled,
    /// We were the watchdog and no refresh arrived before the session
    /// interval lapsed. A BYE was sent (best effort).
    Expired,
    /// We were the refresher and a refresh re-INVITE failed (non-2xx or
    /// transport error). A BYE was sent (best effort).
    RefreshFailed,
    /// The dialog was no longer confirmed when we tried to refresh —
    /// it already terminated through another path. No BYE needed.
    DialogGone,
}
428
429/// Drive RFC 4028 session keepalive for one confirmed dialog. Runs
430/// until cancelled or the session dies; same shape as
431/// [`crate::Registrar::keepalive_loop`].
432///
433/// - When `timer.we_are_refresher`, sends a refresh re-INVITE every
434///   `interval / 2` carrying `refresh_body` (repeat the SDP you sent
435///   when the call was set up — an identical offer is a no-op per
436///   RFC 3264). A 2xx resets the clock (adopting any `Session-Expires`
437///   the peer granted); failure tears the call down with BYE.
438/// - Otherwise runs the expiry watchdog: every
439///   `peer_refreshed.notify_one()` resets the deadline, and if the
440///   deadline lapses the call is torn down with BYE. The consumer pings
441///   `peer_refreshed` after answering the peer's refresh re-INVITE —
442///   see the module docs for the `DialogState::Updated` wiring.
443///
444/// All failures are folded into the returned [`SessionTimerOutcome`];
445/// the loop never panics and never returns early without standing the
446/// session down.
447pub async fn session_timer_loop<D: SessionDialogOps>(
448    dialog: &D,
449    timer: SessionTimer,
450    refresh_body: Option<Vec<u8>>,
451    peer_refreshed: Arc<Notify>,
452    cancel: CancellationToken,
453) -> SessionTimerOutcome {
454    let mut interval_secs = timer.interval_secs.max(MIN_SESSION_EXPIRES_SECS);
455    if timer.we_are_refresher {
456        loop {
457            let current = SessionTimer {
458                interval_secs,
459                we_are_refresher: true,
460            };
461            select! {
462                _ = tokio::time::sleep(current.refresh_after()) => {}
463                _ = cancel.cancelled() => return SessionTimerOutcome::Cancelled,
464            }
465
466            // In this refresh re-INVITE we are the UAC of the new
467            // transaction, so the refresher param says `uac`.
468            let headers = vec![
469                supported_timer_header(),
470                SessionExpires {
471                    interval_secs,
472                    refresher: Some(Refresher::Uac),
473                }
474                .header(),
475            ];
476            match dialog.refresh(headers, refresh_body.clone()).await {
477                Ok(Some(resp)) if resp.status_code.kind() == rsip::StatusCodeKind::Successful => {
478                    // Adopt a re-granted interval (a peer may shorten or
479                    // lengthen mid-call), floored to avoid a hot loop.
480                    if let Some(granted) = session_expires_in(&resp.headers) {
481                        interval_secs = granted.interval_secs.max(MIN_SESSION_EXPIRES_SECS);
482                    }
483                    debug!(interval_secs, "session refresh accepted");
484                }
485                Ok(Some(resp)) => {
486                    warn!(
487                        status = %resp.status_code,
488                        "session refresh rejected; hanging up"
489                    );
490                    if let Err(e) = dialog.send_bye().await {
491                        warn!("BYE after rejected refresh failed: {e}");
492                    }
493                    return SessionTimerOutcome::RefreshFailed;
494                }
495                Ok(None) => {
496                    debug!("dialog no longer confirmed; session timer standing down");
497                    return SessionTimerOutcome::DialogGone;
498                }
499                Err(e) => {
500                    warn!("session refresh error: {e}; hanging up");
501                    if let Err(e) = dialog.send_bye().await {
502                        warn!("BYE after failed refresh failed: {e}");
503                    }
504                    return SessionTimerOutcome::RefreshFailed;
505                }
506            }
507        }
508    } else {
509        let current = SessionTimer {
510            interval_secs,
511            we_are_refresher: false,
512        };
513        loop {
514            select! {
515                _ = tokio::time::sleep(current.expiry_after()) => {
516                    info!(
517                        interval_secs,
518                        "session lapsed without refresh; sending BYE"
519                    );
520                    if let Err(e) = dialog.send_bye().await {
521                        warn!("BYE after session expiry failed: {e}");
522                    }
523                    return SessionTimerOutcome::Expired;
524                }
525                _ = peer_refreshed.notified() => {
526                    debug!("peer refreshed session; watchdog deadline reset");
527                }
528                _ = cancel.cancelled() => return SessionTimerOutcome::Cancelled,
529            }
530        }
531    }
532}
533
534#[cfg(test)]
535mod tests {
536    use super::*;
537    use std::sync::Mutex;
538
539    // ---- SessionExpires parse / build ----
540
541    #[test]
542    fn parse_bare_interval() {
543        let se = SessionExpires::parse("1800").unwrap();
544        assert_eq!(se.interval_secs, 1800);
545        assert_eq!(se.refresher, None);
546    }
547
548    #[test]
549    fn parse_with_refresher_param() {
550        let se = SessionExpires::parse("1800;refresher=uas").unwrap();
551        assert_eq!(se.interval_secs, 1800);
552        assert_eq!(se.refresher, Some(Refresher::Uas));
553        let se = SessionExpires::parse("90;refresher=uac").unwrap();
554        assert_eq!(se.refresher, Some(Refresher::Uac));
555    }
556
557    #[test]
558    fn parse_is_case_insensitive_and_whitespace_tolerant() {
559        let se = SessionExpires::parse(" 600 ; Refresher = UAS ").unwrap();
560        assert_eq!(se.interval_secs, 600);
561        assert_eq!(se.refresher, Some(Refresher::Uas));
562    }
563
564    #[test]
565    fn parse_ignores_unknown_params() {
566        let se = SessionExpires::parse("1800;foo=bar;refresher=uac;baz").unwrap();
567        assert_eq!(se.refresher, Some(Refresher::Uac));
568    }
569
570    #[test]
571    fn parse_rejects_garbage() {
572        assert!(SessionExpires::parse("").is_err());
573        assert!(SessionExpires::parse("soon").is_err());
574        assert!(SessionExpires::parse("1800;refresher=bogus").is_err());
575        assert!(SessionExpires::parse("-5").is_err());
576    }
577
578    #[test]
579    fn header_value_round_trips() {
580        for se in [
581            SessionExpires {
582                interval_secs: 1800,
583                refresher: None,
584            },
585            SessionExpires {
586                interval_secs: 90,
587                refresher: Some(Refresher::Uac),
588            },
589            SessionExpires {
590                interval_secs: 7200,
591                refresher: Some(Refresher::Uas),
592            },
593        ] {
594            let parsed = SessionExpires::parse(&se.header_value()).unwrap();
595            assert_eq!(parsed, se, "round-trip via {:?}", se.header_value());
596        }
597    }
598
599    #[test]
600    fn header_builds_untyped_session_expires() {
601        let h = SessionExpires {
602            interval_secs: 1800,
603            refresher: Some(Refresher::Uac),
604        }
605        .header();
606        assert_eq!(h.to_string(), "Session-Expires: 1800;refresher=uac");
607    }
608
609    // ---- header extraction ----
610
611    fn headers(items: Vec<Header>) -> rsip::Headers {
612        let mut h = rsip::Headers::default();
613        for item in items {
614            h.push(item);
615        }
616        h
617    }
618
619    #[test]
620    fn session_expires_in_finds_header_case_insensitively() {
621        let h = headers(vec![Header::Other(
622            "session-expires".into(),
623            "600;refresher=uas".into(),
624        )]);
625        let se = session_expires_in(&h).unwrap();
626        assert_eq!(se.interval_secs, 600);
627        assert_eq!(se.refresher, Some(Refresher::Uas));
628    }
629
630    #[test]
631    fn session_expires_in_accepts_compact_form() {
632        // RFC 4028 §4 defines `x` as the compact form of Session-Expires.
633        let h = headers(vec![Header::Other("x".into(), "300".into())]);
634        assert_eq!(
635            session_expires_in(&h),
636            Some(SessionExpires {
637                interval_secs: 300,
638                refresher: None
639            })
640        );
641    }
642
643    #[test]
644    fn session_expires_in_absent_or_malformed_is_none() {
645        assert_eq!(session_expires_in(&headers(vec![])), None);
646        let h = headers(vec![Header::Other("Session-Expires".into(), "soon".into())]);
647        assert_eq!(session_expires_in(&h), None);
648    }
649
650    #[test]
651    fn min_se_in_parses_and_ignores_params() {
652        let h = headers(vec![Header::Other("Min-SE".into(), "120".into())]);
653        assert_eq!(min_se_in(&h), Some(120));
654        let h = headers(vec![Header::Other("min-se".into(), "240;lr".into())]);
655        assert_eq!(min_se_in(&h), Some(240));
656        assert_eq!(min_se_in(&headers(vec![])), None);
657        let h = headers(vec![Header::Other("Min-SE".into(), "never".into())]);
658        assert_eq!(min_se_in(&h), None);
659    }
660
661    #[test]
662    fn supports_timer_scans_typed_untyped_and_compact() {
663        assert!(supports_timer(&headers(vec![supported_timer_header()])));
664        assert!(supports_timer(&headers(vec![Header::Supported(
665            "100rel, timer".into()
666        )])));
667        assert!(supports_timer(&headers(vec![Header::Other(
668            "k".into(),
669            "timer".into()
670        )])));
671        assert!(supports_timer(&headers(vec![Header::Other(
672            "Supported".into(),
673            "TIMER".into()
674        )])));
675        assert!(!supports_timer(&headers(vec![])));
676        assert!(!supports_timer(&headers(vec![Header::Supported(
677            "100rel".into()
678        )])));
679        // `timer` must be a whole option tag, not a substring.
680        assert!(!supports_timer(&headers(vec![Header::Supported(
681            "timers".into()
682        )])));
683    }
684
685    // ---- interval math (RFC 4028 §10) ----
686
687    #[test]
688    fn refresh_fires_at_half_the_interval() {
689        let t = SessionTimer {
690            interval_secs: 1800,
691            we_are_refresher: true,
692        };
693        assert_eq!(t.refresh_after(), Duration::from_secs(900));
694        let t = SessionTimer {
695            interval_secs: 90,
696            we_are_refresher: true,
697        };
698        assert_eq!(t.refresh_after(), Duration::from_secs(45));
699    }
700
701    #[test]
702    fn expiry_keeps_min_of_32s_or_a_third_headroom() {
703        // Long interval: headroom caps at 32 s.
704        let t = SessionTimer {
705            interval_secs: 1800,
706            we_are_refresher: false,
707        };
708        assert_eq!(t.expiry_after(), Duration::from_secs(1768));
709        // Short interval: a third of 90 s = 30 s < 32 s.
710        let t = SessionTimer {
711            interval_secs: 90,
712            we_are_refresher: false,
713        };
714        assert_eq!(t.expiry_after(), Duration::from_secs(60));
715    }
716
717    // ---- negotiation: UAC side ----
718
719    #[test]
720    fn uac_no_session_expires_means_no_timer() {
721        assert_eq!(negotiate_uac(&headers(vec![])), None);
722    }
723
724    #[test]
725    fn uac_refresher_uas_means_peer_refreshes() {
726        let h = headers(vec![Header::Other(
727            "Session-Expires".into(),
728            "1800;refresher=uas".into(),
729        )]);
730        assert_eq!(
731            negotiate_uac(&h),
732            Some(SessionTimer {
733                interval_secs: 1800,
734                we_are_refresher: false
735            })
736        );
737    }
738
739    #[test]
740    fn uac_refresher_uac_or_missing_means_we_refresh() {
741        let h = headers(vec![Header::Other(
742            "Session-Expires".into(),
743            "600;refresher=uac".into(),
744        )]);
745        assert!(negotiate_uac(&h).unwrap().we_are_refresher);
746        // RFC 4028 §9 says the 2xx MUST pin the refresher; a peer that
747        // omits it gets the defensive default: we refresh.
748        let h = headers(vec![Header::Other("Session-Expires".into(), "600".into())]);
749        assert!(negotiate_uac(&h).unwrap().we_are_refresher);
750    }
751
752    #[test]
753    fn uac_floors_tiny_grants_at_90s() {
754        let h = headers(vec![Header::Other(
755            "Session-Expires".into(),
756            "20;refresher=uac".into(),
757        )]);
758        assert_eq!(negotiate_uac(&h).unwrap().interval_secs, 90);
759    }
760
761    // ---- negotiation: UAS side ----
762
763    fn invite_headers(session_expires: &str, min_se: Option<&str>, timer: bool) -> rsip::Headers {
764        let mut items = vec![Header::Other(
765            "Session-Expires".into(),
766            session_expires.into(),
767        )];
768        if let Some(m) = min_se {
769            items.push(Header::Other("Min-SE".into(), m.into()));
770        }
771        if timer {
772            items.push(supported_timer_header());
773        }
774        headers(items)
775    }
776
777    #[test]
778    fn uas_no_session_expires_means_no_timer() {
779        assert_eq!(
780            negotiate_uas(&headers(vec![supported_timer_header()])),
781            None
782        );
783    }
784
785    #[test]
786    fn uas_default_makes_supporting_peer_the_refresher() {
787        let uas = negotiate_uas(&invite_headers("1800", None, true)).unwrap();
788        assert_eq!(uas.timer.interval_secs, 1800);
789        assert!(!uas.timer.we_are_refresher, "peer (UAC) should refresh");
790        assert_eq!(
791            uas.echo,
792            SessionExpires {
793                interval_secs: 1800,
794                refresher: Some(Refresher::Uac)
795            }
796        );
797        assert!(uas.require_timer);
798    }
799
800    #[test]
801    fn uas_honors_requested_refresher_uas() {
802        let uas = negotiate_uas(&invite_headers("1800;refresher=uas", None, true)).unwrap();
803        assert!(uas.timer.we_are_refresher, "we (UAS) were asked to refresh");
804        assert_eq!(uas.echo.refresher, Some(Refresher::Uas));
805    }
806
#[test]
fn uas_without_peer_support_takes_refresher_role() {
    // The INVITE carries Session-Expires (e.g. inserted by a proxy) but
    // no `Supported: timer` from the UAC itself. The UAC cannot be
    // relied on to refresh, so the refresher role falls to us, and the
    // 2xx must not `Require: timer`.
    let invite = invite_headers("1800;refresher=uac", None, false);
    let negotiated = negotiate_uas(&invite).unwrap();

    assert!(negotiated.timer.we_are_refresher);
    assert_eq!(negotiated.echo.refresher, Some(Refresher::Uas));
    assert!(!negotiated.require_timer);
}
817
#[test]
fn uas_floors_interval_at_min_se_and_90s() {
    // Both cases request a 30 s interval from a timer-capable peer and
    // differ only in the Min-SE header.
    let negotiate = |requested: &str, min_se: Option<&str>| {
        negotiate_uas(&invite_headers(requested, min_se, true)).unwrap()
    };

    // With Min-SE 120 the interval is raised to 120 s, in both the
    // timer and the echoed header.
    let with_min_se = negotiate("30", Some("120"));
    assert_eq!(with_min_se.timer.interval_secs, 120);
    assert_eq!(with_min_se.echo.interval_secs, 120);

    // Without Min-SE the RFC floor of 90 s applies.
    let without_min_se = negotiate("30", None);
    assert_eq!(without_min_se.timer.interval_secs, 90);
}
828
829    // ---- session_timer_loop against a mock dialog ----
830
/// One call observed by the mock dialog.
#[derive(Clone, Debug, Eq, PartialEq)]
enum Event {
    /// A refresh was requested; `session_expires` is the value of the
    /// `Session-Expires` header that accompanied it (empty when the
    /// header was absent).
    Refresh { session_expires: String },
    /// A BYE was requested.
    Bye,
}
836
837    /// Scripted mock: pops one canned reply per refresh; records every
838    /// call with a timestamp readable under tokio's paused clock.
839    struct MockDialog {
840        events: Mutex<Vec<(Duration, Event)>>,
841        refresh_replies: Mutex<Vec<Result<Option<rsip::Response>, String>>>,
842        started: tokio::time::Instant,
843    }
844
845    impl MockDialog {
846        fn new(refresh_replies: Vec<Result<Option<rsip::Response>, String>>) -> Self {
847            Self {
848                events: Mutex::new(Vec::new()),
849                refresh_replies: Mutex::new(refresh_replies),
850                started: tokio::time::Instant::now(),
851            }
852        }
853
854        fn events(&self) -> Vec<(Duration, Event)> {
855            self.events.lock().unwrap().clone()
856        }
857    }
858
859    fn response(code: u16, extra: Vec<Header>) -> rsip::Response {
860        rsip::Response {
861            status_code: rsip::StatusCode::from(code),
862            version: rsip::Version::V2,
863            headers: headers(extra),
864            body: Vec::new(),
865        }
866    }
867
868    impl SessionDialogOps for MockDialog {
869        async fn refresh(
870            &self,
871            hdrs: Vec<Header>,
872            _body: Option<Vec<u8>>,
873        ) -> Result<Option<rsip::Response>, BoxError> {
874            let se = hdrs
875                .iter()
876                .find_map(|h| match h {
877                    Header::Other(name, value) if name == "Session-Expires" => Some(value.clone()),
878                    _ => None,
879                })
880                .unwrap_or_default();
881            self.events.lock().unwrap().push((
882                self.started.elapsed(),
883                Event::Refresh {
884                    session_expires: se,
885                },
886            ));
887            let reply = self.refresh_replies.lock().unwrap().remove(0);
888            reply.map_err(Into::into)
889        }
890
891        async fn send_bye(&self) -> Result<(), BoxError> {
892            self.events
893                .lock()
894                .unwrap()
895                .push((self.started.elapsed(), Event::Bye));
896            Ok(())
897        }
898    }
899
900    fn timer(interval_secs: u32, we_are_refresher: bool) -> SessionTimer {
901        SessionTimer {
902            interval_secs,
903            we_are_refresher,
904        }
905    }
906
907    #[tokio::test(start_paused = true)]
908    async fn refresher_sends_refresh_every_half_interval() {
909        let dialog = Arc::new(MockDialog::new(vec![
910            Ok(Some(response(200, vec![]))),
911            Ok(Some(response(200, vec![]))),
912            Ok(None), // third tick: dialog gone, loop exits
913        ]));
914        let cancel = CancellationToken::new();
915        let outcome = session_timer_loop(
916            &*dialog,
917            timer(180, true),
918            None,
919            Arc::new(Notify::new()),
920            cancel,
921        )
922        .await;
923        assert_eq!(outcome, SessionTimerOutcome::DialogGone);
924
925        let events = dialog.events();
926        assert_eq!(events.len(), 3);
927        assert_eq!(events[0].0, Duration::from_secs(90));
928        assert_eq!(events[1].0, Duration::from_secs(180));
929        assert_eq!(events[2].0, Duration::from_secs(270));
930        for (_, e) in &events {
931            assert_eq!(
932                e,
933                &Event::Refresh {
934                    session_expires: "180;refresher=uac".into()
935                }
936            );
937        }
938    }
939
940    #[tokio::test(start_paused = true)]
941    async fn refresher_adopts_interval_granted_in_refresh_response() {
942        // First 200 re-grants a longer interval; the second refresh must
943        // fire at the *new* half-interval.
944        let regrant = response(
945            200,
946            vec![Header::Other(
947                "Session-Expires".into(),
948                "360;refresher=uac".into(),
949            )],
950        );
951        let dialog = Arc::new(MockDialog::new(vec![Ok(Some(regrant)), Ok(None)]));
952        let cancel = CancellationToken::new();
953        let outcome = session_timer_loop(
954            &*dialog,
955            timer(180, true),
956            None,
957            Arc::new(Notify::new()),
958            cancel,
959        )
960        .await;
961        assert_eq!(outcome, SessionTimerOutcome::DialogGone);
962
963        let events = dialog.events();
964        assert_eq!(events[0].0, Duration::from_secs(90), "first at 180/2");
965        assert_eq!(
966            events[1].0,
967            Duration::from_secs(90 + 180),
968            "second at 90 + 360/2 after the re-grant"
969        );
970    }
971
972    #[tokio::test(start_paused = true)]
973    async fn refresher_rejected_refresh_sends_bye() {
974        let dialog = Arc::new(MockDialog::new(vec![Ok(Some(response(481, vec![])))]));
975        let cancel = CancellationToken::new();
976        let outcome = session_timer_loop(
977            &*dialog,
978            timer(180, true),
979            None,
980            Arc::new(Notify::new()),
981            cancel,
982        )
983        .await;
984        assert_eq!(outcome, SessionTimerOutcome::RefreshFailed);
985        let events = dialog.events();
986        assert!(matches!(events[0].1, Event::Refresh { .. }));
987        assert_eq!(events[1].1, Event::Bye);
988    }
989
990    #[tokio::test(start_paused = true)]
991    async fn refresher_transport_error_sends_bye() {
992        let dialog = Arc::new(MockDialog::new(vec![Err("socket closed".into())]));
993        let cancel = CancellationToken::new();
994        let outcome = session_timer_loop(
995            &*dialog,
996            timer(180, true),
997            None,
998            Arc::new(Notify::new()),
999            cancel,
1000        )
1001        .await;
1002        assert_eq!(outcome, SessionTimerOutcome::RefreshFailed);
1003        assert_eq!(dialog.events().last().unwrap().1, Event::Bye);
1004    }
1005
1006    #[tokio::test(start_paused = true)]
1007    async fn refresher_cancellation_wins_before_first_refresh() {
1008        let dialog = Arc::new(MockDialog::new(vec![]));
1009        let cancel = CancellationToken::new();
1010        cancel.cancel();
1011        let outcome = session_timer_loop(
1012            &*dialog,
1013            timer(180, true),
1014            None,
1015            Arc::new(Notify::new()),
1016            cancel,
1017        )
1018        .await;
1019        assert_eq!(outcome, SessionTimerOutcome::Cancelled);
1020        assert!(dialog.events().is_empty(), "no refresh, no BYE");
1021    }
1022
1023    #[tokio::test(start_paused = true)]
1024    async fn watchdog_sends_bye_when_session_lapses() {
1025        let dialog = Arc::new(MockDialog::new(vec![]));
1026        let cancel = CancellationToken::new();
1027        let outcome = session_timer_loop(
1028            &*dialog,
1029            timer(90, false),
1030            None,
1031            Arc::new(Notify::new()),
1032            cancel,
1033        )
1034        .await;
1035        assert_eq!(outcome, SessionTimerOutcome::Expired);
1036        let events = dialog.events();
1037        assert_eq!(events.len(), 1);
1038        // 90 - min(32, 90/3) = 60 s.
1039        assert_eq!(events[0], (Duration::from_secs(60), Event::Bye));
1040    }
1041
1042    #[tokio::test(start_paused = true)]
1043    async fn watchdog_resets_deadline_on_peer_refresh() {
1044        let dialog = Arc::new(MockDialog::new(vec![]));
1045        let cancel = CancellationToken::new();
1046        let refreshed = Arc::new(Notify::new());
1047
1048        let loop_task = tokio::spawn({
1049            let dialog = dialog.clone();
1050            let refreshed = refreshed.clone();
1051            let cancel = cancel.clone();
1052            async move { session_timer_loop(&*dialog, timer(90, false), None, refreshed, cancel).await }
1053        });
1054
1055        // Just before the 60 s deadline, the peer refreshes.
1056        tokio::time::sleep(Duration::from_secs(59)).await;
1057        refreshed.notify_one();
1058        tokio::task::yield_now().await;
1059        // Crossing the original deadline must NOT fire the BYE...
1060        tokio::time::sleep(Duration::from_secs(30)).await;
1061        assert!(dialog.events().is_empty(), "deadline should have reset");
1062        // ...but the rearmed deadline (59 + 60 = 119 s) must.
1063        let outcome = loop_task.await.unwrap();
1064        assert_eq!(outcome, SessionTimerOutcome::Expired);
1065        assert_eq!(
1066            dialog.events(),
1067            vec![(Duration::from_secs(119), Event::Bye)]
1068        );
1069    }
1070
1071    #[tokio::test(start_paused = true)]
1072    async fn watchdog_cancellation_stands_down_without_bye() {
1073        let dialog = Arc::new(MockDialog::new(vec![]));
1074        let cancel = CancellationToken::new();
1075        let loop_task = tokio::spawn({
1076            let dialog = dialog.clone();
1077            let cancel = cancel.clone();
1078            async move {
1079                session_timer_loop(
1080                    &*dialog,
1081                    timer(90, false),
1082                    None,
1083                    Arc::new(Notify::new()),
1084                    cancel,
1085                )
1086                .await
1087            }
1088        });
1089        tokio::time::sleep(Duration::from_secs(10)).await;
1090        cancel.cancel();
1091        assert_eq!(loop_task.await.unwrap(), SessionTimerOutcome::Cancelled);
1092        assert!(dialog.events().is_empty());
1093    }
1094}