use crate::domain::{
DomainResult,
aggregates::StreamSession,
entities::{Frame, Stream},
events::DomainEvent,
value_objects::{JsonPath, Priority, SessionId, StreamId},
};
use crate::gat_port;
use chrono::{DateTime, Utc};
use std::future::Future;
use std::time::Duration;
pub use super::repositories::{
CacheExtensions, CacheStatistics, FrameQueryResult, Pagination, PriorityDistribution,
SessionHealthSnapshot, SessionQueryCriteria, SessionQueryResult, SortOrder, StreamFilter,
StreamMetadata, StreamStatistics, StreamStatus,
};
pub use super::writer::{
BackpressureStrategy, ConnectionMetrics, ConnectionState, WriterConfig, WriterMetrics,
};
// Port for pulling `Frame`s out of some upstream producer, declared through
// the crate's `gat_port!` macro.
//
// NOTE(review): `gat_port!` is defined outside this file. Judging by the
// hand-written `FrameSinkGat` impl in this file's test module, each
// `async fn foo(..) -> T` presumably becomes a `FooFuture<'a>` associated type
// resolving to `DomainResult<T>` plus a plain `fn foo(..)` returning it.
// Confirm against the macro definition.
gat_port! {
pub trait FrameSourceGat {
// Produces the next frame, or `None`.
// NOTE(review): `None` presumably means the source is exhausted — confirm.
async fn next_frame(&mut self) -> Option<Frame>;
// Reports whether frames are available; takes `&self`, so it cannot consume one.
async fn has_frames(&self) -> bool;
// Closes the source. The written return type is `()`.
async fn close(&mut self) -> ();
}
}
// Port for pushing `Frame`s to a downstream consumer, declared through the
// crate's `gat_port!` macro.
//
// The test module in this file implements this trait by hand, which shows the
// expanded shape: one associated future type per method (`SendFrameFuture<'a>`,
// `SendFramesFuture<'a>`, `FlushFuture<'a>`, `CloseFuture<'a>`), each bounded by
// `Future<Output = DomainResult<()>> + Send + 'a`, and non-async methods that
// return those types. The `-> ()` written below is therefore the success value
// inside `DomainResult`, not the literal return type.
gat_port! {
pub trait FrameSinkGat {
// Sends a single frame, taking ownership of it.
async fn send_frame(&mut self, frame: Frame) -> ();
// Sends a batch of frames, taking ownership of the vector.
async fn send_frames(&mut self, frames: Vec<Frame>) -> ();
// Flushes the sink.
// NOTE(review): presumably forces buffered frames out — confirm with implementors.
async fn flush(&mut self) -> ();
// Closes the sink.
async fn close(&mut self) -> ();
}
}
// Repository port for `StreamSession` aggregates, declared through the crate's
// `gat_port!` macro. All methods take `&self`.
//
// NOTE(review): `gat_port!` is defined outside this file; it presumably wraps
// every written return type `T` in a future resolving to `DomainResult<T>`
// (as the hand-written `FrameSinkGat` impl in the tests suggests) — confirm.
gat_port! {
pub trait StreamRepositoryGat {
// Looks a session up by id; the result is optional.
// NOTE(review): `None` presumably means "no such session" — confirm.
async fn find_session(&self, session_id: SessionId) -> Option<StreamSession>;
// Persists the given session, taking ownership of it.
async fn save_session(&self, session: StreamSession) -> ();
// Removes the session with the given id.
async fn remove_session(&self, session_id: SessionId) -> ();
// Returns the sessions considered active.
// NOTE(review): the definition of "active" is up to the implementor.
async fn find_active_sessions(&self) -> Vec<StreamSession>;
// Queries sessions matching `criteria`, with `pagination` applied, and
// returns a `SessionQueryResult`.
async fn find_sessions_by_criteria(
&self,
criteria: SessionQueryCriteria,
pagination: Pagination
) -> SessionQueryResult;
// Returns a `SessionHealthSnapshot` for the given session.
async fn get_session_health(&self, session_id: SessionId) -> SessionHealthSnapshot;
// Reports whether a session with the given id exists.
async fn session_exists(&self, session_id: SessionId) -> bool;
}
}
// Storage port for `Stream` entities, declared through the crate's
// `gat_port!` macro. All methods take `&self`.
//
// NOTE(review): `gat_port!` is defined outside this file; it presumably wraps
// every written return type `T` in a future resolving to `DomainResult<T>`
// (as the hand-written `FrameSinkGat` impl in the tests suggests) — confirm.
gat_port! {
pub trait StreamStoreGat {
// Stores the given stream, taking ownership of it.
async fn store_stream(&self, stream: Stream) -> ();
// Fetches a stream by id; the result is optional.
async fn get_stream(&self, stream_id: StreamId) -> Option<Stream>;
// Deletes the stream with the given id.
async fn delete_stream(&self, stream_id: StreamId) -> ();
// Lists the streams associated with a session (no filter argument).
async fn list_streams_for_session(&self, session_id: SessionId) -> Vec<Stream>;
// Same lookup key as `list_streams_for_session`, but additionally takes a `StreamFilter`.
async fn find_streams_by_session(&self, session_id: SessionId, filter: StreamFilter) -> Vec<Stream>;
// Sets the status of the identified stream to `status`.
async fn update_stream_status(&self, stream_id: StreamId, status: StreamStatus) -> ();
// Returns `StreamStatistics` for the identified stream.
async fn get_stream_statistics(&self, stream_id: StreamId) -> StreamStatistics;
}
}
// Port for publishing `DomainEvent`s, declared through the crate's
// `gat_port!` macro.
//
// NOTE(review): `gat_port!` is defined outside this file; it presumably wraps
// every written return type `T` in a future resolving to `DomainResult<T>`
// (as the hand-written `FrameSinkGat` impl in the tests suggests) — confirm.
gat_port! {
pub trait EventPublisherGat {
// Publishes a single event, taking ownership of it.
async fn publish(&self, event: DomainEvent) -> ();
// Publishes several events in one call.
// NOTE(review): ordering/atomicity of the batch is not specified here.
async fn publish_batch(&self, events: Vec<DomainEvent>) -> ();
}
}
pub trait MetricsCollectorGat: Send + Sync {
type IncrementCounterFuture<'a>: Future<Output = DomainResult<()>> + Send + 'a
where
Self: 'a;
type SetGaugeFuture<'a>: Future<Output = DomainResult<()>> + Send + 'a
where
Self: 'a;
type RecordTimingFuture<'a>: Future<Output = DomainResult<()>> + Send + 'a
where
Self: 'a;
fn increment_counter<'a>(
&'a self,
name: &'a str,
value: u64,
tags: std::collections::HashMap<String, String>,
) -> Self::IncrementCounterFuture<'a>;
fn set_gauge<'a>(
&'a self,
name: &'a str,
value: f64,
tags: std::collections::HashMap<String, String>,
) -> Self::SetGaugeFuture<'a>;
fn record_timing<'a>(
&'a self,
name: &'a str,
duration: std::time::Duration,
tags: std::collections::HashMap<String, String>,
) -> Self::RecordTimingFuture<'a>;
}
// Port for recording session/stream lifecycle occurrences, declared through
// the crate's `gat_port!` macro. All methods take `&self`.
//
// NOTE(review): `gat_port!` is defined outside this file; it presumably wraps
// every written return type `T` in a future resolving to `DomainResult<T>`
// (as the hand-written `FrameSinkGat` impl in the tests suggests) — confirm.
gat_port! {
pub trait SessionMetricsGat {
// Records that a session was created, together with free-form string metadata.
async fn record_session_created(
&self,
session_id: SessionId,
metadata: std::collections::HashMap<String, String>
) -> ();
// Records that the identified session ended.
async fn record_session_ended(&self, session_id: SessionId) -> ();
// Records that a stream was created; note the argument order: stream id first, then session id.
async fn record_stream_created(&self, stream_id: StreamId, session_id: SessionId) -> ();
// Records that the identified stream completed.
async fn record_stream_completed(&self, stream_id: StreamId) -> ();
}
}
/// Port describing a unit of work over sessions and their streams.
///
/// The three mutating operations borrow the transaction (`&self`) and return
/// lifetime-parameterised futures. [`commit`](Self::commit) and
/// [`rollback`](Self::rollback) instead consume the transaction as
/// `Box<Self>`, and their future types carry no lifetime parameter. All
/// futures are `Send` and resolve to `DomainResult<()>`.
pub trait SessionTransactionGat: Send + Sync {
    /// Future returned by [`save_session`](Self::save_session).
    type SaveSessionFuture<'a>: Future<Output = DomainResult<()>> + Send + 'a
    where
        Self: 'a;

