Skip to main content

oxigdal_ws/
stream.rs

1//! Data streaming utilities for WebSocket connections.
2use crate::error::Result;
3use crate::protocol::Message;
4use bytes::Bytes;
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use tokio::sync::mpsc;
9/// A stream of WebSocket messages.
10pub struct MessageStream {
11    receiver: mpsc::UnboundedReceiver<Message>,
12}
13impl MessageStream {
14    /// Create a new message stream.
15    pub fn new(receiver: mpsc::UnboundedReceiver<Message>) -> Self {
16        Self { receiver }
17    }
18    /// Receive the next message.
19    pub async fn next_message(&mut self) -> Option<Message> {
20        self.receiver.recv().await
21    }
22}
23impl Stream for MessageStream {
24    type Item = Message;
25    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
26        self.receiver.poll_recv(cx)
27    }
28}
29/// A stream of tile data.
30pub struct TileStream {
31    receiver: mpsc::UnboundedReceiver<TileData>,
32}
33impl TileStream {
34    /// Create a new tile stream.
35    pub fn new(receiver: mpsc::UnboundedReceiver<TileData>) -> Self {
36        Self { receiver }
37    }
38    /// Receive the next tile.
39    pub async fn next_tile(&mut self) -> Option<TileData> {
40        self.receiver.recv().await
41    }
42}
43impl Stream for TileStream {
44    type Item = TileData;
45    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46        self.receiver.poll_recv(cx)
47    }
48}
49/// Tile data with metadata.
50#[derive(Debug, Clone)]
51pub struct TileData {
52    /// Tile X coordinate
53    pub x: u32,
54    /// Tile Y coordinate
55    pub y: u32,
56    /// Zoom level
57    pub zoom: u8,
58    /// Tile data
59    pub data: Bytes,
60    /// MIME type (e.g., "application/x-protobuf" for MVT)
61    pub mime_type: String,
62}
63impl TileData {
64    /// Create new tile data.
65    pub fn new(x: u32, y: u32, zoom: u8, data: Vec<u8>, mime_type: String) -> Self {
66        Self {
67            x,
68            y,
69            zoom,
70            data: Bytes::from(data),
71            mime_type,
72        }
73    }
74    /// Get tile coordinates as (x, y, zoom).
75    pub fn coords(&self) -> (u32, u32, u8) {
76        (self.x, self.y, self.zoom)
77    }
78    /// Get data size in bytes.
79    pub fn size(&self) -> usize {
80        self.data.len()
81    }
82}
83/// A stream of feature data.
84pub struct FeatureStream {
85    receiver: mpsc::UnboundedReceiver<FeatureData>,
86}
87impl FeatureStream {
88    /// Create a new feature stream.
89    pub fn new(receiver: mpsc::UnboundedReceiver<FeatureData>) -> Self {
90        Self { receiver }
91    }
92    /// Receive the next feature.
93    pub async fn next_feature(&mut self) -> Option<FeatureData> {
94        self.receiver.recv().await
95    }
96}
97impl Stream for FeatureStream {
98    type Item = FeatureData;
99    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
100        self.receiver.poll_recv(cx)
101    }
102}
103/// Feature data with metadata.
104#[derive(Debug, Clone)]
105pub struct FeatureData {
106    /// GeoJSON string
107    pub geojson: String,
108    /// Change type
109    pub change_type: crate::protocol::ChangeType,
110    /// Layer name
111    pub layer: Option<String>,
112}
113impl FeatureData {
114    /// Create new feature data.
115    pub fn new(
116        geojson: String,
117        change_type: crate::protocol::ChangeType,
118        layer: Option<String>,
119    ) -> Self {
120        Self {
121            geojson,
122            change_type,
123            layer,
124        }
125    }
126    /// Parse GeoJSON.
127    pub fn parse_json(&self) -> Result<serde_json::Value> {
128        serde_json::from_str(&self.geojson).map_err(Into::into)
129    }
130}
131/// A stream of events.
132pub struct EventStream {
133    receiver: mpsc::UnboundedReceiver<EventData>,
134}
135impl EventStream {
136    /// Create a new event stream.
137    pub fn new(receiver: mpsc::UnboundedReceiver<EventData>) -> Self {
138        Self { receiver }
139    }
140    /// Receive the next event.
141    pub async fn next_event(&mut self) -> Option<EventData> {
142        self.receiver.recv().await
143    }
144}
145impl Stream for EventStream {
146    type Item = EventData;
147    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
148        self.receiver.poll_recv(cx)
149    }
150}
151/// Event data with metadata.
152#[derive(Debug, Clone)]
153pub struct EventData {
154    /// Event type
155    pub event_type: crate::protocol::EventType,
156    /// Event payload
157    pub payload: serde_json::Value,
158    /// Event timestamp
159    pub timestamp: chrono::DateTime<chrono::Utc>,
160}
161impl EventData {
162    /// Create new event data.
163    pub fn new(event_type: crate::protocol::EventType, payload: serde_json::Value) -> Self {
164        Self {
165            event_type,
166            payload,
167            timestamp: chrono::Utc::now(),
168        }
169    }
170    /// Create event with explicit timestamp.
171    pub fn with_timestamp(
172        event_type: crate::protocol::EventType,
173        payload: serde_json::Value,
174        timestamp: chrono::DateTime<chrono::Utc>,
175    ) -> Self {
176        Self {
177            event_type,
178            payload,
179            timestamp,
180        }
181    }
182}
/// Backpressure control for streams.
///
/// Tracks how full a buffer is relative to a fixed maximum and classifies the
/// fill ratio into a [`BackpressureState`]. Between the low and high
/// watermarks the previous state is kept, so the state does not flap when the
/// fill level hovers around a single threshold.
pub struct BackpressureController {
    /// Maximum buffer size
    max_buffer_size: usize,
    /// Most recent buffer size passed to `update`
    current_buffer_size: usize,
    /// High watermark (fraction of max, 0.0..=1.0)
    high_watermark: f64,
    /// Low watermark (fraction of max, 0.0..=1.0)
    low_watermark: f64,
    /// Current state
    state: BackpressureState,
}

