1use crate::error::Result;
3use crate::protocol::Message;
4use bytes::Bytes;
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use tokio::sync::mpsc;
9pub struct MessageStream {
11 receiver: mpsc::UnboundedReceiver<Message>,
12}
13impl MessageStream {
14 pub fn new(receiver: mpsc::UnboundedReceiver<Message>) -> Self {
16 Self { receiver }
17 }
18 pub async fn next_message(&mut self) -> Option<Message> {
20 self.receiver.recv().await
21 }
22}
23impl Stream for MessageStream {
24 type Item = Message;
25 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
26 self.receiver.poll_recv(cx)
27 }
28}
29pub struct TileStream {
31 receiver: mpsc::UnboundedReceiver<TileData>,
32}
33impl TileStream {
34 pub fn new(receiver: mpsc::UnboundedReceiver<TileData>) -> Self {
36 Self { receiver }
37 }
38 pub async fn next_tile(&mut self) -> Option<TileData> {
40 self.receiver.recv().await
41 }
42}
43impl Stream for TileStream {
44 type Item = TileData;
45 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46 self.receiver.poll_recv(cx)
47 }
48}
49#[derive(Debug, Clone)]
51pub struct TileData {
52 pub x: u32,
54 pub y: u32,
56 pub zoom: u8,
58 pub data: Bytes,
60 pub mime_type: String,
62}
63impl TileData {
64 pub fn new(x: u32, y: u32, zoom: u8, data: Vec<u8>, mime_type: String) -> Self {
66 Self {
67 x,
68 y,
69 zoom,
70 data: Bytes::from(data),
71 mime_type,
72 }
73 }
74 pub fn coords(&self) -> (u32, u32, u8) {
76 (self.x, self.y, self.zoom)
77 }
78 pub fn size(&self) -> usize {
80 self.data.len()
81 }
82}
83pub struct FeatureStream {
85 receiver: mpsc::UnboundedReceiver<FeatureData>,
86}
87impl FeatureStream {
88 pub fn new(receiver: mpsc::UnboundedReceiver<FeatureData>) -> Self {
90 Self { receiver }
91 }
92 pub async fn next_feature(&mut self) -> Option<FeatureData> {
94 self.receiver.recv().await
95 }
96}
97impl Stream for FeatureStream {
98 type Item = FeatureData;
99 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
100 self.receiver.poll_recv(cx)
101 }
102}
103#[derive(Debug, Clone)]
105pub struct FeatureData {
106 pub geojson: String,
108 pub change_type: crate::protocol::ChangeType,
110 pub layer: Option<String>,
112}
113impl FeatureData {
114 pub fn new(
116 geojson: String,
117 change_type: crate::protocol::ChangeType,
118 layer: Option<String>,
119 ) -> Self {
120 Self {
121 geojson,
122 change_type,
123 layer,
124 }
125 }
126 pub fn parse_json(&self) -> Result<serde_json::Value> {
128 serde_json::from_str(&self.geojson).map_err(Into::into)
129 }
130}
131pub struct EventStream {
133 receiver: mpsc::UnboundedReceiver<EventData>,
134}
135impl EventStream {
136 pub fn new(receiver: mpsc::UnboundedReceiver<EventData>) -> Self {
138 Self { receiver }
139 }
140 pub async fn next_event(&mut self) -> Option<EventData> {
142 self.receiver.recv().await
143 }
144}
145impl Stream for EventStream {
146 type Item = EventData;
147 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
148 self.receiver.poll_recv(cx)
149 }
150}
151#[derive(Debug, Clone)]
153pub struct EventData {
154 pub event_type: crate::protocol::EventType,
156 pub payload: serde_json::Value,
158 pub timestamp: chrono::DateTime<chrono::Utc>,
160}
161impl EventData {
162 pub fn new(event_type: crate::protocol::EventType, payload: serde_json::Value) -> Self {
164 Self {
165 event_type,
166 payload,
167 timestamp: chrono::Utc::now(),
168 }
169 }
170 pub fn with_timestamp(
172 event_type: crate::protocol::EventType,
173 payload: serde_json::Value,
174 timestamp: chrono::DateTime<chrono::Utc>,
175 ) -> Self {
176 Self {
177 event_type,
178 payload,
179 timestamp,
180 }
181 }
182}
/// Classifies a buffer's fill level into a [`BackpressureState`].
///
/// The fill ratio is `buffer_size / max_buffer_size`. Escalation is immediate
/// when a threshold is reached; de-escalation to `Normal` only happens once
/// the ratio falls to the low watermark, so the state does not flap while the
/// ratio sits between the low and high watermarks.
pub struct BackpressureController {
    /// Capacity that fill levels are measured against.
    max_buffer_size: usize,
    /// Fill level passed to the most recent [`BackpressureController::update`] call.
    current_buffer_size: usize,
    /// Fill ratio at or above which the state becomes `High`.
    high_watermark: f64,
    /// Fill ratio at or below which the state returns to `Normal`.
    low_watermark: f64,
    /// State computed by the most recent `update` call.
    state: BackpressureState,
}

