1use crate::document::{JsonDoc, RichTextDoc, TextDoc};
4use crate::error::SdkError;
5use crate::network::{Message, NetworkTransport, Peer, PeerId};
6use crate::presence::Awareness;
7use parking_lot::RwLock;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::broadcast;
11
12#[derive(Clone, Debug)]
14pub enum SessionEvent {
15 PeerJoined { peer_id: PeerId, user_name: String },
17 PeerLeft { peer_id: PeerId },
19 DocumentOpened { document_id: String },
21 DocumentClosed { document_id: String },
23 Connected,
25 Disconnected,
27}
28
29pub struct Session<T: NetworkTransport> {
31 session_id: String,
32 local_peer_id: PeerId,
33 user_name: String,
34 transport: Arc<T>,
35 awareness: Arc<Awareness>,
36 text_docs: Arc<RwLock<HashMap<String, Arc<RwLock<TextDoc>>>>>,
37 rich_text_docs: Arc<RwLock<HashMap<String, Arc<RwLock<RichTextDoc>>>>>,
38 json_docs: Arc<RwLock<HashMap<String, Arc<RwLock<JsonDoc>>>>>,
39 event_tx: broadcast::Sender<SessionEvent>,
40}
41
42impl<T: NetworkTransport> Session<T> {
43 pub fn new(
48 session_id: impl Into<String>,
49 local_peer_id: PeerId,
50 user_name: impl Into<String>,
51 transport: Arc<T>,
52 ) -> Self {
53 let session_id = session_id.into();
54 let user_name = user_name.into();
55 let (event_tx, _) = broadcast::channel(100);
56
57 let awareness = Arc::new(Awareness::new(local_peer_id.0.clone(), user_name.clone()));
58
59 Self {
60 session_id,
61 local_peer_id,
62 user_name,
63 transport,
64 awareness,
65 text_docs: Arc::new(RwLock::new(HashMap::new())),
66 rich_text_docs: Arc::new(RwLock::new(HashMap::new())),
67 json_docs: Arc::new(RwLock::new(HashMap::new())),
68 event_tx,
69 }
70 }
71
72 pub fn session_id(&self) -> &str {
74 &self.session_id
75 }
76
77 pub fn local_peer_id(&self) -> &PeerId {
79 &self.local_peer_id
80 }
81
82 pub fn user_name(&self) -> &str {
84 &self.user_name
85 }
86
87 pub fn awareness(&self) -> &Arc<Awareness> {
91 &self.awareness
92 }
93
94 pub fn subscribe(&self) -> broadcast::Receiver<SessionEvent> {
99 self.event_tx.subscribe()
100 }
101
102 pub async fn connect(&self) -> Result<(), SdkError> {
108 let message = Message::Hello {
109 replica_id: self.local_peer_id.0.clone(),
110 user_name: self.user_name.clone(),
111 };
112
113 self.transport
115 .broadcast(message)
116 .await
117 .map_err(|e| SdkError::NetworkError(e.to_string()))?;
118
119 let _ = self.event_tx.send(SessionEvent::Connected);
120
121 Ok(())
122 }
123
124 pub async fn disconnect(&self) -> Result<(), SdkError> {
128 let _ = self.event_tx.send(SessionEvent::Disconnected);
129 Ok(())
130 }
131
132 pub fn open_text_doc(&self, document_id: impl Into<String>) -> Arc<RwLock<TextDoc>> {
136 let document_id = document_id.into();
137 let mut docs = self.text_docs.write();
138
139 if let Some(doc) = docs.get(&document_id) {
140 doc.clone()
141 } else {
142 let doc = Arc::new(RwLock::new(TextDoc::new(
143 document_id.clone(),
144 self.local_peer_id.0.clone(),
145 )));
146 docs.insert(document_id.clone(), doc.clone());
147
148 let _ = self
149 .event_tx
150 .send(SessionEvent::DocumentOpened { document_id });
151
152 doc
153 }
154 }
155
156 pub fn open_rich_text_doc(&self, document_id: impl Into<String>) -> Arc<RwLock<RichTextDoc>> {
160 let document_id = document_id.into();
161 let mut docs = self.rich_text_docs.write();
162
163 if let Some(doc) = docs.get(&document_id) {
164 doc.clone()
165 } else {
166 let doc = Arc::new(RwLock::new(RichTextDoc::new(
167 document_id.clone(),
168 self.local_peer_id.0.clone(),
169 )));
170 docs.insert(document_id.clone(), doc.clone());
171
172 let _ = self
173 .event_tx
174 .send(SessionEvent::DocumentOpened { document_id });
175
176 doc
177 }
178 }
179
180 pub fn open_json_doc(&self, document_id: impl Into<String>) -> Arc<RwLock<JsonDoc>> {
184 let document_id = document_id.into();
185 let mut docs = self.json_docs.write();
186
187 if let Some(doc) = docs.get(&document_id) {
188 doc.clone()
189 } else {
190 let doc = Arc::new(RwLock::new(JsonDoc::new(
191 document_id.clone(),
192 self.local_peer_id.0.clone(),
193 )));
194 docs.insert(document_id.clone(), doc.clone());
195
196 let _ = self
197 .event_tx
198 .send(SessionEvent::DocumentOpened { document_id });
199
200 doc
201 }
202 }
203
204 pub fn close_doc(&self, document_id: &str) {
208 self.text_docs.write().remove(document_id);
209 self.rich_text_docs.write().remove(document_id);
210 self.json_docs.write().remove(document_id);
211
212 let _ = self.event_tx.send(SessionEvent::DocumentClosed {
213 document_id: document_id.to_string(),
214 });
215 }
216
217 pub fn open_documents(&self) -> Vec<String> {
219 let mut docs = Vec::new();
220 docs.extend(self.text_docs.read().keys().cloned());
221 docs.extend(self.rich_text_docs.read().keys().cloned());
222 docs.extend(self.json_docs.read().keys().cloned());
223 docs
224 }
225
226 pub async fn peers(&self) -> Vec<Peer> {
228 self.transport.connected_peers().await
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235 use crate::network::MemoryTransport;
236
237 #[tokio::test]
238 async fn test_session_creation() {
239 let peer_id = PeerId::new("peer-1");
240 let transport = Arc::new(MemoryTransport::new(peer_id.clone()));
241
242 let session = Session::new("session-1", peer_id, "Alice", transport);
243
244 assert_eq!(session.session_id(), "session-1");
245 assert_eq!(session.user_name(), "Alice");
246 }
247
248 #[tokio::test]
249 async fn test_document_management() {
250 let peer_id = PeerId::new("peer-1");
251 let transport = Arc::new(MemoryTransport::new(peer_id.clone()));
252
253 let session = Session::new("session-1", peer_id, "Alice", transport);
254
255 let _text = session.open_text_doc("doc-1");
257 let _rich = session.open_rich_text_doc("doc-2");
258 let _json = session.open_json_doc("doc-3");
259
260 let docs = session.open_documents();
261 assert_eq!(docs.len(), 3);
262
263 session.close_doc("doc-1");
265
266 let docs = session.open_documents();
267 assert_eq!(docs.len(), 2);
268 }
269}