Skip to main content

mdcs_sdk/
session.rs

1//! Session management for collaborative editing sessions.
2
3use crate::document::{JsonDoc, RichTextDoc, TextDoc};
4use crate::error::SdkError;
5use crate::network::{Message, NetworkTransport, Peer, PeerId};
6use crate::presence::Awareness;
7use parking_lot::RwLock;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::broadcast;
11
12/// Events emitted by a session.
13#[derive(Clone, Debug)]
14pub enum SessionEvent {
15    /// A peer joined the session.
16    PeerJoined { peer_id: PeerId, user_name: String },
17    /// A peer left the session.
18    PeerLeft { peer_id: PeerId },
19    /// A document was opened.
20    DocumentOpened { document_id: String },
21    /// A document was closed.
22    DocumentClosed { document_id: String },
23    /// Session connected.
24    Connected,
25    /// Session disconnected.
26    Disconnected,
27}
28
29/// A collaborative session that manages documents and peers.
30pub struct Session<T: NetworkTransport> {
31    session_id: String,
32    local_peer_id: PeerId,
33    user_name: String,
34    transport: Arc<T>,
35    awareness: Arc<Awareness>,
36    text_docs: Arc<RwLock<HashMap<String, Arc<RwLock<TextDoc>>>>>,
37    rich_text_docs: Arc<RwLock<HashMap<String, Arc<RwLock<RichTextDoc>>>>>,
38    json_docs: Arc<RwLock<HashMap<String, Arc<RwLock<JsonDoc>>>>>,
39    event_tx: broadcast::Sender<SessionEvent>,
40}
41
42impl<T: NetworkTransport> Session<T> {
43    /// Create a new collaborative session bound to a local peer.
44    ///
45    /// Most applications should use [`crate::client::Client::create_session`]
46    /// instead of calling this directly.
47    pub fn new(
48        session_id: impl Into<String>,
49        local_peer_id: PeerId,
50        user_name: impl Into<String>,
51        transport: Arc<T>,
52    ) -> Self {
53        let session_id = session_id.into();
54        let user_name = user_name.into();
55        let (event_tx, _) = broadcast::channel(100);
56
57        let awareness = Arc::new(Awareness::new(local_peer_id.0.clone(), user_name.clone()));
58
59        Self {
60            session_id,
61            local_peer_id,
62            user_name,
63            transport,
64            awareness,
65            text_docs: Arc::new(RwLock::new(HashMap::new())),
66            rich_text_docs: Arc::new(RwLock::new(HashMap::new())),
67            json_docs: Arc::new(RwLock::new(HashMap::new())),
68            event_tx,
69        }
70    }
71
72    /// Return the stable session identifier.
73    pub fn session_id(&self) -> &str {
74        &self.session_id
75    }
76
77    /// Return the local peer identifier for this session.
78    pub fn local_peer_id(&self) -> &PeerId {
79        &self.local_peer_id
80    }
81
82    /// Return the local user name for this session.
83    pub fn user_name(&self) -> &str {
84        &self.user_name
85    }
86
87    /// Return the presence/awareness manager.
88    ///
89    /// Use this to set cursor position, selections, and status.
90    pub fn awareness(&self) -> &Arc<Awareness> {
91        &self.awareness
92    }
93
94    /// Subscribe to session lifecycle events.
95    ///
96    /// This uses a broadcast channel; late subscribers only receive future
97    /// events.
98    pub fn subscribe(&self) -> broadcast::Receiver<SessionEvent> {
99        self.event_tx.subscribe()
100    }
101
102    /// Connect to the session and broadcast a handshake to peers.
103    ///
104    /// # Errors
105    ///
106    /// Returns [`SdkError::NetworkError`] if the transport broadcast fails.
107    pub async fn connect(&self) -> Result<(), SdkError> {
108        let message = Message::Hello {
109            replica_id: self.local_peer_id.0.clone(),
110            user_name: self.user_name.clone(),
111        };
112
113        // Send hello to all connected peers
114        self.transport
115            .broadcast(message)
116            .await
117            .map_err(|e| SdkError::NetworkError(e.to_string()))?;
118
119        let _ = self.event_tx.send(SessionEvent::Connected);
120
121        Ok(())
122    }
123
124    /// Disconnect from the session locally.
125    ///
126    /// This emits [`SessionEvent::Disconnected`] for local listeners.
127    pub async fn disconnect(&self) -> Result<(), SdkError> {
128        let _ = self.event_tx.send(SessionEvent::Disconnected);
129        Ok(())
130    }
131
132    /// Create or open a plain-text document by ID.
133    ///
134    /// Returns a shared, lock-protected document handle.
135    pub fn open_text_doc(&self, document_id: impl Into<String>) -> Arc<RwLock<TextDoc>> {
136        let document_id = document_id.into();
137        let mut docs = self.text_docs.write();
138
139        if let Some(doc) = docs.get(&document_id) {
140            doc.clone()
141        } else {
142            let doc = Arc::new(RwLock::new(TextDoc::new(
143                document_id.clone(),
144                self.local_peer_id.0.clone(),
145            )));
146            docs.insert(document_id.clone(), doc.clone());
147
148            let _ = self
149                .event_tx
150                .send(SessionEvent::DocumentOpened { document_id });
151
152            doc
153        }
154    }
155
156    /// Create or open a rich-text document by ID.
157    ///
158    /// Returns a shared, lock-protected document handle.
159    pub fn open_rich_text_doc(&self, document_id: impl Into<String>) -> Arc<RwLock<RichTextDoc>> {
160        let document_id = document_id.into();
161        let mut docs = self.rich_text_docs.write();
162
163        if let Some(doc) = docs.get(&document_id) {
164            doc.clone()
165        } else {
166            let doc = Arc::new(RwLock::new(RichTextDoc::new(
167                document_id.clone(),
168                self.local_peer_id.0.clone(),
169            )));
170            docs.insert(document_id.clone(), doc.clone());
171
172            let _ = self
173                .event_tx
174                .send(SessionEvent::DocumentOpened { document_id });
175
176            doc
177        }
178    }
179
180    /// Create or open a JSON document by ID.
181    ///
182    /// Returns a shared, lock-protected document handle.
183    pub fn open_json_doc(&self, document_id: impl Into<String>) -> Arc<RwLock<JsonDoc>> {
184        let document_id = document_id.into();
185        let mut docs = self.json_docs.write();
186
187        if let Some(doc) = docs.get(&document_id) {
188            doc.clone()
189        } else {
190            let doc = Arc::new(RwLock::new(JsonDoc::new(
191                document_id.clone(),
192                self.local_peer_id.0.clone(),
193            )));
194            docs.insert(document_id.clone(), doc.clone());
195
196            let _ = self
197                .event_tx
198                .send(SessionEvent::DocumentOpened { document_id });
199
200            doc
201        }
202    }
203
204    /// Close a locally opened document handle by ID.
205    ///
206    /// If a document exists in multiple local maps, all matches are removed.
207    pub fn close_doc(&self, document_id: &str) {
208        self.text_docs.write().remove(document_id);
209        self.rich_text_docs.write().remove(document_id);
210        self.json_docs.write().remove(document_id);
211
212        let _ = self.event_tx.send(SessionEvent::DocumentClosed {
213            document_id: document_id.to_string(),
214        });
215    }
216
217    /// Return all currently opened document IDs across document types.
218    pub fn open_documents(&self) -> Vec<String> {
219        let mut docs = Vec::new();
220        docs.extend(self.text_docs.read().keys().cloned());
221        docs.extend(self.rich_text_docs.read().keys().cloned());
222        docs.extend(self.json_docs.read().keys().cloned());
223        docs
224    }
225
226    /// Return peers currently connected through the underlying transport.
227    pub async fn peers(&self) -> Vec<Peer> {
228        self.transport.connected_peers().await
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::network::MemoryTransport;

    /// Build session `session-1` for peer `peer-1` / user `Alice` on top of
    /// an in-memory transport.
    fn alice_session() -> Session<MemoryTransport> {
        let local_peer = PeerId::new("peer-1");
        let transport = Arc::new(MemoryTransport::new(local_peer.clone()));

        Session::new("session-1", local_peer, "Alice", transport)
    }

    /// A new session reports the ID and user name it was built with.
    #[tokio::test]
    async fn test_session_creation() {
        let session = alice_session();

        assert_eq!(session.session_id(), "session-1");
        assert_eq!(session.user_name(), "Alice");
    }

    /// Opening one document of each type and closing one is reflected in
    /// `open_documents`.
    #[tokio::test]
    async fn test_document_management() {
        let session = alice_session();

        // One document per type, each under its own ID.
        let _text = session.open_text_doc("doc-1");
        let _rich = session.open_rich_text_doc("doc-2");
        let _json = session.open_json_doc("doc-3");

        let open_ids = session.open_documents();
        assert_eq!(open_ids.len(), 3);

        // Closing the text document leaves the other two open.
        session.close_doc("doc-1");

        let open_ids = session.open_documents();
        assert_eq!(open_ids.len(), 2);
    }
}