/// Severity of buffer pressure as reported by [`BackpressureController`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureState {
    /// No throttling requested.
    Normal,
    /// Producers should be throttled.
    High,
    /// Producers should be throttled and data may be dropped.
    Critical,
}

impl BackpressureController {
    /// Fill ratio at or above which the state becomes `Critical`.
    const CRITICAL_WATERMARK: f64 = 0.9;

    /// Creates a controller for a buffer of `max_buffer_size` entries.
    ///
    /// The controller starts in `Normal` with a high watermark of 0.7 and a
    /// low watermark of 0.3.
    pub fn new(max_buffer_size: usize) -> Self {
        Self {
            max_buffer_size,
            current_buffer_size: 0,
            high_watermark: 0.7,
            low_watermark: 0.3,
            state: BackpressureState::Normal,
        }
    }

    /// Records the current fill level and returns the resulting state.
    ///
    /// * ratio >= 0.9 yields `Critical`;
    /// * ratio >= high watermark yields `High`;
    /// * ratio <= low watermark yields `Normal`;
    /// * otherwise (inside the hysteresis band) the previous state is kept,
    ///   except that `Critical` relaxes to `High`: the buffer is below the
    ///   high watermark, so dropping data is no longer justified, but
    ///   throttling continues until the low watermark is reached.
    pub fn update(&mut self, buffer_size: usize) -> BackpressureState {
        self.current_buffer_size = buffer_size;
        let ratio = self.fill_ratio(buffer_size);

        self.state = if ratio >= Self::CRITICAL_WATERMARK {
            BackpressureState::Critical
        } else if ratio >= self.high_watermark {
            BackpressureState::High
        } else if ratio <= self.low_watermark {
            BackpressureState::Normal
        } else if self.state == BackpressureState::Critical {
            // Without this, a direct jump from >= 90% into the band left the
            // controller in `Critical`, while passing through 70-90% did not.
            BackpressureState::High
        } else {
            self.state
        };
        self.state
    }

    /// Returns the state computed by the most recent `update` call.
    pub fn state(&self) -> BackpressureState {
        self.state
    }

    /// Returns `true` while the state is `High` or `Critical`.
    pub fn should_throttle(&self) -> bool {
        matches!(
            self.state,
            BackpressureState::High | BackpressureState::Critical
        )
    }

    /// Returns `true` only while the state is `Critical`.
    pub fn should_drop(&self) -> bool {
        self.state == BackpressureState::Critical
    }