    /// Saves `session` as part of this transaction.
    fn save_session(&self, session: StreamSession) -> Self::SaveSessionFuture<'_>;

    /// Future returned by [`remove_session`](Self::remove_session).
    type RemoveSessionFuture<'a>: Future<Output = DomainResult<()>> + Send + 'a
    where
        Self: 'a;

    /// Removes the session identified by `session_id` as part of this transaction.
    fn remove_session(&self, session_id: SessionId) -> Self::RemoveSessionFuture<'_>;

    /// Future returned by [`add_stream`](Self::add_stream).
    type AddStreamFuture<'a>: Future<Output = DomainResult<()>> + Send + 'a
    where
        Self: 'a;

    /// Adds `stream` to the session identified by `session_id`.
    fn add_stream(&self, session_id: SessionId, stream: Stream) -> Self::AddStreamFuture<'_>;

    /// Future returned by [`commit`](Self::commit).
    type CommitFuture: Future<Output = DomainResult<()>> + Send;

    /// Consumes the boxed transaction and commits it.
    fn commit(self: Box<Self>) -> Self::CommitFuture;

    /// Future returned by [`rollback`](Self::rollback).
    type RollbackFuture: Future<Output = DomainResult<()>> + Send;

    /// Consumes the boxed transaction and rolls it back.
    fn rollback(self: Box<Self>) -> Self::RollbackFuture;
}
// Repository port for persisted `Frame`s, declared through the crate's
// `gat_port!` macro. All methods take `&self`.
//
// NOTE(review): `gat_port!` is defined outside this file; it presumably wraps
// every written return type `T` in a future resolving to `DomainResult<T>`
// (as the hand-written `FrameSinkGat` impl in the tests suggests) — confirm.
gat_port! {
pub trait FrameRepositoryGat {
// Stores a single frame, taking ownership of it.
async fn store_frame(&self, frame: Frame) -> ();
// Stores a batch of frames.
async fn store_frames(&self, frames: Vec<Frame>) -> ();
// Queries the frames of one stream with an optional priority filter and
// pagination; returns a `FrameQueryResult`.
// NOTE(review): how `priority_filter` matches (exact vs. minimum) is not visible here.
async fn get_frames_by_stream(
&self,
stream_id: StreamId,
priority_filter: Option<Priority>,
pagination: Pagination
) -> FrameQueryResult;
// Returns the frames of one stream selected by a `JsonPath`.
async fn get_frames_by_path(&self, stream_id: StreamId, path: JsonPath) -> Vec<Frame>;
// Cleans up frames relative to the `older_than` UTC timestamp and returns a `u64`.
// NOTE(review): the `u64` is presumably the number of frames removed — confirm.
async fn cleanup_old_frames(&self, older_than: DateTime<Utc>) -> u64;
// Returns the `PriorityDistribution` for one stream's frames.
async fn get_frame_priority_distribution(&self, stream_id: StreamId) -> PriorityDistribution;
}
}
// Port for a store of `DomainEvent`s, declared through the crate's
// `gat_port!` macro. All methods take `&self`.
//
// NOTE(review): `gat_port!` is defined outside this file; it presumably wraps
// every written return type `T` in a future resolving to `DomainResult<T>`
// (as the hand-written `FrameSinkGat` impl in the tests suggests) — confirm.
gat_port! {
pub trait EventStoreGat {
// Stores one event together with a caller-supplied `sequence` number.
async fn store_event(&self, event: DomainEvent, sequence: u64) -> ();
// Stores several events; unlike `store_event`, no sequence numbers are passed in.
// NOTE(review): how sequences are assigned for the batch is up to the implementor.
async fn store_events(&self, events: Vec<DomainEvent>) -> ();
// Returns events for one session, optionally bounded by a starting sequence
// and a maximum count.
// NOTE(review): whether `from_sequence` is inclusive is not visible here.
async fn get_events_for_session(
&self,
session_id: SessionId,
from_sequence: Option<u64>,
limit: Option<usize>
) -> Vec<DomainEvent>;
// Same as `get_events_for_session`, but keyed by stream id.
async fn get_events_for_stream(
&self,
stream_id: StreamId,
from_sequence: Option<u64>,
limit: Option<usize>
) -> Vec<DomainEvent>;
// Returns events selected by type name, optionally restricted to a pair of
// UTC timestamps.
// NOTE(review): the tuple is presumably (start, end); bound inclusivity is unspecified.
async fn get_events_by_type(
&self,
event_types: Vec<String>,
time_range: Option<(DateTime<Utc>, DateTime<Utc>)>
) -> Vec<DomainEvent>;
// Returns the latest sequence number as a `u64`.
async fn get_latest_sequence(&self) -> u64;
// Returns the events of one session for replay; takes no sequence or limit arguments.
async fn replay_session_events(&self, session_id: SessionId) -> Vec<DomainEvent>;
}
}
/// Port for a byte-oriented cache addressed by string keys.
///
/// Each operation has a dedicated generic associated future type so that the
/// returned future may borrow `self` (and the key or prefix) for `'a`. Every
/// future is `Send` and resolves to a `DomainResult`.
pub trait CacheGat: Send + Sync {
    /// Future returned by [`get_bytes`](Self::get_bytes).
    type GetBytesFuture<'a>: Future<Output = DomainResult<Option<Vec<u8>>>> + Send + 'a
    where
        Self: 'a;

