use super::{AdapterConfig, AtpH3Error, AtpH3Result, AtpH3Stream, H3FrameCodec, StreamDirection};
use std::collections::HashMap;
use std::collections::VecDeque;
use std::time::{Duration, Instant};
#[derive(Debug, Clone, PartialEq)]
pub enum SessionState {
Connecting,
Active,
Draining,
Closed,
Error(String),
}
#[derive(Debug)]
pub struct H3Session {
session_id: String,
state: SessionState,
config: AdapterConfig,
codec: H3FrameCodec,
streams: HashMap<u64, AtpH3Stream>,
datagram_send_queue: VecDeque<Vec<u8>>,
datagram_queue_high_water: usize,
next_stream_id: u64,
created_at: Instant,
last_activity: Instant,
timeout: Duration,
}
impl H3Session {
pub fn new(session_id: String, config: &AdapterConfig) -> AtpH3Result<Self> {
let now = Instant::now();
let timeout = Duration::from_millis(config.connection_timeout_ms);
Ok(Self {
session_id,
state: SessionState::Connecting,
config: config.clone(),
codec: H3FrameCodec::new(),
streams: HashMap::new(),
datagram_send_queue: VecDeque::new(),
datagram_queue_high_water: 64,
next_stream_id: 0, created_at: now,
last_activity: now,
timeout,
})
}
pub fn session_id(&self) -> &str {
&self.session_id
}
pub fn state(&self) -> &SessionState {
&self.state
}
pub fn codec(&self) -> &H3FrameCodec {
&self.codec
}
pub fn is_active(&self) -> bool {
matches!(self.state, SessionState::Active)
}
pub fn is_closed(&self) -> bool {
matches!(self.state, SessionState::Closed | SessionState::Error(_))
}
pub fn activate(&mut self) -> AtpH3Result<()> {
match self.state {
SessionState::Connecting => {
self.state = SessionState::Active;
self.update_activity();
Ok(())
}
_ => Err(AtpH3Error::Session(format!(
"Cannot activate session in state {:?}",
self.state
))),
}
}
pub fn create_stream(&mut self, direction: StreamDirection) -> AtpH3Result<u64> {
if !self.is_active() {
return Err(AtpH3Error::Session("Session is not active".to_string()));
}
if self.streams.len() >= self.config.max_streams as usize {
return Err(AtpH3Error::Session("Maximum streams exceeded".to_string()));
}
let stream_id = self.next_stream_id;
self.next_stream_id += 4;
let stream = AtpH3Stream::new(stream_id, direction);
self.streams.insert(stream_id, stream);
self.update_activity();
Ok(stream_id)
}
pub fn get_stream(&self, stream_id: u64) -> Option<&AtpH3Stream> {
self.streams.get(&stream_id)
}
pub fn get_stream_mut(&mut self, stream_id: u64) -> Option<&mut AtpH3Stream> {
self.update_activity();
self.streams.get_mut(&stream_id)
}
pub fn close_stream(&mut self, stream_id: u64) -> AtpH3Result<()> {
if let Some(mut stream) = self.streams.remove(&stream_id) {
stream.close()?;
}
self.update_activity();
Ok(())
}
pub fn stream_count(&self) -> usize {
self.streams.len()
}
pub fn stream_ids(&self) -> Vec<u64> {
self.streams.keys().copied().collect()
}
pub fn send_on_stream(&mut self, stream_id: u64, data: &[u8]) -> AtpH3Result<()> {
if !self.is_active() {
return Err(AtpH3Error::Session("Session is not active".to_string()));
}
let stream = self
.streams
.get_mut(&stream_id)
.ok_or_else(|| AtpH3Error::Stream(format!("Stream {} not found", stream_id)))?;
stream.send(data)?;
self.update_activity();
Ok(())
}
pub fn send_datagram(&mut self, data: &[u8]) -> AtpH3Result<()> {
if !self.is_active() {
return Err(AtpH3Error::Session("Session is not active".to_string()));
}
if self.datagram_send_queue.len() >= self.datagram_queue_high_water {
return Err(AtpH3Error::Session(
"Datagram send queue full - apply backpressure".to_string(),
));
}
if data.len() > self.config.max_datagram_size {
return Err(AtpH3Error::Session(format!(
"Datagram size {} exceeds maximum {}",
data.len(),
self.config.max_datagram_size
)));
}
self.datagram_send_queue.push_back(data.to_vec());
self.update_activity();
Ok(())
}
pub fn next_datagram(&mut self) -> Option<Vec<u8>> {
let datagram = self.datagram_send_queue.pop_front();
if datagram.is_some() {
self.update_activity();
}
datagram
}
pub fn has_pending_datagram(&self) -> bool {
!self.datagram_send_queue.is_empty()
}
pub fn datagram_queue_len(&self) -> usize {
self.datagram_send_queue.len()
}
pub fn is_timed_out(&self) -> bool {
self.last_activity.elapsed() > self.timeout
}
pub fn uptime(&self) -> Duration {
self.created_at.elapsed()
}
pub fn idle_time(&self) -> Duration {
self.last_activity.elapsed()
}
pub fn start_close(&mut self) -> AtpH3Result<()> {
match self.state {
SessionState::Active => {
self.state = SessionState::Draining;
self.update_activity();
Ok(())
}
SessionState::Draining => Ok(()), SessionState::Closed | SessionState::Error(_) => Ok(()), SessionState::Connecting => {
self.state = SessionState::Closed;
Ok(())
}
}
}
pub fn close(mut self) -> AtpH3Result<()> {
let stream_ids: Vec<u64> = self.streams.keys().copied().collect();
for stream_id in stream_ids {
self.close_stream(stream_id)?;
}
self.state = SessionState::Closed;
Ok(())
}
pub fn handle_error(&mut self, error: String) {
self.state = SessionState::Error(error);
self.update_activity();
}
pub fn stats(&self) -> SessionStats {
SessionStats {
session_id: self.session_id.clone(),
state: self.state.clone(),
stream_count: self.streams.len(),
datagram_queue_len: self.datagram_send_queue.len(),
max_streams: self.config.max_streams as usize,
uptime_ms: elapsed_millis_floor_one(self.uptime()),
idle_time_ms: elapsed_millis_floor_one(self.idle_time()),
timeout_ms: duration_millis(self.timeout),
}
}
fn update_activity(&mut self) {
self.last_activity = Instant::now();
}
}
fn elapsed_millis_floor_one(duration: Duration) -> u64 {
u64::try_from(duration.as_millis().max(1)).unwrap_or(u64::MAX)
}
fn duration_millis(duration: Duration) -> u64 {
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
#[derive(Debug, Clone)]
pub struct SessionStats {
pub session_id: String,
pub state: SessionState,
pub stream_count: usize,
pub datagram_queue_len: usize,
pub max_streams: usize,
pub uptime_ms: u64,
pub idle_time_ms: u64,
pub timeout_ms: u64,
}
#[cfg(test)]
mod tests {
use super::*;
fn test_config() -> AdapterConfig {
AdapterConfig {
max_streams: 10,
max_datagram_size: 1350,
enable_unreliable_repair: true,
connection_timeout_ms: 30000,
}
}
#[test]
fn test_session_creation() {
let config = test_config();
let session = H3Session::new("test-session".to_string(), &config).unwrap();
assert_eq!(session.session_id(), "test-session");
assert_eq!(session.state(), &SessionState::Connecting);
assert!(!session.is_active());
assert!(!session.is_closed());
assert_eq!(session.stream_count(), 0);
}
#[test]
fn test_session_activation() {
let config = test_config();
let mut session = H3Session::new("test-session".to_string(), &config).unwrap();
assert!(session.activate().is_ok());
assert_eq!(session.state(), &SessionState::Active);
assert!(session.is_active());
assert!(session.activate().is_err());
}
#[test]
fn test_stream_management() {
let config = test_config();
let mut session = H3Session::new("test-session".to_string(), &config).unwrap();
session.activate().unwrap();
let stream_id1 = session
.create_stream(StreamDirection::Bidirectional)
.unwrap();
assert_eq!(stream_id1, 0);
let stream_id2 = session.create_stream(StreamDirection::Outbound).unwrap();
assert_eq!(stream_id2, 4);
assert_eq!(session.stream_count(), 2);
assert!(session.get_stream(stream_id1).is_some());
assert!(session.get_stream(999).is_none());
assert!(session.close_stream(stream_id1).is_ok());
assert_eq!(session.stream_count(), 1);
assert!(session.get_stream(stream_id1).is_none());
}
#[test]
fn test_stream_limits() {
let mut config = test_config();
config.max_streams = 2;
let mut session = H3Session::new("test-session".to_string(), &config).unwrap();
session.activate().unwrap();
assert!(
session
.create_stream(StreamDirection::Bidirectional)
.is_ok()
);
assert!(
session
.create_stream(StreamDirection::Bidirectional)
.is_ok()
);
assert!(
session
.create_stream(StreamDirection::Bidirectional)
.is_err()
);
}
#[test]
fn test_session_closure() {
let config = test_config();
let mut session = H3Session::new("test-session".to_string(), &config).unwrap();
session.activate().unwrap();
let _stream_id = session
.create_stream(StreamDirection::Bidirectional)
.unwrap();
assert!(session.start_close().is_ok());
assert_eq!(session.state(), &SessionState::Draining);
assert!(session.close().is_ok());
}
#[test]
fn test_datagram_send() {
let config = test_config();
let mut session = H3Session::new("test-session".to_string(), &config).unwrap();
session.activate().unwrap();
let data = vec![0u8; 100];
assert!(session.send_datagram(&data).is_ok());
assert!(session.has_pending_datagram());
assert_eq!(session.datagram_queue_len(), 1);
assert_eq!(session.next_datagram(), Some(data));
assert!(!session.has_pending_datagram());
let large_data = vec![0u8; 2000];
assert!(session.send_datagram(&large_data).is_err());
}
#[test]
fn test_datagram_send_applies_backpressure() {
let config = test_config();
let mut session = H3Session::new("test-session".to_string(), &config).unwrap();
session.activate().unwrap();
for _ in 0..64 {
session.send_datagram(&[1, 2, 3]).unwrap();
}
let err = session
.send_datagram(&[1, 2, 3])
.expect_err("queue high water should apply backpressure");
assert!(err.to_string().contains("Datagram send queue full"));
}
#[test]
fn test_session_stats() {
let config = test_config();
let mut session = H3Session::new("test-session".to_string(), &config).unwrap();
session.activate().unwrap();
let _stream_id = session
.create_stream(StreamDirection::Bidirectional)
.unwrap();
let stats = session.stats();
assert_eq!(stats.session_id, "test-session");
assert_eq!(stats.state, SessionState::Active);
assert_eq!(stats.stream_count, 1);
assert_eq!(stats.datagram_queue_len, 0);
assert_eq!(stats.max_streams, 10);
assert!(stats.uptime_ms > 0);
}
#[test]
fn test_timeout_detection() {
let mut config = test_config();
config.connection_timeout_ms = 1;
let session = H3Session::new("test-session".to_string(), &config).unwrap();
std::thread::sleep(Duration::from_millis(10));
assert!(session.is_timed_out());
}
}