    /// Computes the fill ratio without producing NaN for a zero capacity.
    ///
    /// A zero-capacity buffer counts as empty when nothing is buffered and as
    /// completely over capacity otherwise.
    fn fill_ratio(&self, buffer_size: usize) -> f64 {
        match (self.max_buffer_size, buffer_size) {
            (0, 0) => 0.0,
            (0, _) => f64::INFINITY,
            (capacity, used) => used as f64 / capacity as f64,
        }
    }
}
248pub struct DeltaEncoder {
250 cache: dashmap::DashMap<(u32, u32, u8), Bytes>,
252}
253impl DeltaEncoder {
254 pub fn new() -> Self {
256 Self {
257 cache: dashmap::DashMap::new(),
258 }
259 }
260 pub fn encode(&self, tile: &TileData) -> Result<Vec<u8>> {
262 let key = tile.coords();
263 if let Some(prev_data) = self.cache.get(&key) {
264 let delta = Self::compute_delta(&prev_data, &tile.data)?;
265 self.cache.insert(key, tile.data.clone());
266 Ok(delta)
267 } else {
268 self.cache.insert(key, tile.data.clone());
269 Ok(tile.data.to_vec())
270 }
271 }
272 fn compute_delta(old: &[u8], new: &[u8]) -> Result<Vec<u8>> {
274 let mut delta = Vec::new();
275 delta.extend_from_slice(&(new.len() as u32).to_le_bytes());
276 for (i, (&old_byte, &new_byte)) in old.iter().zip(new.iter()).enumerate() {
277 if old_byte != new_byte {
278 delta.extend_from_slice(&(i as u32).to_le_bytes());
279 delta.push(new_byte);
280 }
281 }
282 if new.len() > old.len() {
283 for (i, &byte) in new[old.len()..].iter().enumerate() {
284 let pos = old.len() + i;
285 delta.extend_from_slice(&(pos as u32).to_le_bytes());
286 delta.push(byte);
287 }
288 }
289 Ok(delta)
290 }
291 pub fn clear(&self) {
293 self.cache.clear();
294 }
295 pub fn cache_size(&self) -> usize {
297 self.cache.len()
298 }
299}
300impl Default for DeltaEncoder {
301 fn default() -> Self {
302 Self::new()
303 }
304}
#[cfg(test)]
mod tests {
    use super::*;

    /// A message sent on the channel is returned by `next_message`.
    #[tokio::test]
    async fn test_message_stream() {
        let (tx, rx) = mpsc::unbounded_channel();
        let mut stream = MessageStream::new(rx);

        assert!(tx.send(Message::Ping { id: 1 }).is_ok());

        // Fail loudly on `None` or on any other variant instead of skipping the check.
        match stream.next_message().await {
            Some(Message::Ping { id }) => assert_eq!(id, 1),
            _ => panic!("expected the Ping message that was just sent"),
        }
    }

    /// A tile sent on the channel is returned by `next_tile` with its fields intact.
    #[tokio::test]
    async fn test_tile_stream() {
        let (tx, rx) = mpsc::unbounded_channel();
        let mut stream = TileStream::new(rx);

        let tile = TileData::new(0, 0, 5, vec![1, 2, 3], "application/x-protobuf".to_string());
        assert!(tx.send(tile).is_ok());

        let received = stream
            .next_tile()
            .await
            .expect("the tile that was just sent should be delivered");
        assert_eq!(received.coords(), (0, 0, 5));
        assert_eq!(received.size(), 3);
    }

    /// The controller escalates and recovers at the documented thresholds.
    #[test]
    fn test_backpressure_controller() {
        let mut controller = BackpressureController::new(100);

        assert_eq!(controller.update(30), BackpressureState::Normal);
        assert!(!controller.should_throttle());

        assert_eq!(controller.update(75), BackpressureState::High);
        assert!(controller.should_throttle());

        assert_eq!(controller.update(95), BackpressureState::Critical);
        assert!(controller.should_drop());

        assert_eq!(controller.update(25), BackpressureState::Normal);
        assert!(!controller.should_throttle());
    }

    /// The first encode returns the full payload; the second returns a delta.
    ///
    /// NOTE(review): kept `#[ignore]`d as in the original. The second `encode`
    /// call re-enters the DashMap for the same key; confirm it cannot block
    /// before enabling this test by default.
    #[test]
    #[ignore]
    fn test_delta_encoder() {
        let encoder = DeltaEncoder::new();

        let tile1 = TileData::new(
            0,
            0,
            5,
            vec![1, 2, 3, 4, 5],
            "application/x-protobuf".to_string(),
        );
        match encoder.encode(&tile1) {
            Ok(data) => assert_eq!(data, vec![1, 2, 3, 4, 5]),
            Err(_) => panic!("encoding the first tile should succeed"),
        }

        let tile2 = TileData::new(
            0,
            0,
            5,
            vec![1, 2, 9, 4, 5],
            "application/x-protobuf".to_string(),
        );
        match encoder.encode(&tile2) {
            // 4-byte LE length header (5), then one record: 4-byte LE offset (2)
            // and the new byte (9). For a 5-byte tile the delta is therefore
            // larger than the payload, so a `len() < 5` check cannot hold.
            Ok(data) => assert_eq!(data, vec![5, 0, 0, 0, 2, 0, 0, 0, 9]),
            Err(_) => panic!("encoding the second tile should succeed"),
        }
        assert_eq!(encoder.cache_size(), 1);
    }
}