pjson_rs/infrastructure/adapters/
gat_memory_repository.rs

1//! GAT-based in-memory repository implementations
2//!
3//! Zero-cost abstractions for domain ports using Generic Associated Types.
4
5use parking_lot::RwLock;
6use std::{collections::HashMap, future::Future, sync::Arc};
7
8use crate::domain::{
9    DomainResult,
10    aggregates::StreamSession,
11    entities::Stream,
12    ports::{StreamRepositoryGat, StreamStoreGat},
13    value_objects::{SessionId, StreamId},
14};
15
16/// GAT-based in-memory implementation of StreamRepositoryGat
17#[derive(Debug, Clone)]
18pub struct GatInMemoryStreamRepository {
19    sessions: Arc<RwLock<HashMap<SessionId, StreamSession>>>,
20}
21
22impl GatInMemoryStreamRepository {
23    pub fn new() -> Self {
24        Self {
25            sessions: Arc::new(RwLock::new(HashMap::new())),
26        }
27    }
28
29    /// Get number of stored sessions
30    pub fn session_count(&self) -> usize {
31        self.sessions.read().len()
32    }
33
34    /// Clear all sessions (for testing)
35    pub fn clear(&self) {
36        self.sessions.write().clear();
37    }
38
39    /// Get all session IDs (for testing)
40    pub fn all_session_ids(&self) -> Vec<SessionId> {
41        self.sessions.read().keys().copied().collect()
42    }
43}
44
45impl Default for GatInMemoryStreamRepository {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51impl StreamRepositoryGat for GatInMemoryStreamRepository {
52    type FindSessionFuture<'a>
53        = impl Future<Output = DomainResult<Option<StreamSession>>> + Send + 'a
54    where
55        Self: 'a;
56
57    type SaveSessionFuture<'a>
58        = impl Future<Output = DomainResult<()>> + Send + 'a
59    where
60        Self: 'a;
61
62    type RemoveSessionFuture<'a>
63        = impl Future<Output = DomainResult<()>> + Send + 'a
64    where
65        Self: 'a;
66
67    type FindActiveSessionsFuture<'a>
68        = impl Future<Output = DomainResult<Vec<StreamSession>>> + Send + 'a
69    where
70        Self: 'a;
71
72    fn find_session(&self, session_id: SessionId) -> Self::FindSessionFuture<'_> {
73        async move { Ok(self.sessions.read().get(&session_id).cloned()) }
74    }
75
76    fn save_session(&self, session: StreamSession) -> Self::SaveSessionFuture<'_> {
77        async move {
78            self.sessions.write().insert(session.id(), session);
79            Ok(())
80        }
81    }
82
83    fn remove_session(&self, session_id: SessionId) -> Self::RemoveSessionFuture<'_> {
84        async move {
85            self.sessions.write().remove(&session_id);
86            Ok(())
87        }
88    }
89
90    fn find_active_sessions(&self) -> Self::FindActiveSessionsFuture<'_> {
91        async move {
92            Ok(self
93                .sessions
94                .read()
95                .values()
96                .filter(|s| s.is_active())
97                .cloned()
98                .collect())
99        }
100    }
101}
102
103/// GAT-based in-memory implementation of StreamStoreGat
104#[derive(Debug, Clone)]
105pub struct GatInMemoryStreamStore {
106    streams: Arc<RwLock<HashMap<StreamId, Stream>>>,
107}
108
109impl GatInMemoryStreamStore {
110    pub fn new() -> Self {
111        Self {
112            streams: Arc::new(RwLock::new(HashMap::new())),
113        }
114    }
115
116    /// Get number of stored streams
117    pub fn stream_count(&self) -> usize {
118        self.streams.read().len()
119    }
120
121    /// Clear all streams (for testing)
122    pub fn clear(&self) {
123        self.streams.write().clear();
124    }
125
126    /// Get all stream IDs (for testing)
127    pub fn all_stream_ids(&self) -> Vec<StreamId> {
128        self.streams.read().keys().copied().collect()
129    }
130}
131
132impl Default for GatInMemoryStreamStore {
133    fn default() -> Self {
134        Self::new()
135    }
136}
137
138impl StreamStoreGat for GatInMemoryStreamStore {
139    type StoreStreamFuture<'a>
140        = impl Future<Output = DomainResult<()>> + Send + 'a
141    where
142        Self: 'a;
143
144    type GetStreamFuture<'a>
145        = impl Future<Output = DomainResult<Option<Stream>>> + Send + 'a
146    where
147        Self: 'a;
148
149    type DeleteStreamFuture<'a>
150        = impl Future<Output = DomainResult<()>> + Send + 'a
151    where
152        Self: 'a;
153
154    type ListStreamsFuture<'a>
155        = impl Future<Output = DomainResult<Vec<Stream>>> + Send + 'a
156    where
157        Self: 'a;
158
159    fn store_stream(&self, stream: Stream) -> Self::StoreStreamFuture<'_> {
160        async move {
161            self.streams.write().insert(stream.id(), stream);
162            Ok(())
163        }
164    }
165
166    fn get_stream(&self, stream_id: StreamId) -> Self::GetStreamFuture<'_> {
167        async move { Ok(self.streams.read().get(&stream_id).cloned()) }
168    }
169
170    fn delete_stream(&self, stream_id: StreamId) -> Self::DeleteStreamFuture<'_> {
171        async move {
172            self.streams.write().remove(&stream_id);
173            Ok(())
174        }
175    }
176
177    fn list_streams_for_session(&self, session_id: SessionId) -> Self::ListStreamsFuture<'_> {
178        async move {
179            Ok(self
180                .streams
181                .read()
182                .values()
183                .filter(|s| s.session_id() == session_id)
184                .cloned()
185                .collect())
186        }
187    }
188}
189
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::aggregates::stream_session::SessionConfig;

    /// Saves, finds and removes a session, checking the repository contents after each step.
    #[tokio::test]
    async fn test_gat_repository_crud() {
        let repo = GatInMemoryStreamRepository::new();
        assert_eq!(repo.session_count(), 0);

        // Test save and find
        let session = StreamSession::new(SessionConfig::default());
        let session_id = session.id();

        // The id was copied out above, so the session can be moved in without cloning.
        repo.save_session(session).await.unwrap();
        assert_eq!(repo.session_count(), 1);
        assert_eq!(repo.all_session_ids(), vec![session_id]);

        let found = repo.find_session(session_id).await.unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().id(), session_id);

        // Test remove
        repo.remove_session(session_id).await.unwrap();
        let not_found = repo.find_session(session_id).await.unwrap();
        assert!(not_found.is_none());
        assert_eq!(repo.session_count(), 0);
        assert!(repo.all_session_ids().is_empty());
    }

    /// Exercises the store's inherent helpers on an empty store.
    #[tokio::test]
    async fn test_gat_store_crud() {
        let store = GatInMemoryStreamStore::new();

        // Would need to create a proper Stream for full testing
        // For now just test the interface works
        assert_eq!(store.stream_count(), 0);
        assert!(store.all_stream_ids().is_empty());
        store.clear();
        assert_eq!(store.stream_count(), 0);
    }
}