Skip to main content

post_cortex_daemon/daemon/
sse.rs

1// Copyright (c) 2025 Julius ML
2//
3// Permission is hereby granted, free of charge, to any person obtaining a copy
4// of this software and associated documentation files (the "Software"), to deal
5// in the Software without restriction, including without limitation the rights
6// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7// copies of the Software, and to permit persons to whom the Software is
8// furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in all
11// copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19// SOFTWARE.
20
//! Server-Sent Events (SSE) broadcaster
//!
//! Tracks clients in a `DashMap` (a sharded, lock-based concurrent map) and
//! delivers events over tokio unbounded channels, whose `send` never awaits.
24
25use dashmap::DashMap;
26use serde::{Deserialize, Serialize};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
30use uuid::Uuid;
31
32/// SSE event that can be broadcasted to clients
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct SseEvent {
35    /// Unique event identifier.
36    pub id: String,
37    /// Event type label.
38    pub event_type: String,
39    /// JSON event payload.
40    pub data: serde_json::Value,
41}
42
43/// Lock-free SSE broadcaster using DashMap
44///
45/// All operations are non-blocking using DashMap for client tracking
46/// and tokio unbounded channels for message passing.
47pub struct SSEBroadcaster {
48    /// Lock-free client registry
49    clients: Arc<DashMap<Uuid, UnboundedSender<SseEvent>>>,
50
51    /// Atomic event counter
52    event_counter: Arc<AtomicU64>,
53
54    /// Total clients counter
55    total_clients: Arc<AtomicU64>,
56}
57
58impl SSEBroadcaster {
59    /// Create a new broadcaster with no connected clients.
60    pub fn new() -> Self {
61        Self {
62            clients: Arc::new(DashMap::new()),
63            event_counter: Arc::new(AtomicU64::new(0)),
64            total_clients: Arc::new(AtomicU64::new(0)),
65        }
66    }
67
68    /// Register a new client (lock-free)
69    pub fn register_client(&self, id: Uuid) -> UnboundedReceiver<SseEvent> {
70        let (tx, rx) = unbounded_channel();
71        self.clients.insert(id, tx);
72        self.total_clients.fetch_add(1, Ordering::Relaxed);
73        rx
74    }
75
76    /// Unregister a client (lock-free)
77    pub fn unregister_client(&self, id: &Uuid) {
78        if self.clients.remove(id).is_some() {
79            self.total_clients.fetch_sub(1, Ordering::Relaxed);
80        }
81    }
82
83    /// Broadcast event to all clients (lock-free)
84    pub fn broadcast(&self, event: SseEvent) {
85        self.event_counter.fetch_add(1, Ordering::Relaxed);
86
87        // Lock-free iteration over DashMap
88        self.clients.iter().for_each(|entry| {
89            // Fire-and-forget send (if client channel is full/closed, it gets dropped)
90            let _ = entry.value().send(event.clone());
91        });
92    }
93
94    /// Send event to specific client (lock-free)
95    pub fn send_to_client(&self, client_id: &Uuid, event: SseEvent) -> Result<(), String> {
96        self.clients
97            .get(client_id)
98            .ok_or_else(|| format!("Client {} not found", client_id))?
99            .send(event)
100            .map_err(|e| format!("Failed to send to client: {}", e))
101    }
102
103    /// Get active client count (atomic read)
104    pub fn active_clients(&self) -> u64 {
105        self.total_clients.load(Ordering::Relaxed)
106    }
107
108    /// Get total events broadcasted (atomic read)
109    pub fn total_events(&self) -> u64 {
110        self.event_counter.load(Ordering::Relaxed)
111    }
112}
113
114impl Default for SSEBroadcaster {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
#[cfg(test)]
mod tests {
    use super::*;

    /// A single client registers, receives a broadcast event, and unregisters.
    #[tokio::test]
    async fn test_sse_broadcaster_registration() {
        let broadcaster = SSEBroadcaster::new();

        let client1 = Uuid::new_v4();
        let mut rx1 = broadcaster.register_client(client1);

        assert_eq!(broadcaster.active_clients(), 1);

        // Broadcast event
        let event = SseEvent {
            id: "1".to_string(),
            event_type: "test".to_string(),
            data: serde_json::json!({"message": "hello"}),
        };

        // The event is not used afterwards, so it is moved rather than cloned.
        broadcaster.broadcast(event);

        // Receive event
        let received = rx1.recv().await.unwrap();
        assert_eq!(received.id, "1");
        assert_eq!(received.event_type, "test");

        // Unregister
        broadcaster.unregister_client(&client1);
        assert_eq!(broadcaster.active_clients(), 0);
    }

    /// Many clients register from separate tasks, each receives the one
    /// broadcast event, and all of them unregister afterwards.
    #[tokio::test]
    async fn test_concurrent_sse_operations() {
        const CLIENT_COUNT: u64 = 50;

        let broadcaster = Arc::new(SSEBroadcaster::new());

        // Register the clients from concurrently spawned tasks.
        let handles: Vec<_> = (0..CLIENT_COUNT)
            .map(|_| {
                let bc = Arc::clone(&broadcaster);
                tokio::spawn(async move {
                    let id = Uuid::new_v4();
                    let mut rx = bc.register_client(id);

                    // Receive one event
                    let event = rx.recv().await;
                    assert!(event.is_some());

                    bc.unregister_client(&id);
                })
            })
            .collect();

        // Wait until every client has actually registered instead of sleeping
        // for a fixed time, which would make the test depend on scheduling.
        // The timeout turns an unexpected stall into a failure, not a hang.
        tokio::time::timeout(tokio::time::Duration::from_secs(5), async {
            while broadcaster.active_clients() < CLIENT_COUNT {
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("clients did not register in time");

        // Broadcast event
        broadcaster.broadcast(SseEvent {
            id: "broadcast".to_string(),
            event_type: "test".to_string(),
            data: serde_json::json!({}),
        });

        // Wait for all clients
        for handle in handles {
            handle.await.unwrap();
        }

        // All clients should be unregistered
        assert_eq!(broadcaster.active_clients(), 0);
        assert_eq!(broadcaster.total_events(), 1);
    }
}