    /// Reads the bytes stored under `key`; the success value is optional.
    ///
    /// NOTE(review): `None` presumably signals a cache miss — confirm with
    /// implementors.
    fn get_bytes<'a>(&'a self, key: &'a str) -> Self::GetBytesFuture<'a>;

    /// Future returned by [`set_bytes`](Self::set_bytes).
    type SetBytesFuture<'a>: Future<Output = DomainResult<()>> + Send + 'a
    where
        Self: 'a;

    /// Stores `value` under `key`, with an optional time-to-live `ttl`.
    fn set_bytes<'a>(
        &'a self,
        key: &'a str,
        value: Vec<u8>,
        ttl: Option<Duration>,
    ) -> Self::SetBytesFuture<'a>;

    /// Future returned by [`remove`](Self::remove).
    type RemoveFuture<'a>: Future<Output = DomainResult<()>> + Send + 'a
    where
        Self: 'a;

    /// Removes the entry stored under `key`.
    fn remove<'a>(&'a self, key: &'a str) -> Self::RemoveFuture<'a>;

    /// Future returned by [`clear_prefix`](Self::clear_prefix).
    type ClearPrefixFuture<'a>: Future<Output = DomainResult<()>> + Send + 'a
    where
        Self: 'a;

    /// Clears entries selected by `prefix`.
    fn clear_prefix<'a>(&'a self, prefix: &'a str) -> Self::ClearPrefixFuture<'a>;

    /// Future returned by [`get_stats`](Self::get_stats).
    type GetStatsFuture<'a>: Future<Output = DomainResult<CacheStatistics>> + Send + 'a
    where
        Self: 'a;

    /// Returns the cache's `CacheStatistics`.
    fn get_stats(&self) -> Self::GetStatsFuture<'_>;
}
// Port for a priority-aware frame writer, declared through the crate's
// `gat_port!` macro.
//
// NOTE(review): `gat_port!` is defined outside this file; it presumably wraps
// every written return type `T` in a future resolving to `DomainResult<T>`
// (as the hand-written `FrameSinkGat` impl in the tests suggests) — confirm.
gat_port! {
pub trait FrameWriterGat {
// Writes one frame, taking ownership of it.
// NOTE(review): how the frame's priority influences the write is implementor-defined.
async fn write_prioritized_frame(&mut self, frame: Frame) -> ();
// Writes a batch of frames.
// NOTE(review): presumably ordered by priority, as the name suggests — confirm.
async fn write_frames_by_priority(&mut self, frames: Vec<Frame>) -> ();
// Sets the backpressure threshold to `threshold`.
// NOTE(review): the unit of `threshold` (frames, bytes, ...) is not visible here.
async fn set_backpressure_threshold(&mut self, threshold: usize) -> ();
// Returns the writer's `WriterMetrics`; only needs `&self`.
async fn get_metrics(&self) -> WriterMetrics;
}
}
/// Port for constructing writers bound to a connection.
///
/// The concrete writer types are chosen by the implementor through the
/// `StreamWriter` and `FrameWriter` associated types. Both creation futures
/// are `Send`, may borrow `self` and `connection_id` for `'a`, and resolve to
/// a `DomainResult` holding the new writer.
pub trait WriterFactoryGat: Send + Sync {
    /// Writer type produced by [`create_stream_writer`](Self::create_stream_writer).
    type StreamWriter: FrameSinkGat + Send;

