Skip to main content

danube_core/metadata/
watch.rs

1use futures::stream::Stream;
2use futures::StreamExt;
3use std::task::{Context, Poll};
4use std::{fmt, pin::Pin};
5use tokio::sync::broadcast;
6use tokio_stream::wrappers::BroadcastStream;
7
8use super::errors::{MetadataError, Result};
9
/// A single change notification delivered through a [`WatchStream`].
///
/// Keys and values are carried as raw bytes. Events compare by value
/// (`PartialEq`/`Eq`), which makes them usable directly in assertions and
/// de-duplication logic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatchEvent {
    /// Notification carrying a key together with its value.
    Put {
        /// Raw bytes of the affected key.
        key: Vec<u8>,
        /// Raw bytes of the value attached to the key.
        value: Vec<u8>,
        /// Optional modification revision reported with the event; `None`
        /// when not supplied. NOTE(review): exact semantics are defined by
        /// the producing metadata backend — confirm there.
        mod_revision: Option<i64>,
        /// Optional per-key version reported with the event; `None` when
        /// not supplied. NOTE(review): confirm semantics against producers.
        version: Option<i64>,
    },
    /// Notification carrying only a key (no value payload).
    Delete {
        /// Raw bytes of the affected key.
        key: Vec<u8>,
        /// Optional modification revision reported with the event; `None`
        /// when not supplied.
        mod_revision: Option<i64>,
        /// Optional per-key version reported with the event; `None` when
        /// not supplied.
        version: Option<i64>,
    },
}
24
25pub struct WatchStream {
26    inner: Pin<Box<dyn Stream<Item = Result<WatchEvent>> + Send>>,
27}
28
29impl Stream for WatchStream {
30    type Item = Result<WatchEvent>;
31
32    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
33        self.inner.as_mut().poll_next(cx)
34    }
35}
36
37impl WatchStream {
38    pub fn new(stream: impl Stream<Item = Result<WatchEvent>> + Send + 'static) -> Self {
39        Self {
40            inner: Box::pin(stream),
41        }
42    }
43    /// Create a WatchStream from a `tokio::sync::broadcast::Receiver`.
44    /// Broadcast lag (slow consumer) is surfaced as `MetadataError::WatchError`.
45    pub fn from_broadcast(rx: broadcast::Receiver<WatchEvent>) -> Self {
46        let stream = BroadcastStream::new(rx).filter_map(|result| {
47            futures::future::ready(match result {
48                Ok(event) => Some(Ok(event)),
49                Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
50                    Some(Err(MetadataError::WatchError(format!(
51                        "watch lagged by {} events — consumer should resync",
52                        n
53                    ))))
54                }
55            })
56        });
57        Self {
58            inner: Box::pin(stream),
59        }
60    }
61}
62
63impl fmt::Display for WatchEvent {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        match self {
66            WatchEvent::Put {
67                key,
68                value: _,
69                mod_revision,
70                version,
71            } => {
72                let key_str = String::from_utf8_lossy(key);
73                write!(f, "Put(key: {}", key_str)?;
74                if let Some(rev) = mod_revision {
75                    write!(f, ", mod_revision: {}", rev)?;
76                }
77                if let Some(ver) = version {
78                    write!(f, ", version: {}", ver)?;
79                }
80                write!(f, ")")
81            }
82            WatchEvent::Delete {
83                key,
84                mod_revision,
85                version,
86            } => {
87                let key_str = String::from_utf8_lossy(key);
88                write!(f, "Delete(key: {}", key_str)?;
89                if let Some(rev) = mod_revision {
90                    write!(f, ", mod_revision: {}", rev)?;
91                }
92                if let Some(ver) = version {
93                    write!(f, ", version: {}", ver)?;
94                }
95                write!(f, ")")
96            }
97        }
98    }
99}