use std::sync::Arc;
#[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
use std::sync::LazyLock;
use std::sync::atomic::{AtomicUsize, Ordering};
use async_trait::async_trait;
use crate::bytes::{DynamicByteBuffer, StaticByteBuffer};
use crate::cache::SharedMap;
#[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
use crate::certificate::{ObfuscationBufferContainer, ServerKeyPair};
use crate::crypto::{UserCryptoState, UserServerState};
use crate::defaults::DefaultExecutor;
use crate::session::SessionControllerError;
use crate::session::server::{IncomingPacket, OutgoingRouter, ServerSessionManager};
use crate::settings::consts::DEFAULT_TYPHOON_ID_LENGTH;
use crate::settings::{Settings, SettingsBuilder, keys};
use crate::tailer::{ReturnCode, Tailer};
use crate::utils::sync::create_notify_queue;
/// Server secret used by the session tests in this file.
///
/// Built lazily on first access from the test-only key pair, and compiled only
/// when one of the `fast_*` features is enabled.
#[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
static TEST_SERVER_SECRET: LazyLock<crate::certificate::ServerSecret<'static>> = LazyLock::new(|| {
    let key_pair = ServerKeyPair::for_tests();
    key_pair.into_server_secret()
});
/// Returns the fixed identity used by every test: `DEFAULT_TYPHOON_ID_LENGTH`
/// zero bytes wrapped in a `StaticByteBuffer`.
fn test_identity() -> StaticByteBuffer {
    let zeroes = [0u8; DEFAULT_TYPHOON_ID_LENGTH];
    StaticByteBuffer::from_slice(&zeroes)
}
/// Builds the shared settings object used by the tests.
///
/// Overrides `HEALTH_CHECK_NEXT_IN_MIN` with `60_000` and
/// `HEALTH_CHECK_NEXT_IN_MAX` with `120_000`; every other key keeps the builder's
/// default. NOTE(review): the values are presumably milliseconds — confirm
/// against the key definitions.
///
/// # Panics
/// Panics if the builder rejects the configuration.
fn fast_settings() -> Arc<Settings<DefaultExecutor>> {
    let settings: Settings<DefaultExecutor> = SettingsBuilder::new()
        .set(&keys::HEALTH_CHECK_NEXT_IN_MIN, 60_000u64)
        .set(&keys::HEALTH_CHECK_NEXT_IN_MAX, 120_000u64)
        .build()
        .unwrap();
    Arc::new(settings)
}
/// Test double for `OutgoingRouter` that records what the session asks it to do
/// instead of sending anything.
struct CapturingRouter {
    /// Every packet handed to `route_packet`, in call order.
    packets: crate::utils::sync::Mutex<Vec<DynamicByteBuffer>>,
    /// Number of times `remove_session` has been invoked.
    remove_count: AtomicUsize,
}

impl CapturingRouter {
    /// Creates an empty router (no captured packets, zero removals) already
    /// wrapped in an `Arc` so it can be shared with a session.
    fn new() -> Arc<Self> {
        let packets = crate::utils::sync::Mutex::new(Vec::new());
        let remove_count = AtomicUsize::new(0);
        Arc::new(Self {
            packets,
            remove_count,
        })
    }
}
#[async_trait]
impl OutgoingRouter<StaticByteBuffer> for CapturingRouter {
    /// Stores `packet` in the capture list and always reports success (`true`).
    /// The destination identity is ignored.
    async fn route_packet(&self, packet: DynamicByteBuffer, _identity: &StaticByteBuffer) -> bool {
        let mut captured = self.packets.lock().await;
        captured.push(packet);
        true
    }