    /// Future returned by [`create_stream_writer`](Self::create_stream_writer).
    type CreateStreamWriterFuture<'a>: Future<Output = DomainResult<Self::StreamWriter>> + Send + 'a
    where
        Self: 'a;

    /// Creates a `StreamWriter` for `connection_id` using `config`.
    fn create_stream_writer<'a>(
        &'a self,
        connection_id: &'a str,
        config: WriterConfig,
    ) -> Self::CreateStreamWriterFuture<'a>;

    /// Writer type produced by [`create_frame_writer`](Self::create_frame_writer).
    type FrameWriter: FrameWriterGat + Send;

    /// Future returned by [`create_frame_writer`](Self::create_frame_writer).
    type CreateFrameWriterFuture<'a>: Future<Output = DomainResult<Self::FrameWriter>> + Send + 'a
    where
        Self: 'a;

    /// Creates a `FrameWriter` for `connection_id` using `config`.
    fn create_frame_writer<'a>(
        &'a self,
        connection_id: &'a str,
        config: WriterConfig,
    ) -> Self::CreateFrameWriterFuture<'a>;
}
/// Port for inspecting connections identified by a string id.
///
/// Each query has its own generic associated future type; the futures are
/// `Send`, may borrow `self` and `connection_id` for `'a`, and resolve to a
/// `DomainResult`.
pub trait ConnectionMonitorGat: Send + Sync {
    /// Future returned by [`get_connection_state`](Self::get_connection_state).
    type GetConnectionStateFuture<'a>: Future<Output = DomainResult<ConnectionState>> + Send + 'a
    where
        Self: 'a;

