danube_core/metadata/
watch.rs1use futures::stream::Stream;
2use futures::StreamExt;
3use std::task::{Context, Poll};
4use std::{fmt, pin::Pin};
5use tokio::sync::broadcast;
6use tokio_stream::wrappers::BroadcastStream;
7
8use super::errors::{MetadataError, Result};
9
10#[derive(Debug, Clone)]
11pub enum WatchEvent {
12 Put {
13 key: Vec<u8>,
14 value: Vec<u8>,
15 mod_revision: Option<i64>,
16 version: Option<i64>,
17 },
18 Delete {
19 key: Vec<u8>,
20 mod_revision: Option<i64>,
21 version: Option<i64>,
22 },
23}
24
25pub struct WatchStream {
26 inner: Pin<Box<dyn Stream<Item = Result<WatchEvent>> + Send>>,
27}
28
29impl Stream for WatchStream {
30 type Item = Result<WatchEvent>;
31
32 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
33 self.inner.as_mut().poll_next(cx)
34 }
35}
36
37impl WatchStream {
38 pub fn new(stream: impl Stream<Item = Result<WatchEvent>> + Send + 'static) -> Self {
39 Self {
40 inner: Box::pin(stream),
41 }
42 }
43 pub fn from_broadcast(rx: broadcast::Receiver<WatchEvent>) -> Self {
46 let stream = BroadcastStream::new(rx).filter_map(|result| {
47 futures::future::ready(match result {
48 Ok(event) => Some(Ok(event)),
49 Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
50 Some(Err(MetadataError::WatchError(format!(
51 "watch lagged by {} events — consumer should resync",
52 n
53 ))))
54 }
55 })
56 });
57 Self {
58 inner: Box::pin(stream),
59 }
60 }
61}
62
63impl fmt::Display for WatchEvent {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 match self {
66 WatchEvent::Put {
67 key,
68 value: _,
69 mod_revision,
70 version,
71 } => {
72 let key_str = String::from_utf8_lossy(key);
73 write!(f, "Put(key: {}", key_str)?;
74 if let Some(rev) = mod_revision {
75 write!(f, ", mod_revision: {}", rev)?;
76 }
77 if let Some(ver) = version {
78 write!(f, ", version: {}", ver)?;
79 }
80 write!(f, ")")
81 }
82 WatchEvent::Delete {
83 key,
84 mod_revision,
85 version,
86 } => {
87 let key_str = String::from_utf8_lossy(key);
88 write!(f, "Delete(key: {}", key_str)?;
89 if let Some(rev) = mod_revision {
90 write!(f, ", mod_revision: {}", rev)?;
91 }
92 if let Some(ver) = version {
93 write!(f, ", version: {}", ver)?;
94 }
95 write!(f, ")")
96 }
97 }
98 }
99}