    /// Counts the removal request; the identity is ignored.
    async fn remove_session(&self, _identity: &StaticByteBuffer) {
        // The previous counter value is not needed; only the increment matters.
        let _previous = self.remove_count.fetch_add(1, Ordering::Relaxed);
    }
}
async fn make_session(settings: Arc<Settings<DefaultExecutor>>, router: Arc<CapturingRouter>, num_flows: usize) -> Arc<ServerSessionManager<StaticByteBuffer, DefaultExecutor>> {
let identity = test_identity();
let initial_key = crate::bytes::FixedByteBuffer::<32>::from([0u8; 32]);
let response_body = settings.pool().allocate(Some(0));
let tailer_buf = settings.pool().allocate(Some(DEFAULT_TYPHOON_ID_LENGTH));
let handshake_tailer = Tailer::handshake(tailer_buf, &identity, 0, 1000, 1u64, 0u16);
let mut users: SharedMap<StaticByteBuffer, UserServerState> = SharedMap::new();
let (incoming_tx, _incoming_rx) = create_notify_queue::<DynamicByteBuffer>();
let router_cloned: Arc<CapturingRouter> = Arc::clone(&router);
let router_dyn: Arc<dyn OutgoingRouter<StaticByteBuffer>> = router_cloned;
let router_weak = Arc::downgrade(&router_dyn);
#[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
let (session, _response) = {
let crypto_state = UserCryptoState::new(&initial_key, TEST_SERVER_SECRET.obfuscation_buffer());
ServerSessionManager::assemble_session(crypto_state, response_body, handshake_tailer, identity, &mut users, incoming_tx, router_weak, num_flows, settings).await.expect("assemble_session must succeed")
};
#[cfg(any(feature = "full_software", feature = "full_hardware"))]
let (session, _response) = {
let crypto_state = UserCryptoState::new(&initial_key);
ServerSessionManager::assemble_session(crypto_state, response_body, handshake_tailer, identity, &mut users, incoming_tx, router_weak, num_flows, settings).await.expect("assemble_session must succeed")
};
session
}
/// With four flows and none of them marked active, flow selection must return
/// index 0.
#[tokio::test]
async fn test_select_flow_fallback_when_no_flows_active() {
    let cfg = fast_settings();
    let sink = CapturingRouter::new();
    // `sink` stays in scope so the weak router handle inside the session remains valid.
    let manager = make_session(Arc::clone(&cfg), Arc::clone(&sink), 4).await;

    let chosen = manager.select_active_flow(4);

    assert_eq!(chosen, 0, "must fall back to flow 0 when no flows are active");
}
/// After marking flows 2 and 3 active (out of four), repeated selection must
/// only ever return 2 or 3.
#[tokio::test]
async fn test_select_flow_only_from_marked_flows() {
    let cfg = fast_settings();
    let sink = CapturingRouter::new();
    let manager = make_session(Arc::clone(&cfg), Arc::clone(&sink), 4).await;

    manager.note_active_flow(2);
    manager.note_active_flow(3);

    // Sample the selection 50 times; every sample must land in the active set.
    for _ in 0..50 {
        let idx = manager.select_active_flow(4);
        assert!(matches!(idx, 2 | 3), "selected flow {idx} is not in the active set {{2, 3}}");
    }
}
/// Marking the same flow active twice must leave selection unchanged: with two
/// flows and only flow 1 marked, selection returns 1.
#[tokio::test]
async fn test_note_active_flow_idempotent() {
    let cfg = fast_settings();
    let sink = CapturingRouter::new();
    let manager = make_session(Arc::clone(&cfg), Arc::clone(&sink), 2).await;

    // Mark the same flow twice in a row.
    manager.note_active_flow(1);
    manager.note_active_flow(1);

    let chosen = manager.select_active_flow(2);
    assert_eq!(chosen, 1, "double-marking flow 1 must not change selection result");
}
/// Feeding a termination tailer to the session must surface
/// `SessionControllerError::ConnectionTerminated`.
#[tokio::test]
async fn test_process_incoming_termination_returns_error() {
    let cfg = fast_settings();
    let sink = CapturingRouter::new();
    let manager = make_session(Arc::clone(&cfg), Arc::clone(&sink), 1).await;

    // Arbitrary packet number for the termination tailer.
    let packet_number: u64 = 0xDEAD_BEEF_0000_0001;
    let client_id = test_identity();
    let tailer_storage = cfg.pool().allocate(Some(DEFAULT_TYPHOON_ID_LENGTH));
    let termination = Tailer::termination(tailer_storage, &client_id, ReturnCode::Success, packet_number);
    // Body allocated with a requested size of zero.
    let empty_body = cfg.pool().allocate(Some(0));

    let packet = IncomingPacket {
        body: empty_body,
        tailer: termination,
    };
    let outcome = manager.process_incoming(packet).await;

    assert!(matches!(outcome, Err(SessionControllerError::ConnectionTerminated(_))), "TERMINATION packet must yield ConnectionTerminated, got: {:?}", outcome);
}
/// A packet carrying only a health-check tailer and a zero-size body must be
/// accepted by the session (`Ok`).
#[tokio::test]
async fn test_process_incoming_health_check_no_payload() {
    let cfg = fast_settings();
    let sink = CapturingRouter::new();
    let manager = make_session(Arc::clone(&cfg), Arc::clone(&sink), 1).await;

    // Arbitrary packet number for the health-check tailer.
    let packet_number: u64 = 0x1111_0000_0000_0002;
    let client_id = test_identity();
    let tailer_storage = cfg.pool().allocate(Some(DEFAULT_TYPHOON_ID_LENGTH));
    // NOTE(review): `1000u32` is presumably the "next in" delay — confirm against
    // `Tailer::health_check`.
    let health_check = Tailer::health_check(tailer_storage, &client_id, 1000u32, packet_number);
    let empty_body = cfg.pool().allocate(Some(0));

    let packet = IncomingPacket {
        body: empty_body,
        tailer: health_check,
    };
    let result = manager.process_incoming(packet).await;

    assert!(result.is_ok(), "health-check-only packet must return Ok, got: {result:?}");
}