/// Backpressure state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureState {
    /// Normal operation
    Normal,
    /// High pressure - slow down
    High,
    /// Critical - stop sending
    Critical,
}

impl BackpressureController {
    /// Fill ratio at or above which the state becomes `Critical`.
    const CRITICAL_WATERMARK: f64 = 0.9;

    /// Create a new backpressure controller.
    ///
    /// The controller starts in [`BackpressureState::Normal`] with a high
    /// watermark of 0.7 and a low watermark of 0.3.
    pub fn new(max_buffer_size: usize) -> Self {
        Self {
            max_buffer_size,
            current_buffer_size: 0,
            high_watermark: 0.7,
            low_watermark: 0.3,
            state: BackpressureState::Normal,
        }
    }

    /// Update buffer size and return new state.
    ///
    /// The fill ratio `buffer_size / max_buffer_size` is classified as:
    ///
    /// * `>= 0.9` -> `Critical`
    /// * `>= high_watermark` -> `High`
    /// * `<= low_watermark` -> `Normal`
    /// * otherwise -> the previous state is kept
    ///
    /// A controller with `max_buffer_size == 0` treats an empty buffer as
    /// ratio 0 (`Normal`) and any non-empty buffer as over capacity
    /// (`Critical`).
    pub fn update(&mut self, buffer_size: usize) -> BackpressureState {
        self.current_buffer_size = buffer_size;

        let ratio = if self.max_buffer_size == 0 {
            // Avoid 0/0 = NaN: NaN fails every comparison below, which would
            // leave the controller stuck in whatever state it was in.
            if buffer_size == 0 {
                0.0
            } else {
                f64::INFINITY
            }
        } else {
            buffer_size as f64 / self.max_buffer_size as f64
        };

        self.state = if ratio >= Self::CRITICAL_WATERMARK {
            BackpressureState::Critical
        } else if ratio >= self.high_watermark {
            BackpressureState::High
        } else if ratio <= self.low_watermark {
            BackpressureState::Normal
        } else {
            // Between the watermarks: keep the previous state.
            self.state
        };
        self.state
    }

    /// Get current state.
    pub fn state(&self) -> BackpressureState {
        self.state
    }

    /// Check if should throttle (state is `High` or `Critical`).
    pub fn should_throttle(&self) -> bool {
        matches!(
            self.state,
            BackpressureState::High | BackpressureState::Critical
        )
    }

