#![allow(dead_code)]
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::OnceCell;
use crate::outbound::OutboundRequests;
#[cfg(feature = "http")]
use crate::outbound_ring::RingSender;
use crate::roots::RootsCache;
pub(crate) struct Session {
pub(crate) id: Option<uuid::Uuid>,
pub(crate) principal: Option<String>,
pub(crate) outbound: OnceCell<Arc<OutboundRequests>>,
pub(crate) roots_cache: OnceCell<Arc<RootsCache>>,
#[cfg(feature = "http")]
pub(crate) outbound_tx: OnceCell<RingSender<(u64, std::sync::Arc<serde_json::Value>)>>,
pub(crate) last_activity_millis: AtomicU64,
pub(crate) closed: AtomicBool,
#[cfg(feature = "http")]
pub(crate) next_event_id: AtomicU64,
#[cfg(feature = "http")]
pub(crate) sse_replay_buffer:
parking_lot::Mutex<std::collections::VecDeque<(u64, std::sync::Arc<serde_json::Value>)>>,
}
impl Session {
pub(crate) fn new_stdio() -> Self {
Self {
id: None,
principal: None,
outbound: OnceCell::new(),
roots_cache: OnceCell::new(),
#[cfg(feature = "http")]
outbound_tx: OnceCell::new(),
last_activity_millis: AtomicU64::new(0),
closed: AtomicBool::new(false),
#[cfg(feature = "http")]
next_event_id: AtomicU64::new(1),
#[cfg(feature = "http")]
sse_replay_buffer: parking_lot::Mutex::new(std::collections::VecDeque::new()),
}
}
pub(crate) fn new_http(
id: uuid::Uuid,
principal: Option<String>,
server_start: Instant,
) -> Self {
let now_millis = server_start.elapsed().as_millis() as u64;
Self {
id: Some(id),
principal,
outbound: OnceCell::new(),
roots_cache: OnceCell::new(),
#[cfg(feature = "http")]
outbound_tx: OnceCell::new(),
last_activity_millis: AtomicU64::new(now_millis),
closed: AtomicBool::new(false),
#[cfg(feature = "http")]
next_event_id: AtomicU64::new(1),
#[cfg(feature = "http")]
sse_replay_buffer: parking_lot::Mutex::new(std::collections::VecDeque::new()),
}
}
pub(crate) fn mark_active(&self, server_start: Instant) {
let now = server_start.elapsed().as_millis() as u64;
self.last_activity_millis.store(now, Ordering::Relaxed);
}
pub(crate) fn mark_closed(&self) -> bool {
self.closed.swap(true, Ordering::SeqCst)
}
pub(crate) fn is_closed(&self) -> bool {
self.closed.load(Ordering::SeqCst)
}
#[cfg(feature = "http")]
pub(crate) async fn close_and_drain(&self) {
self.mark_closed();
if let Some(outbound) = self.outbound.get() {
outbound.drain_pending_as_closed().await;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn stdio_session_has_no_wire_id() {
let session = Session::new_stdio();
assert!(session.id.is_none());
assert!(!session.is_closed());
}
#[tokio::test]
async fn http_session_carries_minted_uuid() {
let id = uuid::Uuid::new_v4();
let session = Session::new_http(id, None, Instant::now());
assert_eq!(session.id, Some(id));
assert!(session.principal.is_none());
assert!(!session.is_closed());
}
#[tokio::test]
async fn http_session_records_principal_when_supplied() {
let id = uuid::Uuid::new_v4();
let session = Session::new_http(id, Some("alice".into()), Instant::now());
assert_eq!(session.principal.as_deref(), Some("alice"));
}
#[tokio::test]
async fn mark_closed_returns_previous_state_then_idempotent() {
let session = Session::new_stdio();
assert!(!session.mark_closed(), "first mark_closed sees false");
assert!(session.mark_closed(), "second sees true");
assert!(session.is_closed());
}
#[tokio::test]
async fn mark_active_advances_last_activity_timestamp() {
let server_start = Instant::now();
let session = Session::new_stdio();
let before = session.last_activity_millis.load(Ordering::Relaxed);
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
session.mark_active(server_start);
let after = session.last_activity_millis.load(Ordering::Relaxed);
assert!(
after > before,
"mark_active must move the timestamp forward"
);
}
#[cfg(feature = "http")]
#[tokio::test]
async fn session_initialises_next_event_id_at_one() {
let session = Session::new_stdio();
let id = session.next_event_id.load(Ordering::Relaxed);
assert_eq!(id, 1);
}
#[cfg(feature = "http")]
#[tokio::test]
async fn session_initialises_empty_sse_replay_buffer() {
let session = Session::new_stdio();
let buffer = session.sse_replay_buffer.lock();
assert!(buffer.is_empty());
}
}