pjson_rs/infrastructure/adapters/
gat_memory_repository.rs1use parking_lot::RwLock;
6use std::{collections::HashMap, future::Future, sync::Arc};
7
8use crate::domain::{
9 DomainResult,
10 aggregates::StreamSession,
11 entities::Stream,
12 ports::{StreamRepositoryGat, StreamStoreGat},
13 value_objects::{SessionId, StreamId},
14};
15
16#[derive(Debug, Clone)]
18pub struct GatInMemoryStreamRepository {
19 sessions: Arc<RwLock<HashMap<SessionId, StreamSession>>>,
20}
21
22impl GatInMemoryStreamRepository {
23 pub fn new() -> Self {
24 Self {
25 sessions: Arc::new(RwLock::new(HashMap::new())),
26 }
27 }
28
29 pub fn session_count(&self) -> usize {
31 self.sessions.read().len()
32 }
33
34 pub fn clear(&self) {
36 self.sessions.write().clear();
37 }
38
39 pub fn all_session_ids(&self) -> Vec<SessionId> {
41 self.sessions.read().keys().copied().collect()
42 }
43}
44
45impl Default for GatInMemoryStreamRepository {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51impl StreamRepositoryGat for GatInMemoryStreamRepository {
52 type FindSessionFuture<'a>
53 = impl Future<Output = DomainResult<Option<StreamSession>>> + Send + 'a
54 where
55 Self: 'a;
56
57 type SaveSessionFuture<'a>
58 = impl Future<Output = DomainResult<()>> + Send + 'a
59 where
60 Self: 'a;
61
62 type RemoveSessionFuture<'a>
63 = impl Future<Output = DomainResult<()>> + Send + 'a
64 where
65 Self: 'a;
66
67 type FindActiveSessionsFuture<'a>
68 = impl Future<Output = DomainResult<Vec<StreamSession>>> + Send + 'a
69 where
70 Self: 'a;
71
72 fn find_session(&self, session_id: SessionId) -> Self::FindSessionFuture<'_> {
73 async move { Ok(self.sessions.read().get(&session_id).cloned()) }
74 }
75
76 fn save_session(&self, session: StreamSession) -> Self::SaveSessionFuture<'_> {
77 async move {
78 self.sessions.write().insert(session.id(), session);
79 Ok(())
80 }
81 }
82
83 fn remove_session(&self, session_id: SessionId) -> Self::RemoveSessionFuture<'_> {
84 async move {
85 self.sessions.write().remove(&session_id);
86 Ok(())
87 }
88 }
89
90 fn find_active_sessions(&self) -> Self::FindActiveSessionsFuture<'_> {
91 async move {
92 Ok(self
93 .sessions
94 .read()
95 .values()
96 .filter(|s| s.is_active())
97 .cloned()
98 .collect())
99 }
100 }
101}
102
103#[derive(Debug, Clone)]
105pub struct GatInMemoryStreamStore {
106 streams: Arc<RwLock<HashMap<StreamId, Stream>>>,
107}
108
109impl GatInMemoryStreamStore {
110 pub fn new() -> Self {
111 Self {
112 streams: Arc::new(RwLock::new(HashMap::new())),
113 }
114 }
115
116 pub fn stream_count(&self) -> usize {
118 self.streams.read().len()
119 }
120
121 pub fn clear(&self) {
123 self.streams.write().clear();
124 }
125
126 pub fn all_stream_ids(&self) -> Vec<StreamId> {
128 self.streams.read().keys().copied().collect()
129 }
130}
131
132impl Default for GatInMemoryStreamStore {
133 fn default() -> Self {
134 Self::new()
135 }
136}
137
138impl StreamStoreGat for GatInMemoryStreamStore {
139 type StoreStreamFuture<'a>
140 = impl Future<Output = DomainResult<()>> + Send + 'a
141 where
142 Self: 'a;
143
144 type GetStreamFuture<'a>
145 = impl Future<Output = DomainResult<Option<Stream>>> + Send + 'a
146 where
147 Self: 'a;
148
149 type DeleteStreamFuture<'a>
150 = impl Future<Output = DomainResult<()>> + Send + 'a
151 where
152 Self: 'a;
153
154 type ListStreamsFuture<'a>
155 = impl Future<Output = DomainResult<Vec<Stream>>> + Send + 'a
156 where
157 Self: 'a;
158
159 fn store_stream(&self, stream: Stream) -> Self::StoreStreamFuture<'_> {
160 async move {
161 self.streams.write().insert(stream.id(), stream);
162 Ok(())
163 }
164 }
165
166 fn get_stream(&self, stream_id: StreamId) -> Self::GetStreamFuture<'_> {
167 async move { Ok(self.streams.read().get(&stream_id).cloned()) }
168 }
169
170 fn delete_stream(&self, stream_id: StreamId) -> Self::DeleteStreamFuture<'_> {
171 async move {
172 self.streams.write().remove(&stream_id);
173 Ok(())
174 }
175 }
176
177 fn list_streams_for_session(&self, session_id: SessionId) -> Self::ListStreamsFuture<'_> {
178 async move {
179 Ok(self
180 .streams
181 .read()
182 .values()
183 .filter(|s| s.session_id() == session_id)
184 .cloned()
185 .collect())
186 }
187 }
188}
189
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::aggregates::stream_session::SessionConfig;

    /// Save → find → remove round trip through the GAT session repository,
    /// cross-checked against the inherent inspection helpers.
    #[tokio::test]
    async fn test_gat_repository_crud() {
        let repo = GatInMemoryStreamRepository::new();
        assert_eq!(repo.session_count(), 0);

        let session = StreamSession::new(SessionConfig::default());
        let session_id = session.id();

        // `session` is not needed afterwards, so it is moved in instead of cloned.
        repo.save_session(session).await.unwrap();
        assert_eq!(repo.session_count(), 1);
        assert_eq!(repo.all_session_ids(), vec![session_id]);

        // One assertion covers both "present" and "has the expected id".
        let found = repo.find_session(session_id).await.unwrap();
        assert_eq!(found.map(|s| s.id()), Some(session_id));

        repo.remove_session(session_id).await.unwrap();
        let not_found = repo.find_session(session_id).await.unwrap();
        assert!(not_found.is_none());
        assert_eq!(repo.session_count(), 0);
    }

    /// Exercises the store's empty-state behaviour.
    ///
    /// No `Stream` value is built here (its constructor is not visible from
    /// this module), so only the operations that work on an empty store are
    /// covered: counting, id listing, per-session listing and `clear`.
    #[tokio::test]
    async fn test_gat_store_crud() {
        let store = GatInMemoryStreamStore::new();

        assert_eq!(store.stream_count(), 0);
        assert!(store.all_stream_ids().is_empty());

        // Any session id must map to an empty stream list while nothing is stored.
        let session_id = StreamSession::new(SessionConfig::default()).id();
        let streams = store.list_streams_for_session(session_id).await.unwrap();
        assert!(streams.is_empty());

        store.clear();
        assert_eq!(store.stream_count(), 0);
    }
}