lilo_rm_client/
event_watcher.rs1use lilo_rm_core::{EventBatch, EventCursor, EventsRequest};
2
3use crate::{ClientError, RuntimeClient};
4
5const DEFAULT_WAIT_MS: u32 = 30_000;
6
7#[derive(Clone, Debug, Default)]
9pub struct EventWatcherBuilder {
10 cursor: Option<EventCursor>,
11 wait_ms: Option<u32>,
12}
13
14impl EventWatcherBuilder {
15 pub fn since(mut self, cursor: EventCursor) -> Self {
17 self.cursor = Some(cursor);
18 self
19 }
20
21 pub fn wait_ms(mut self, ms: u32) -> Self {
23 self.wait_ms = Some(ms);
24 self
25 }
26
27 pub fn build(self, client: RuntimeClient) -> EventWatcher {
29 EventWatcher {
30 client,
31 cursor: self.cursor,
32 wait_ms: self.wait_ms.or(Some(DEFAULT_WAIT_MS)),
33 }
34 }
35
36 pub async fn connect(self, client: RuntimeClient) -> Result<EventWatcher, ClientError> {
38 client.check_protocol_version().await?;
39 Ok(self.build(client))
40 }
41}
42
43#[derive(Clone, Debug)]
45pub struct EventWatcher {
46 client: RuntimeClient,
47 cursor: Option<EventCursor>,
48 wait_ms: Option<u32>,
49}
50
51impl EventWatcher {
52 pub fn builder() -> EventWatcherBuilder {
54 EventWatcherBuilder::default()
55 }
56
57 pub fn current_cursor(&self) -> Option<&EventCursor> {
62 self.cursor.as_ref()
63 }
64
65 pub fn seek(&mut self, cursor: EventCursor) {
67 self.cursor = Some(cursor);
68 }
69
70 pub async fn next(&mut self) -> Result<EventBatch, ClientError> {
72 let batch = self
73 .client
74 .events(EventsRequest {
75 since: self.cursor,
76 wait_ms: self.wait_ms,
77 })
78 .await?;
79 self.update_cursor(&batch);
80 Ok(batch)
81 }
82
83 fn update_cursor(&mut self, batch: &EventBatch) {
84 match batch {
85 EventBatch::Events { cursor, .. } => {
86 self.cursor = Some(*cursor);
87 }
88 EventBatch::CursorExpired { oldest } => {
89 self.cursor = Some(*oldest);
90 }
91 _ => {}
92 }
93 }
94}