Skip to main content

ppoppo_sdk_core/
session_liveness.rs

1//! [`SessionLiveness`] port — per-request session-row liveness check.
2//!
3//! Phase 11.Z 0.10.0 (RFC_2026-05-08 §4.2 lock). Promoted from RCW + CTW's
4//! identical consumer-local `check_session_alive` shape.
5//!
6//! # The L2 axis (vs the L1 sv-axis)
7//!
8//! - **L1 sv-axis** ([`crate::epoch`]): "is the token's `sv` claim still
9//!   current against PAS's authoritative substrate?" — answered against
10//!   the canonical `sv:{sub}` cache + a fetcher fallback. Catches
11//!   break-glass / LogoutAll propagation across services.
12//! - **L2 session liveness** (this module): "is the bearer token's
13//!   session row still alive in the consumer's own DB?" — answered by
14//!   the consumer's per-deployment substrate. Catches per-session
15//!   logout (`revoked_at IS NOT NULL`) without depending on PAS.
16//!
17//! Both axes are wired into [`crate::PasJwtVerifier`] as engine slots
18//! ([`crate::PasJwtVerifier::with_epoch_revocation`] for L1,
19//! [`crate::PasJwtVerifier::with_session_liveness`] for L2). With no
20//! port wired, the verifier short-circuits the corresponding check.
21//!
22//! # Why this lives in `session_liveness` (alongside `TokenCipher`)
23//!
24//! The existing `session_liveness` module ships `TokenCipher` +
25//! [`super::attempt_liveness_refresh`] for the periodic *PAS-callback*
26//! refresh-token check (consumer asks PAS "is my refresh_token still
27//! good?"). This new port is the per-request *consumer-DB* row check.
28//! Both answer "is this user's session valid?" at different layers and
29//! cadences — one shared umbrella module keeps the surface coherent.
30
31use async_trait::async_trait;
32
33use crate::types::SessionId;
34
35/// Per-request session-row liveness check.
36///
37/// Wired into [`crate::PasJwtVerifier::with_session_liveness`] as a
38/// verifier slot symmetric to
39/// [`crate::PasJwtVerifier::with_epoch_revocation`]. With no port wired,
40/// the verifier short-circuits the L2 check (matches pre-0.10.0
41/// behavior).
42///
43/// # 3-state contract
44///
45/// - `Ok(())` → session is live; admit the token.
46/// - `Err(`[`SessionLivenessError::Revoked`]`)` → session row absent OR
47///   `revoked_at` is set; the verifier maps to
48///   [`crate::VerifyError::SessionRevoked`]. Actionable for `LogoutAll`
49///   / per-session-revoke flows.
50/// - `Err(`[`SessionLivenessError::Transient`]`)` → substrate down (DB
51///   connection lost, schema unavailable, etc.); the verifier maps to
52///   [`crate::VerifyError::SessionLivenessLookupUnavailable`] (HTTP 503).
53///   Fail-closed per `STANDARDS_AUTH_INVALIDATION` §3.
54///
55/// # Lenient on no-`sid` claim
56///
57/// When the bearer's `sid` claim is `None` (machine credentials,
58/// AI-agent flows, R6 legacy admit per
59/// [`crate::AuthSession::session_id`]), the verifier admits without
60/// consulting this port — non-session-bound tokens have no row to look
61/// up. RFC_2026-05-08 §4.2 lock decision (lenient — matches the existing
62/// `AuthSession::session_id` invariant).
63///
64/// # Implementations
65///
66/// Consumer-side adapters: RCW ships `PgSessionLiveness` over
67/// `scrcall.user_sessions`; CTW ships the same shape over
68/// `scctime.user_sessions`. Each is ~10 lines of consumer-local
69/// code — schema name + DB pool are deployment-specific and never
70/// shipped from the SDK.
71///
72/// ```ignore
73/// use async_trait::async_trait;
74/// use pas_external::session_liveness::{SessionLiveness, SessionLivenessError};
75/// use pas_external::types::SessionId;
76/// use sqlx::PgPool;
77///
78/// pub struct PgSessionLiveness { pool: PgPool }
79///
80/// #[async_trait]
81/// impl SessionLiveness for PgSessionLiveness {
82///     async fn check(&self, sid: &SessionId) -> Result<(), SessionLivenessError> {
83///         let row: Option<(Option<time::OffsetDateTime>,)> =
84///             sqlx::query_as("SELECT revoked_at FROM scrcall.user_sessions WHERE id = $1")
85///                 .bind(&sid.0)
86///                 .fetch_optional(&self.pool)
87///                 .await
88///                 .map_err(|e| SessionLivenessError::Transient(format!("session lookup: {e}")))?;
89///         match row {
90///             None | Some((Some(_),)) => Err(SessionLivenessError::Revoked),
91///             Some((None,)) => Ok(()),
92///         }
93///     }
94/// }
95/// ```
96#[async_trait]
97pub trait SessionLiveness: std::fmt::Debug + Send + Sync {
98    async fn check(&self, sid: &SessionId) -> Result<(), SessionLivenessError>;
99}
100
101/// Per-request liveness failure surface.
102///
103/// Two variants — `Revoked` (substrate said "no") and `Transient`
104/// (substrate couldn't answer) — collapse to typed
105/// [`crate::VerifyError`] variants in the verifier so audit logs pivot
106/// L2-revoked vs L2-substrate-down distinct from L1 sv-axis events.
107#[derive(Debug, thiserror::Error)]
108#[non_exhaustive]
109pub enum SessionLivenessError {
110    /// Session row absent OR `revoked_at` is set. Token rejected as
111    /// [`crate::VerifyError::SessionRevoked`]. The two surface causes
112    /// (absent row vs revoked row) collapse to one variant because
113    /// audit-time distinction is rarely actionable: a user whose row
114    /// was deleted is in the same security state as one whose row was
115    /// flipped to revoked. Substrate-side logging records the cause if
116    /// needed.
117    #[error("session revoked or not found")]
118    Revoked,
119    /// Substrate could not answer (connection error, query timeout,
120    /// schema unavailable). Carries a free-form detail for audit logs;
121    /// the verifier collapses every Transient variant onto
122    /// [`crate::VerifyError::SessionLivenessLookupUnavailable`]
123    /// (fail-closed) so the engine sees a uniform contract regardless
124    /// of substrate flavor.
125    #[error("session liveness substrate unavailable: {0}")]
126    Transient(String),
127}
128
129#[cfg(test)]
130#[allow(clippy::unwrap_used)]
131mod tests {
132    //! Pins the contract — Display strings, error categorization, and
133    //! the no-`sid` lenient-skip via in-process port construction. The
134    //! verifier-slot integration is exercised by
135    //! `tests/session_liveness_lookup_boundary.rs`.
136    use super::*;
137    use std::sync::Mutex;
138
139    #[derive(Debug)]
140    struct CountingLiveness {
141        responses: Mutex<Vec<Result<(), SessionLivenessError>>>,
142        calls: Mutex<u32>,
143    }
144
145    #[async_trait]
146    impl SessionLiveness for CountingLiveness {
147        async fn check(&self, _sid: &SessionId) -> Result<(), SessionLivenessError> {
148            *self.calls.lock().unwrap() += 1;
149            self.responses
150                .lock()
151                .unwrap()
152                .pop()
153                .unwrap_or_else(|| Err(SessionLivenessError::Transient("exhausted".into())))
154        }
155    }
156
157    #[tokio::test]
158    async fn live_admits() {
159        let port = CountingLiveness {
160            responses: Mutex::new(vec![Ok(())]),
161            calls: Mutex::new(0),
162        };
163        let sid = SessionId::from("01HZAA00000000000000000000".to_string());
164        assert!(matches!(port.check(&sid).await, Ok(())));
165    }
166
167    #[tokio::test]
168    async fn revoked_surfaces_revoked_variant() {
169        let port = CountingLiveness {
170            responses: Mutex::new(vec![Err(SessionLivenessError::Revoked)]),
171            calls: Mutex::new(0),
172        };
173        let sid = SessionId::from("01HZAA00000000000000000000".to_string());
174        assert!(matches!(port.check(&sid).await, Err(SessionLivenessError::Revoked)));
175    }
176
177    #[tokio::test]
178    async fn transient_carries_detail_for_audit() {
179        let port = CountingLiveness {
180            responses: Mutex::new(vec![Err(SessionLivenessError::Transient(
181                "connection refused: pgpool dead".into(),
182            ))]),
183            calls: Mutex::new(0),
184        };
185        let sid = SessionId::from("01HZAA00000000000000000000".to_string());
186        match port.check(&sid).await {
187            Err(SessionLivenessError::Transient(detail)) => {
188                assert!(detail.contains("connection refused"), "{detail}");
189            }
190            other => panic!("expected Transient, got {other:?}"),
191        }
192    }
193
194    #[test]
195    fn display_strings_stable_for_audit_pivot() {
196        // Audit dashboards may match on these strings — keep them stable
197        // unless deliberately rewriting the audit pivot path. The eng
198        // log format is the same as the engine's transient/stale
199        // disambiguation in STANDARDS_AUTH_INVALIDATION §3.
200        assert_eq!(
201            SessionLivenessError::Revoked.to_string(),
202            "session revoked or not found"
203        );
204        assert_eq!(
205            SessionLivenessError::Transient("foo".into()).to_string(),
206            "session liveness substrate unavailable: foo"
207        );
208    }
209}