    /// Check if should drop messages (state is `Critical`).
    pub fn should_drop(&self) -> bool {
        self.state == BackpressureState::Critical
    }
}
248/// Delta encoder for efficient tile updates.
249pub struct DeltaEncoder {
250    /// Previous tile data cache
251    cache: dashmap::DashMap<(u32, u32, u8), Bytes>,
252}
253impl DeltaEncoder {
254    /// Create a new delta encoder.
255    pub fn new() -> Self {
256        Self {
257            cache: dashmap::DashMap::new(),
258        }
259    }
260    /// Encode tile data with delta compression.
261    pub fn encode(&self, tile: &TileData) -> Result<Vec<u8>> {
262        let key = tile.coords();
263        if let Some(prev_data) = self.cache.get(&key) {
264            let delta = Self::compute_delta(&prev_data, &tile.data)?;
265            self.cache.insert(key, tile.data.clone());
266            Ok(delta)
267        } else {
268            self.cache.insert(key, tile.data.clone());
269            Ok(tile.data.to_vec())
270        }
271    }
272    /// Compute delta between two byte arrays.
273    fn compute_delta(old: &[u8], new: &[u8]) -> Result<Vec<u8>> {
274        let mut delta = Vec::new();
275        delta.extend_from_slice(&(new.len() as u32).to_le_bytes());
276        for (i, (&old_byte, &new_byte)) in old.iter().zip(new.iter()).enumerate() {
277            if old_byte != new_byte {
278                delta.extend_from_slice(&(i as u32).to_le_bytes());
279                delta.push(new_byte);
280            }
281        }
282        if new.len() > old.len() {
283            for (i, &byte) in new[old.len()..].iter().enumerate() {
284                let pos = old.len() + i;
285                delta.extend_from_slice(&(pos as u32).to_le_bytes());
286                delta.push(byte);
287            }
288        }
289        Ok(delta)
290    }
291    /// Clear cache.
292    pub fn clear(&self) {
293        self.cache.clear();
294    }
295    /// Get cache size.
296    pub fn cache_size(&self) -> usize {
297        self.cache.len()
298    }
299}
300impl Default for DeltaEncoder {
301    fn default() -> Self {
302        Self::new()
303    }
304}
#[cfg(test)]
mod tests {
    use super::*;

    /// A message pushed into the channel comes back out of the stream.
    #[tokio::test]
    async fn test_message_stream() {
        let (tx, rx) = mpsc::unbounded_channel();
        let mut stream = MessageStream::new(rx);
        assert!(tx.send(Message::Ping { id: 1 }).is_ok());
        let msg = stream.next_message().await;
        // Fails on `None` and on any other variant or id, not just on `None`.
        assert!(matches!(msg, Some(Message::Ping { id: 1 })));
    }

    /// A tile pushed into the channel comes back out with its metadata intact.
    #[tokio::test]
    async fn test_tile_stream() {
        let (tx, rx) = mpsc::unbounded_channel();
        let mut stream = TileStream::new(rx);
        let tile = TileData::new(0, 0, 5, vec![1, 2, 3], "application/x-protobuf".to_string());
        assert!(tx.send(tile).is_ok());
        let received = stream.next_tile().await;
        assert_eq!(received.as_ref().map(TileData::coords), Some((0, 0, 5)));
        assert_eq!(received.as_ref().map(TileData::size), Some(3));
    }

    /// The controller moves through Normal -> High -> Critical -> Normal.
    #[test]
    fn test_backpressure_controller() {
        let mut controller = BackpressureController::new(100);
        assert_eq!(controller.update(30), BackpressureState::Normal);
        assert!(!controller.should_throttle());
        assert_eq!(controller.update(75), BackpressureState::High);
        assert!(controller.should_throttle());
        assert_eq!(controller.update(95), BackpressureState::Critical);
        assert!(controller.should_drop());
        assert_eq!(controller.update(25), BackpressureState::Normal);
        assert!(!controller.should_throttle());
    }

    /// First encode returns the full payload; re-encoding with one changed
    /// byte returns a 9-byte delta (4-byte length + one 5-byte record).
    #[test]
    #[ignore = "hangs if DeltaEncoder::encode calls DashMap::insert while still holding the guard from DashMap::get; enable once encode is confirmed guard-free"]
    fn test_delta_encoder() {
        let encoder = DeltaEncoder::new();
        let mime = "application/x-protobuf";

        // 16 bytes, so that a single-change delta is smaller than the payload.
        let original: Vec<u8> = (0u8..16).collect();
        let first = encoder.encode(&TileData::new(0, 0, 5, original.clone(), mime.to_string()));
        assert_eq!(first.ok(), Some(original.clone()));

        let mut changed = original.clone();
        changed[2] = 99;
        let second = encoder.encode(&TileData::new(0, 0, 5, changed, mime.to_string()));
        // Length 16 (LE u32), then offset 2 (LE u32) and the new byte 99.
        let expected: Vec<u8> = vec![16, 0, 0, 0, 2, 0, 0, 0, 99];
        assert!(expected.len() < original.len());
        assert_eq!(second.ok(), Some(expected));
        assert_eq!(encoder.cache_size(), 1);
    }
}