use std::cmp::Ordering;
use std::future::Future;
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::time::Instant;
use dashmap::DashMap;
use crate::domain::{
DomainError, DomainResult,
aggregates::StreamSession,
entities::{Stream, stream::StreamState},
ports::{
Pagination, PriorityDistribution, SessionHealthSnapshot, SessionQueryCriteria,
SessionQueryResult, SortOrder, StreamFilter, StreamRepositoryGat, StreamStatistics,
StreamStatus, StreamStoreGat,
},
value_objects::{SessionId, StreamId},
};
use super::generic_store::{SessionStore, StreamStore};
use super::limits::{MAX_HEALTH_METRICS, MAX_RESULTS_LIMIT, MAX_SCAN_LIMIT};
const STATS_CACHE_TTL_SECS: u64 = 5;
#[derive(Debug)]
struct CachedSessionStats {
total_frames: AtomicU64,
computed_at_secs: AtomicU64,
}
impl Clone for CachedSessionStats {
fn clone(&self) -> Self {
Self {
total_frames: AtomicU64::new(self.total_frames.load(AtomicOrdering::Relaxed)),
computed_at_secs: AtomicU64::new(self.computed_at_secs.load(AtomicOrdering::Relaxed)),
}
}
}
impl CachedSessionStats {
fn new(total_frames: u64) -> Self {
Self {
total_frames: AtomicU64::new(total_frames),
computed_at_secs: AtomicU64::new(current_timestamp_secs()),
}
}
fn is_valid(&self) -> bool {
let now = current_timestamp_secs();
let computed_at = self.computed_at_secs.load(AtomicOrdering::Relaxed);
now.saturating_sub(computed_at) < STATS_CACHE_TTL_SECS
}
fn get_total_frames(&self) -> u64 {
self.total_frames.load(AtomicOrdering::Relaxed)
}
fn update(&self, total_frames: u64) {
self.total_frames
.store(total_frames, AtomicOrdering::Relaxed);
self.computed_at_secs
.store(current_timestamp_secs(), AtomicOrdering::Relaxed);
}
}
fn current_timestamp_secs() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
}
#[derive(Debug)]
pub struct GatInMemoryStreamRepository {
store: SessionStore,
stats_cache: DashMap<SessionId, CachedSessionStats>,
}
impl Clone for GatInMemoryStreamRepository {
fn clone(&self) -> Self {
Self {
store: self.store.clone(),
stats_cache: self.stats_cache.clone(),
}
}
}
impl Default for GatInMemoryStreamRepository {
fn default() -> Self {
Self::new()
}
}
impl GatInMemoryStreamRepository {
pub fn new() -> Self {
Self {
store: SessionStore::new(),
stats_cache: DashMap::new(),
}
}
pub fn session_count(&self) -> usize {
self.store.count()
}
pub fn clear(&self) {
self.store.clear();
self.stats_cache.clear();
}
pub fn all_session_ids(&self) -> Vec<SessionId> {
self.store.all_keys()
}
fn matches_criteria(session: &StreamSession, criteria: &SessionQueryCriteria) -> bool {
if let Some(states) = &criteria.states {
let state_str = session.state().as_str();
if !states.iter().any(|s| s.eq_ignore_ascii_case(state_str)) {
return false;
}
}
if let Some(after) = criteria.created_after
&& session.created_at() < after
{
return false;
}
if let Some(before) = criteria.created_before
&& session.created_at() > before
{
return false;
}
if let Some(has_active) = criteria.has_active_streams {
let active_count = session.streams().values().filter(|s| s.is_active()).count();
if has_active && active_count == 0 {
return false;
}
if !has_active && active_count > 0 {
return false;
}
}
let stream_count = session.streams().len();
if let Some(min) = criteria.min_stream_count
&& stream_count < min
{
return false;
}
if let Some(max) = criteria.max_stream_count
&& stream_count > max
{
return false;
}
if let Some(pattern) = &criteria.client_info_pattern {
match session.client_info() {
Some(client_info) => {
if !client_info.to_lowercase().contains(&pattern.to_lowercase()) {
return false;
}
}
None => return false, }
}
true
}
fn compare_by_field(a: &StreamSession, b: &StreamSession, field: &str) -> Ordering {
match field {
"created_at" => a.created_at().cmp(&b.created_at()),
"updated_at" => a.updated_at().cmp(&b.updated_at()),
"stream_count" => a.streams().len().cmp(&b.streams().len()),
_ => Ordering::Equal,
}
}
fn get_cached_total_frames(&self, session: &StreamSession) -> u64 {
let session_id = session.id();
if let Some(cached) = self.stats_cache.get(&session_id)
&& cached.is_valid()
{
return cached.get_total_frames();
}
let total_frames: u64 = session
.streams()
.values()
.map(|s| s.stats().total_frames)
.sum();
self.stats_cache
.entry(session_id)
.and_modify(|cached| cached.update(total_frames))
.or_insert_with(|| CachedSessionStats::new(total_frames));
total_frames
}
fn invalidate_stats_cache(&self, session_id: &SessionId) {
self.stats_cache.remove(session_id);
}
}
impl StreamRepositoryGat for GatInMemoryStreamRepository {
type FindSessionFuture<'a>
= impl Future<Output = DomainResult<Option<StreamSession>>> + Send + 'a
where
Self: 'a;
type SaveSessionFuture<'a>
= impl Future<Output = DomainResult<()>> + Send + 'a
where
Self: 'a;
type RemoveSessionFuture<'a>
= impl Future<Output = DomainResult<()>> + Send + 'a
where
Self: 'a;
type FindActiveSessionsFuture<'a>
= impl Future<Output = DomainResult<Vec<StreamSession>>> + Send + 'a
where
Self: 'a;
type FindSessionsByCriteriaFuture<'a>
= impl Future<Output = DomainResult<SessionQueryResult>> + Send + 'a
where
Self: 'a;
type GetSessionHealthFuture<'a>
= impl Future<Output = DomainResult<SessionHealthSnapshot>> + Send + 'a
where
Self: 'a;
type SessionExistsFuture<'a>
= impl Future<Output = DomainResult<bool>> + Send + 'a
where
Self: 'a;
fn find_session(&self, session_id: SessionId) -> Self::FindSessionFuture<'_> {
async move { Ok(self.store.get(&session_id)) }
}
fn save_session(&self, session: StreamSession) -> Self::SaveSessionFuture<'_> {
async move {
let session_id = session.id();
self.invalidate_stats_cache(&session_id);
self.store.insert(session_id, session);
Ok(())
}
}
fn remove_session(&self, session_id: SessionId) -> Self::RemoveSessionFuture<'_> {
async move {
self.invalidate_stats_cache(&session_id);
self.store.remove(&session_id);
Ok(())
}
}
fn find_active_sessions(&self) -> Self::FindActiveSessionsFuture<'_> {
async move {
let (sessions, _) =
self.store
.filter_limited(|s| s.is_active(), MAX_RESULTS_LIMIT, MAX_SCAN_LIMIT);
Ok(sessions)
}
}
fn find_sessions_by_criteria(
&self,
criteria: SessionQueryCriteria,
pagination: Pagination,
) -> Self::FindSessionsByCriteriaFuture<'_> {
async move {
criteria.validate()?;
pagination.validate()?;
let start = Instant::now();
let (mut filtered, scan_limit_reached) = self.store.filter_limited(
|session| Self::matches_criteria(session, &criteria),
MAX_RESULTS_LIMIT,
MAX_SCAN_LIMIT,
);
let total_count = filtered.len();
if let Some(sort_field) = &pagination.sort_by {
filtered.sort_by(|a, b| {
let cmp = Self::compare_by_field(a, b, sort_field);
match pagination.sort_order {
SortOrder::Ascending => cmp,
SortOrder::Descending => cmp.reverse(),
}
});
}
let paginated: Vec<StreamSession> = filtered
.into_iter()
.skip(pagination.offset)
.take(pagination.limit)
.collect();
let has_more = pagination.offset + paginated.len() < total_count;
Ok(SessionQueryResult {
sessions: paginated,
total_count,
has_more,
query_duration_ms: start.elapsed().as_millis() as u64,
scan_limit_reached,
})
}
}
fn get_session_health(&self, session_id: SessionId) -> Self::GetSessionHealthFuture<'_> {
async move {
match self.store.get(&session_id) {
Some(session) => {
let health = session.health_check();
let total_frames = self.get_cached_total_frames(&session);
let failed_streams = health.failed_streams as f64;
let total_streams = session.streams().len() as f64;
let error_rate = if total_streams > 0.0 {
failed_streams / total_streams
} else {
0.0
};
let mut metrics = std::collections::HashMap::with_capacity(MAX_HEALTH_METRICS);
metrics.insert("active_streams".to_string(), health.active_streams as f64);
metrics.insert(
"total_bytes".to_string(),
session.stats().total_bytes as f64,
);
metrics.insert(
"avg_duration_ms".to_string(),
session.stats().average_stream_duration_ms,
);
debug_assert!(
metrics.len() <= MAX_HEALTH_METRICS,
"Health metrics exceeded MAX_HEALTH_METRICS"
);
Ok(SessionHealthSnapshot {
session_id,
is_healthy: health.is_healthy,
active_streams: health.active_streams,
total_frames,
last_activity: session.updated_at(),
error_rate,
metrics,
})
}
None => Err(DomainError::SessionNotFound(format!(
"Session {} not found",
session_id
))),
}
}
}
#[inline]
fn session_exists(&self, session_id: SessionId) -> Self::SessionExistsFuture<'_> {
async move { Ok(self.store.contains_key(&session_id)) }
}
}
#[derive(Debug, Clone, Default)]
pub struct GatInMemoryStreamStore {
store: StreamStore,
}
impl GatInMemoryStreamStore {
pub fn new() -> Self {
Self {
store: StreamStore::new(),
}
}
pub fn stream_count(&self) -> usize {
self.store.count()
}
pub fn clear(&self) {
self.store.clear();
}
pub fn all_stream_ids(&self) -> Vec<StreamId> {
self.store.all_keys()
}
fn stream_state_matches_status(state: &StreamState, status: &StreamStatus) -> bool {
matches!(
(state, status),
(StreamState::Preparing, StreamStatus::Created)
| (StreamState::Streaming, StreamStatus::Active)
| (StreamState::Completed, StreamStatus::Completed)
| (StreamState::Failed, StreamStatus::Failed)
| (StreamState::Cancelled, StreamStatus::Cancelled)
)
}
fn matches_stream_filter(
stream: &Stream,
session_id: SessionId,
filter: &StreamFilter,
) -> bool {
if stream.session_id() != session_id {
return false;
}
if let Some(statuses) = &filter.statuses {
let matches_status = statuses
.iter()
.any(|status| Self::stream_state_matches_status(stream.state(), status));
if !matches_status {
return false;
}
}
if let Some(after) = filter.created_after
&& stream.created_at() < after
{
return false;
}
if let Some(has_frames) = filter.has_frames {
let frame_count = stream.stats().total_frames;
if has_frames && frame_count == 0 {
return false;
}
if !has_frames && frame_count > 0 {
return false;
}
}
true
}
}
impl StreamStoreGat for GatInMemoryStreamStore {
type StoreStreamFuture<'a>
= impl Future<Output = DomainResult<()>> + Send + 'a
where
Self: 'a;
type GetStreamFuture<'a>
= impl Future<Output = DomainResult<Option<Stream>>> + Send + 'a
where
Self: 'a;
type DeleteStreamFuture<'a>
= impl Future<Output = DomainResult<()>> + Send + 'a
where
Self: 'a;
type ListStreamsForSessionFuture<'a>
= impl Future<Output = DomainResult<Vec<Stream>>> + Send + 'a
where
Self: 'a;
type FindStreamsBySessionFuture<'a>
= impl Future<Output = DomainResult<Vec<Stream>>> + Send + 'a
where
Self: 'a;
type UpdateStreamStatusFuture<'a>
= impl Future<Output = DomainResult<()>> + Send + 'a
where
Self: 'a;
type GetStreamStatisticsFuture<'a>
= impl Future<Output = DomainResult<StreamStatistics>> + Send + 'a
where
Self: 'a;
fn store_stream(&self, stream: Stream) -> Self::StoreStreamFuture<'_> {
async move {
self.store.insert(stream.id(), stream);
Ok(())
}
}
fn get_stream(&self, stream_id: StreamId) -> Self::GetStreamFuture<'_> {
async move { Ok(self.store.get(&stream_id)) }
}
fn delete_stream(&self, stream_id: StreamId) -> Self::DeleteStreamFuture<'_> {
async move {
self.store.remove(&stream_id);
Ok(())
}
}
fn list_streams_for_session(
&self,
session_id: SessionId,
) -> Self::ListStreamsForSessionFuture<'_> {
async move {
let (streams, _) = self.store.filter_limited(
|s| s.session_id() == session_id,
MAX_RESULTS_LIMIT,
MAX_SCAN_LIMIT,
);
Ok(streams)
}
}
fn find_streams_by_session(
&self,
session_id: SessionId,
filter: StreamFilter,
) -> Self::FindStreamsBySessionFuture<'_> {
async move {
let (streams, _) = self.store.filter_limited(
|stream| Self::matches_stream_filter(stream, session_id, &filter),
MAX_RESULTS_LIMIT,
MAX_SCAN_LIMIT,
);
Ok(streams)
}
}
fn update_stream_status(
&self,
stream_id: StreamId,
status: StreamStatus,
) -> Self::UpdateStreamStatusFuture<'_> {
async move {
match self.store.update_with(&stream_id, |stream| {
match status {
StreamStatus::Active => stream.start_streaming(),
StreamStatus::Completed => stream.complete(),
StreamStatus::Failed => stream.fail("Status update to Failed".to_string()),
StreamStatus::Cancelled => stream.cancel(),
StreamStatus::Paused => {
Err(DomainError::InvalidStateTransition(
"Cannot transition to Paused status: not supported by StreamState"
.to_string(),
))
}
StreamStatus::Created => {
Err(DomainError::InvalidStateTransition(
"Cannot transition to Created status".to_string(),
))
}
}
}) {
Some(result) => result,
None => Err(DomainError::StreamNotFound(format!(
"Stream {} not found",
stream_id
))),
}
}
}
fn get_stream_statistics(&self, stream_id: StreamId) -> Self::GetStreamStatisticsFuture<'_> {
async move {
match self.store.get(&stream_id) {
Some(stream) => {
let stats = stream.stats();
let high_frames = if stats.average_frame_size > 0.0 {
let ratio = stats.high_priority_bytes as f64 / stats.average_frame_size;
saturating_f64_to_u64(ratio)
} else {
0
};
let priority_dist = PriorityDistribution {
critical_frames: stats.skeleton_frames
+ stats.complete_frames
+ stats.error_frames,
high_frames,
medium_frames: 0, low_frames: 0,
background_frames: 0,
};
Ok(StreamStatistics {
total_frames: stats.total_frames,
total_bytes: stats.total_bytes,
priority_distribution: priority_dist,
avg_frame_size: stats.average_frame_size,
creation_time: stream.created_at(),
completion_time: stream.completed_at(),
processing_duration: stream.duration().map(|d| {
std::time::Duration::from_millis(
d.num_milliseconds().try_into().unwrap_or(0),
)
}),
})
}
None => Err(DomainError::StreamNotFound(format!(
"Stream {} not found",
stream_id
))),
}
}
}
}
#[inline]
fn saturating_f64_to_u64(value: f64) -> u64 {
if value.is_nan() || value < 0.0 {
0
} else if value >= u64::MAX as f64 {
u64::MAX
} else {
value as u64
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::domain::{
aggregates::stream_session::SessionConfig, entities::stream::StreamConfig,
value_objects::JsonData,
};
use chrono::{Duration, Utc};
#[test]
fn test_saturating_f64_to_u64_normal_values() {
assert_eq!(saturating_f64_to_u64(0.0), 0);
assert_eq!(saturating_f64_to_u64(1.5), 1);
assert_eq!(saturating_f64_to_u64(100.9), 100);
assert_eq!(saturating_f64_to_u64(1_000_000.0), 1_000_000);
}
#[test]
fn test_saturating_f64_to_u64_edge_cases() {
assert_eq!(saturating_f64_to_u64(f64::NAN), 0);
assert_eq!(saturating_f64_to_u64(f64::NEG_INFINITY), 0);
assert_eq!(saturating_f64_to_u64(-1.0), 0);
assert_eq!(saturating_f64_to_u64(f64::INFINITY), u64::MAX);
assert_eq!(saturating_f64_to_u64(1e20), u64::MAX);
}
#[tokio::test]
async fn test_gat_repository_crud() {
let repo = GatInMemoryStreamRepository::new();
let session = StreamSession::new(SessionConfig::default());
let session_id = session.id();
repo.save_session(session.clone()).await.unwrap();
let found = repo.find_session(session_id).await.unwrap();
assert!(found.is_some());
assert_eq!(found.unwrap().id(), session_id);
repo.remove_session(session_id).await.unwrap();
let not_found = repo.find_session(session_id).await.unwrap();
assert!(not_found.is_none());
}
#[tokio::test]
async fn test_gat_store_crud() {
let store = GatInMemoryStreamStore::new();
assert_eq!(store.stream_count(), 0);
store.clear();
assert_eq!(store.stream_count(), 0);
}
#[tokio::test]
async fn test_find_sessions_by_criteria_empty() {
let repo = GatInMemoryStreamRepository::new();
let criteria = SessionQueryCriteria::default();
let pagination = Pagination::default();
let result = repo
.find_sessions_by_criteria(criteria, pagination)
.await
.unwrap();
assert_eq!(result.sessions.len(), 0);
assert_eq!(result.total_count, 0);
assert!(!result.has_more);
assert!(!result.scan_limit_reached);
}
#[tokio::test]
async fn test_find_sessions_by_criteria_state_filter() {
let repo = GatInMemoryStreamRepository::new();
let mut active_session = StreamSession::new(SessionConfig::default());
active_session.activate().unwrap();
let active_id = active_session.id();
repo.save_session(active_session).await.unwrap();
let inactive_session = StreamSession::new(SessionConfig::default());
repo.save_session(inactive_session).await.unwrap();
let criteria = SessionQueryCriteria {
states: Some(vec!["Active".to_string()]),
..Default::default()
};
let pagination = Pagination::default();
let result = repo
.find_sessions_by_criteria(criteria, pagination)
.await
.unwrap();
assert_eq!(result.total_count, 1);
assert_eq!(result.sessions[0].id(), active_id);
}
#[tokio::test]
async fn test_find_sessions_by_criteria_time_range() {
let repo = GatInMemoryStreamRepository::new();
let mut session = StreamSession::new(SessionConfig::default());
session.activate().unwrap();
repo.save_session(session).await.unwrap();
let now = Utc::now();
let criteria = SessionQueryCriteria {
created_after: Some(now - Duration::hours(1)),
created_before: Some(now + Duration::hours(1)),
..Default::default()
};
let pagination = Pagination::default();
let result = repo
.find_sessions_by_criteria(criteria, pagination)
.await
.unwrap();
assert_eq!(result.total_count, 1);
let criteria_future = SessionQueryCriteria {
created_after: Some(now + Duration::hours(1)),
..Default::default()
};
let result_future = repo
.find_sessions_by_criteria(criteria_future, Pagination::default())
.await
.unwrap();
assert_eq!(result_future.total_count, 0);
}
#[tokio::test]
async fn test_find_sessions_by_criteria_stream_count() {
let repo = GatInMemoryStreamRepository::new();
let mut session_with_streams = StreamSession::new(SessionConfig::default());
session_with_streams.activate().unwrap();
session_with_streams
.create_stream(JsonData::String("test1".to_string()))
.unwrap();
session_with_streams
.create_stream(JsonData::String("test2".to_string()))
.unwrap();
let session_with_streams_id = session_with_streams.id();
repo.save_session(session_with_streams).await.unwrap();
let mut session_no_streams = StreamSession::new(SessionConfig::default());
session_no_streams.activate().unwrap();
repo.save_session(session_no_streams).await.unwrap();
let criteria = SessionQueryCriteria {
min_stream_count: Some(2),
..Default::default()
};
let result = repo
.find_sessions_by_criteria(criteria, Pagination::default())
.await
.unwrap();
assert_eq!(result.total_count, 1);
assert_eq!(result.sessions[0].id(), session_with_streams_id);
let criteria_max = SessionQueryCriteria {
max_stream_count: Some(1),
..Default::default()
};
let result_max = repo
.find_sessions_by_criteria(criteria_max, Pagination::default())
.await
.unwrap();
assert_eq!(result_max.total_count, 1);
}
#[tokio::test]
async fn test_find_sessions_by_criteria_pagination() {
let repo = GatInMemoryStreamRepository::new();
for _ in 0..5 {
let mut session = StreamSession::new(SessionConfig::default());
session.activate().unwrap();
repo.save_session(session).await.unwrap();
}
let pagination = Pagination {
offset: 2,
limit: 2,
..Default::default()
};
let result = repo
.find_sessions_by_criteria(SessionQueryCriteria::default(), pagination)
.await
.unwrap();
assert_eq!(result.sessions.len(), 2);
assert_eq!(result.total_count, 5);
assert!(result.has_more); }
#[tokio::test]
async fn test_find_sessions_by_criteria_sorting() {
let repo = GatInMemoryStreamRepository::new();
let mut session1 = StreamSession::new(SessionConfig::default());
session1.activate().unwrap();
session1
.create_stream(JsonData::String("s1".to_string()))
.unwrap();
repo.save_session(session1).await.unwrap();
let mut session2 = StreamSession::new(SessionConfig::default());
session2.activate().unwrap();
session2
.create_stream(JsonData::String("s2".to_string()))
.unwrap();
session2
.create_stream(JsonData::String("s3".to_string()))
.unwrap();
session2
.create_stream(JsonData::String("s4".to_string()))
.unwrap();
let session2_id = session2.id();
repo.save_session(session2).await.unwrap();
let pagination = Pagination {
sort_by: Some("stream_count".to_string()),
sort_order: SortOrder::Descending,
..Default::default()
};
let result = repo
.find_sessions_by_criteria(SessionQueryCriteria::default(), pagination)
.await
.unwrap();
assert_eq!(result.sessions.len(), 2);
assert_eq!(result.sessions[0].id(), session2_id); }
#[tokio::test]
async fn test_find_sessions_validates_criteria() {
let repo = GatInMemoryStreamRepository::new();
let criteria = SessionQueryCriteria {
min_stream_count: Some(10),
max_stream_count: Some(5),
..Default::default()
};
let result = repo
.find_sessions_by_criteria(criteria, Pagination::default())
.await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), DomainError::InvalidInput(_)));
}
#[tokio::test]
async fn test_find_sessions_validates_pagination() {
let repo = GatInMemoryStreamRepository::new();
let pagination = Pagination {
offset: 0,
limit: 0,
sort_by: None,
sort_order: SortOrder::Ascending,
};
let result = repo
.find_sessions_by_criteria(SessionQueryCriteria::default(), pagination)
.await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), DomainError::InvalidInput(_)));
}
#[tokio::test]
async fn test_get_session_health_existing() {
let repo = GatInMemoryStreamRepository::new();
let mut session = StreamSession::new(SessionConfig::default());
session.activate().unwrap();
let session_id = session.id();
repo.save_session(session).await.unwrap();
let health = repo.get_session_health(session_id).await.unwrap();
assert_eq!(health.session_id, session_id);
assert!(health.is_healthy);
assert_eq!(health.active_streams, 0);
assert!(health.error_rate == 0.0);
}
#[tokio::test]
async fn test_get_session_health_returns_not_found() {
let repo = GatInMemoryStreamRepository::new();
let missing_session_id = SessionId::new();
let result = repo.get_session_health(missing_session_id).await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
DomainError::SessionNotFound(_)
));
}
#[tokio::test]
async fn test_get_session_health_uses_cache() {
let repo = GatInMemoryStreamRepository::new();
let mut session = StreamSession::new(SessionConfig::default());
session.activate().unwrap();
let session_id = session.id();
repo.save_session(session).await.unwrap();
let health1 = repo.get_session_health(session_id).await.unwrap();
assert_eq!(health1.total_frames, 0);
let health2 = repo.get_session_health(session_id).await.unwrap();
assert_eq!(health2.total_frames, 0);
assert!(repo.stats_cache.contains_key(&session_id));
}
#[tokio::test]
async fn test_session_exists_true() {
let repo = GatInMemoryStreamRepository::new();
let session = StreamSession::new(SessionConfig::default());
let session_id = session.id();
repo.save_session(session).await.unwrap();
let exists = repo.session_exists(session_id).await.unwrap();
assert!(exists);
}
#[tokio::test]
async fn test_session_exists_false() {
let repo = GatInMemoryStreamRepository::new();
let missing_session_id = SessionId::new();
let exists = repo.session_exists(missing_session_id).await.unwrap();
assert!(!exists);
}
#[tokio::test]
async fn test_find_streams_by_session_empty() {
let store = GatInMemoryStreamStore::new();
let session_id = SessionId::new();
let filter = StreamFilter::default();
let result = store
.find_streams_by_session(session_id, filter)
.await
.unwrap();
assert!(result.is_empty());
}
#[tokio::test]
async fn test_find_streams_by_session_status_filter() {
let store = GatInMemoryStreamStore::new();
let session_id = SessionId::new();
let stream = Stream::new(
session_id,
JsonData::String("test".to_string()),
StreamConfig::default(),
);
let stream_id = stream.id();
store.store_stream(stream).await.unwrap();
let filter = StreamFilter {
statuses: Some(vec![StreamStatus::Created]),
..Default::default()
};
let result = store
.find_streams_by_session(session_id, filter)
.await
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].id(), stream_id);
let filter_active = StreamFilter {
statuses: Some(vec![StreamStatus::Active]),
..Default::default()
};
let result_active = store
.find_streams_by_session(session_id, filter_active)
.await
.unwrap();
assert!(result_active.is_empty());
}
#[tokio::test]
async fn test_find_streams_by_session_time_filter() {
let store = GatInMemoryStreamStore::new();
let session_id = SessionId::new();
let stream = Stream::new(
session_id,
JsonData::String("test".to_string()),
StreamConfig::default(),
);
store.store_stream(stream).await.unwrap();
let now = Utc::now();
let filter = StreamFilter {
created_after: Some(now - Duration::hours(1)),
..Default::default()
};
let result = store
.find_streams_by_session(session_id, filter)
.await
.unwrap();
assert_eq!(result.len(), 1);
let filter_future = StreamFilter {
created_after: Some(now + Duration::hours(1)),
..Default::default()
};
let result_future = store
.find_streams_by_session(session_id, filter_future)
.await
.unwrap();
assert!(result_future.is_empty());
}
#[tokio::test]
async fn test_find_streams_by_session_has_frames() {
let store = GatInMemoryStreamStore::new();
let session_id = SessionId::new();
let stream = Stream::new(
session_id,
JsonData::String("test".to_string()),
StreamConfig::default(),
);
store.store_stream(stream).await.unwrap();
let filter = StreamFilter {
has_frames: Some(false),
..Default::default()
};
let result = store
.find_streams_by_session(session_id, filter)
.await
.unwrap();
assert_eq!(result.len(), 1);
let filter_with_frames = StreamFilter {
has_frames: Some(true),
..Default::default()
};
let result_with_frames = store
.find_streams_by_session(session_id, filter_with_frames)
.await
.unwrap();
assert!(result_with_frames.is_empty());
}
#[tokio::test]
async fn test_update_stream_status_to_active() {
let store = GatInMemoryStreamStore::new();
let session_id = SessionId::new();
let stream = Stream::new(
session_id,
JsonData::String("test".to_string()),
StreamConfig::default(),
);
let stream_id = stream.id();
store.store_stream(stream).await.unwrap();
store
.update_stream_status(stream_id, StreamStatus::Active)
.await
.unwrap();
let updated = store.get_stream(stream_id).await.unwrap().unwrap();
assert!(matches!(updated.state(), StreamState::Streaming));
}
#[tokio::test]
async fn test_update_stream_status_to_completed() {
let store = GatInMemoryStreamStore::new();
let session_id = SessionId::new();
let mut stream = Stream::new(
session_id,
JsonData::String("test".to_string()),
StreamConfig::default(),
);
stream.start_streaming().unwrap(); let stream_id = stream.id();
store.store_stream(stream).await.unwrap();
store
.update_stream_status(stream_id, StreamStatus::Completed)
.await
.unwrap();
let updated = store.get_stream(stream_id).await.unwrap().unwrap();
assert!(matches!(updated.state(), StreamState::Completed));
}
#[tokio::test]
async fn test_update_stream_status_returns_not_found() {
let store = GatInMemoryStreamStore::new();
let missing_stream_id = StreamId::new();
let result = store
.update_stream_status(missing_stream_id, StreamStatus::Active)
.await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
DomainError::StreamNotFound(_)
));
}
#[tokio::test]
async fn test_get_stream_statistics_existing() {
let store = GatInMemoryStreamStore::new();
let session_id = SessionId::new();
let stream = Stream::new(
session_id,
JsonData::String("test".to_string()),
StreamConfig::default(),
);
let stream_id = stream.id();
store.store_stream(stream).await.unwrap();
let stats = store.get_stream_statistics(stream_id).await.unwrap();
assert_eq!(stats.total_frames, 0);
assert_eq!(stats.total_bytes, 0);
assert!(stats.completion_time.is_none());
assert!(stats.processing_duration.is_none());
}
#[tokio::test]
async fn test_get_stream_statistics_returns_not_found() {
let store = GatInMemoryStreamStore::new();
let missing_stream_id = StreamId::new();
let result = store.get_stream_statistics(missing_stream_id).await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
DomainError::StreamNotFound(_)
));
}
#[tokio::test]
async fn test_find_sessions_uses_bounded_iteration() {
let repo = GatInMemoryStreamRepository::new();
for _ in 0..10 {
let mut session = StreamSession::new(SessionConfig::default());
session.activate().unwrap();
repo.save_session(session).await.unwrap();
}
let result = repo
.find_sessions_by_criteria(SessionQueryCriteria::default(), Pagination::default())
.await
.unwrap();
assert_eq!(result.total_count, 10);
assert!(!result.scan_limit_reached);
}
#[tokio::test]
async fn test_find_active_sessions_uses_bounded_iteration() {
let repo = GatInMemoryStreamRepository::new();
for _ in 0..5 {
let mut session = StreamSession::new(SessionConfig::default());
session.activate().unwrap();
repo.save_session(session).await.unwrap();
}
let sessions = repo.find_active_sessions().await.unwrap();
assert_eq!(sessions.len(), 5);
}
#[tokio::test]
async fn test_list_streams_for_session_uses_bounded_iteration() {
let store = GatInMemoryStreamStore::new();
let session_id = SessionId::new();
for _ in 0..5 {
let stream = Stream::new(
session_id,
JsonData::String("test".to_string()),
StreamConfig::default(),
);
store.store_stream(stream).await.unwrap();
}
let streams = store.list_streams_for_session(session_id).await.unwrap();
assert_eq!(streams.len(), 5);
}
#[tokio::test]
async fn test_stats_cache_invalidated_on_save() {
let repo = GatInMemoryStreamRepository::new();
let mut session = StreamSession::new(SessionConfig::default());
session.activate().unwrap();
let session_id = session.id();
repo.save_session(session.clone()).await.unwrap();
let _ = repo.get_session_health(session_id).await.unwrap();
assert!(repo.stats_cache.contains_key(&session_id));
repo.save_session(session).await.unwrap();
}
#[tokio::test]
async fn test_stats_cache_invalidated_on_remove() {
let repo = GatInMemoryStreamRepository::new();
let mut session = StreamSession::new(SessionConfig::default());
session.activate().unwrap();
let session_id = session.id();
repo.save_session(session).await.unwrap();
let _ = repo.get_session_health(session_id).await.unwrap();
assert!(repo.stats_cache.contains_key(&session_id));
repo.remove_session(session_id).await.unwrap();
assert!(!repo.stats_cache.contains_key(&session_id));
}
}