1use crate::document::{JsonDoc, RichTextDoc, TextDoc};
4use crate::error::SdkError;
5use crate::network::{Message, NetworkTransport, Peer, PeerId};
6use crate::presence::Awareness;
7use parking_lot::RwLock;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::broadcast;
11
12#[derive(Clone, Debug)]
14pub enum SessionEvent {
15 PeerJoined { peer_id: PeerId, user_name: String },
17 PeerLeft { peer_id: PeerId },
19 DocumentOpened { document_id: String },
21 DocumentClosed { document_id: String },
23 Connected,
25 Disconnected,
27}
28
29pub struct Session<T: NetworkTransport> {
31 session_id: String,
32 local_peer_id: PeerId,
33 user_name: String,
34 transport: Arc<T>,
35 awareness: Arc<Awareness>,
36 text_docs: Arc<RwLock<HashMap<String, Arc<RwLock<TextDoc>>>>>,
37 rich_text_docs: Arc<RwLock<HashMap<String, Arc<RwLock<RichTextDoc>>>>>,
38 json_docs: Arc<RwLock<HashMap<String, Arc<RwLock<JsonDoc>>>>>,
39 event_tx: broadcast::Sender<SessionEvent>,
40}
41
42impl<T: NetworkTransport> Session<T> {
43 pub fn new(
45 session_id: impl Into<String>,
46 local_peer_id: PeerId,
47 user_name: impl Into<String>,
48 transport: Arc<T>,
49 ) -> Self {
50 let session_id = session_id.into();
51 let user_name = user_name.into();
52 let (event_tx, _) = broadcast::channel(100);
53
54 let awareness = Arc::new(Awareness::new(local_peer_id.0.clone(), user_name.clone()));
55
56 Self {
57 session_id,
58 local_peer_id,
59 user_name,
60 transport,
61 awareness,
62 text_docs: Arc::new(RwLock::new(HashMap::new())),
63 rich_text_docs: Arc::new(RwLock::new(HashMap::new())),
64 json_docs: Arc::new(RwLock::new(HashMap::new())),
65 event_tx,
66 }
67 }
68
69 pub fn session_id(&self) -> &str {
71 &self.session_id
72 }
73
74 pub fn local_peer_id(&self) -> &PeerId {
76 &self.local_peer_id
77 }
78
79 pub fn user_name(&self) -> &str {
81 &self.user_name
82 }
83
84 pub fn awareness(&self) -> &Arc<Awareness> {
86 &self.awareness
87 }
88
89 pub fn subscribe(&self) -> broadcast::Receiver<SessionEvent> {
91 self.event_tx.subscribe()
92 }
93
94 pub async fn connect(&self) -> Result<(), SdkError> {
96 let message = Message::Hello {
97 replica_id: self.local_peer_id.0.clone(),
98 user_name: self.user_name.clone(),
99 };
100
101 self.transport
103 .broadcast(message)
104 .await
105 .map_err(|e| SdkError::NetworkError(e.to_string()))?;
106
107 let _ = self.event_tx.send(SessionEvent::Connected);
108
109 Ok(())
110 }
111
112 pub async fn disconnect(&self) -> Result<(), SdkError> {
114 let _ = self.event_tx.send(SessionEvent::Disconnected);
115 Ok(())
116 }
117
118 pub fn open_text_doc(&self, document_id: impl Into<String>) -> Arc<RwLock<TextDoc>> {
120 let document_id = document_id.into();
121 let mut docs = self.text_docs.write();
122
123 if let Some(doc) = docs.get(&document_id) {
124 doc.clone()
125 } else {
126 let doc = Arc::new(RwLock::new(TextDoc::new(
127 document_id.clone(),
128 self.local_peer_id.0.clone(),
129 )));
130 docs.insert(document_id.clone(), doc.clone());
131
132 let _ = self
133 .event_tx
134 .send(SessionEvent::DocumentOpened { document_id });
135
136 doc
137 }
138 }
139
140 pub fn open_rich_text_doc(&self, document_id: impl Into<String>) -> Arc<RwLock<RichTextDoc>> {
142 let document_id = document_id.into();
143 let mut docs = self.rich_text_docs.write();
144
145 if let Some(doc) = docs.get(&document_id) {
146 doc.clone()
147 } else {
148 let doc = Arc::new(RwLock::new(RichTextDoc::new(
149 document_id.clone(),
150 self.local_peer_id.0.clone(),
151 )));
152 docs.insert(document_id.clone(), doc.clone());
153
154 let _ = self
155 .event_tx
156 .send(SessionEvent::DocumentOpened { document_id });
157
158 doc
159 }
160 }
161
162 pub fn open_json_doc(&self, document_id: impl Into<String>) -> Arc<RwLock<JsonDoc>> {
164 let document_id = document_id.into();
165 let mut docs = self.json_docs.write();
166
167 if let Some(doc) = docs.get(&document_id) {
168 doc.clone()
169 } else {
170 let doc = Arc::new(RwLock::new(JsonDoc::new(
171 document_id.clone(),
172 self.local_peer_id.0.clone(),
173 )));
174 docs.insert(document_id.clone(), doc.clone());
175
176 let _ = self
177 .event_tx
178 .send(SessionEvent::DocumentOpened { document_id });
179
180 doc
181 }
182 }
183
184 pub fn close_doc(&self, document_id: &str) {
186 self.text_docs.write().remove(document_id);
187 self.rich_text_docs.write().remove(document_id);
188 self.json_docs.write().remove(document_id);
189
190 let _ = self.event_tx.send(SessionEvent::DocumentClosed {
191 document_id: document_id.to_string(),
192 });
193 }
194
195 pub fn open_documents(&self) -> Vec<String> {
197 let mut docs = Vec::new();
198 docs.extend(self.text_docs.read().keys().cloned());
199 docs.extend(self.rich_text_docs.read().keys().cloned());
200 docs.extend(self.json_docs.read().keys().cloned());
201 docs
202 }
203
204 pub async fn peers(&self) -> Vec<Peer> {
206 self.transport.connected_peers().await
207 }
208}
209
210#[cfg(test)]
211mod tests {
212 use super::*;
213 use crate::network::MemoryTransport;
214
215 #[tokio::test]
216 async fn test_session_creation() {
217 let peer_id = PeerId::new("peer-1");
218 let transport = Arc::new(MemoryTransport::new(peer_id.clone()));
219
220 let session = Session::new("session-1", peer_id, "Alice", transport);
221
222 assert_eq!(session.session_id(), "session-1");
223 assert_eq!(session.user_name(), "Alice");
224 }
225
226 #[tokio::test]
227 async fn test_document_management() {
228 let peer_id = PeerId::new("peer-1");
229 let transport = Arc::new(MemoryTransport::new(peer_id.clone()));
230
231 let session = Session::new("session-1", peer_id, "Alice", transport);
232
233 let _text = session.open_text_doc("doc-1");
235 let _rich = session.open_rich_text_doc("doc-2");
236 let _json = session.open_json_doc("doc-3");
237
238 let docs = session.open_documents();
239 assert_eq!(docs.len(), 3);
240
241 session.close_doc("doc-1");
243
244 let docs = session.open_documents();
245 assert_eq!(docs.len(), 2);
246 }
247}