    /// Returns the `ConnectionState` of the connection named `connection_id`.
    fn get_connection_state<'a>(
        &'a self,
        connection_id: &'a str,
    ) -> Self::GetConnectionStateFuture<'a>;

    /// Future returned by [`is_connection_healthy`](Self::is_connection_healthy).
    type IsConnectionHealthyFuture<'a>: Future<Output = DomainResult<bool>> + Send + 'a
    where
        Self: 'a;

    /// Reports, as a `bool`, whether the connection named `connection_id` is
    /// healthy. What counts as healthy is left to implementors.
    fn is_connection_healthy<'a>(
        &'a self,
        connection_id: &'a str,
    ) -> Self::IsConnectionHealthyFuture<'a>;

    /// Future returned by [`get_connection_metrics`](Self::get_connection_metrics).
    type GetConnectionMetricsFuture<'a>: Future<Output = DomainResult<ConnectionMetrics>>
        + Send
        + 'a
    where
        Self: 'a;

    /// Returns the `ConnectionMetrics` of the connection named `connection_id`.
    fn get_connection_metrics<'a>(
        &'a self,
        connection_id: &'a str,
    ) -> Self::GetConnectionMetricsFuture<'a>;
}
/// Convenience helpers layered on top of any sized [`FrameSinkGat`].
pub trait FrameSinkGatExt: FrameSinkGat + Sized {
    /// Sends `frames` one at a time through `send_frame`, in vector order.
    ///
    /// The first error returned by `send_frame` is propagated immediately and
    /// the remaining frames are not sent; frames already sent stay sent. An
    /// empty vector yields `Ok(())` without calling `send_frame`.
    ///
    /// The returned future is `Send` and borrows `self` mutably until it
    /// completes. The method is only available when `Self: 'static`.
    fn send_frames_default(
        &mut self,
        frames: Vec<Frame>,
    ) -> impl Future<Output = DomainResult<()>> + Send + '_
    where
        Self: 'static,
    {
        async move {
            for queued in frames {
                let outcome = self.send_frame(queued).await;
                outcome?;
            }
            Ok(())
        }
    }
}
/// Blanket implementation: every type implementing [`FrameSinkGat`] gets the
/// [`FrameSinkGatExt`] helpers automatically (the trait's `Sized` supertrait
/// is met by the implicit `T: Sized` bound).
impl<T> FrameSinkGatExt for T where T: FrameSinkGat {}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Frame;
    use crate::domain::value_objects::{JsonData, Priority, StreamId};
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// In-memory [`FrameSinkGat`] that records every frame it receives and
    /// remembers whether `close` has been called.
    pub struct MockFrameSinkGat {
        frames: Arc<Mutex<Vec<Frame>>>,
        closed: Arc<Mutex<bool>>,
    }

    impl MockFrameSinkGat {
        /// Creates an empty sink that is not closed.
        pub fn new() -> Self {
            let frames = Arc::new(Mutex::new(Vec::new()));
            let closed = Arc::new(Mutex::new(false));
            Self { frames, closed }
        }

        /// Returns a copy of every frame recorded so far.
        pub async fn get_frames(&self) -> Vec<Frame> {
            let recorded = self.frames.lock().await;
            recorded.clone()
        }

        /// Reports whether `close` has been called on this sink.
        pub async fn is_closed(&self) -> bool {
            let flag = self.closed.lock().await;
            *flag
        }
    }

    // Hand-written expansion of the port: one opaque future type per method.
    impl FrameSinkGat for MockFrameSinkGat {
        type SendFrameFuture<'a>
            = impl Future<Output = DomainResult<()>> + Send + 'a
        where
            Self: 'a;
        type SendFramesFuture<'a>
            = impl Future<Output = DomainResult<()>> + Send + 'a
        where
            Self: 'a;
        type FlushFuture<'a>
            = impl Future<Output = DomainResult<()>> + Send + 'a
        where
            Self: 'a;
        type CloseFuture<'a>
            = impl Future<Output = DomainResult<()>> + Send + 'a
        where
            Self: 'a;

        /// Appends `frame` to the recorded frames.
        fn send_frame(&mut self, frame: Frame) -> Self::SendFrameFuture<'_> {
            async move {
                let mut recorded = self.frames.lock().await;
                recorded.push(frame);
                Ok(())
            }
        }

        /// Appends all of `frames` to the recorded frames.
        fn send_frames(&mut self, frames: Vec<Frame>) -> Self::SendFramesFuture<'_> {
            async move {
                let mut recorded = self.frames.lock().await;
                recorded.extend(frames);
                Ok(())
            }
        }

        /// Does nothing and succeeds.
        fn flush(&mut self) -> Self::FlushFuture<'_> {
            async move { Ok(()) }
        }

        /// Marks the sink as closed.
        fn close(&mut self) -> Self::CloseFuture<'_> {
            async move {
                let mut flag = self.closed.lock().await;
                *flag = true;
                Ok(())
            }
        }
    }

    /// Builds a skeleton frame on a fresh stream id, with sequence `1` and a
    /// string payload.
    fn skeleton_frame(payload: &str) -> Frame {
        Frame::skeleton(StreamId::new(), 1, JsonData::String(payload.to_string()))
    }

    #[tokio::test]
    async fn test_gat_frame_sink() {
        let mut sink = MockFrameSinkGat::new();
        let frame = skeleton_frame("test data");

        // A single send records exactly one frame with critical priority.
        sink.send_frame(frame.clone()).await.unwrap();
        let after_single = sink.get_frames().await;
        assert_eq!(after_single.len(), 1);
        assert_eq!(after_single[0].priority(), Priority::CRITICAL);

        // A batch of two brings the recorded total to three.
        let batch = vec![frame.clone(), frame];
        sink.send_frames(batch).await.unwrap();
        let after_batch = sink.get_frames().await;
        assert_eq!(after_batch.len(), 3);

        sink.flush().await.unwrap();
        sink.close().await.unwrap();
        assert!(sink.is_closed().await);
    }

    #[tokio::test]
    async fn test_gat_extension_trait() {
        let mut sink = MockFrameSinkGat::new();
        let frame = skeleton_frame("test extension");

        // The extension helper forwards each frame through `send_frame`.
        let batch = vec![frame.clone(), frame];
        sink.send_frames_default(batch).await.unwrap();

        let recorded = sink.get_frames().await;
        assert_eq!(recorded.len(), 2);
    }
}