// lilo_rm_client/event_watcher.rs

1use lilo_rm_core::{EventBatch, EventCursor, EventsRequest};
2
3use crate::{ClientError, RuntimeClient};
4
/// Long poll wait window, in milliseconds, used when the builder was not
/// given an explicit one (30 seconds).
const DEFAULT_WAIT_MS: u32 = 30 * 1_000;
6
7/// Builder for [`EventWatcher`].
8#[derive(Clone, Debug, Default)]
9pub struct EventWatcherBuilder {
10    cursor: Option<EventCursor>,
11    wait_ms: Option<u32>,
12}
13
14impl EventWatcherBuilder {
15    /// Resume watching from `cursor`.
16    pub fn since(mut self, cursor: EventCursor) -> Self {
17        self.cursor = Some(cursor);
18        self
19    }
20
21    /// Set the long poll wait window in milliseconds.
22    pub fn wait_ms(mut self, ms: u32) -> Self {
23        self.wait_ms = Some(ms);
24        self
25    }
26
27    /// Build a watcher without touching the daemon.
28    pub fn build(self, client: RuntimeClient) -> EventWatcher {
29        EventWatcher {
30            client,
31            cursor: self.cursor,
32            wait_ms: self.wait_ms.or(Some(DEFAULT_WAIT_MS)),
33        }
34    }
35
36    /// Check daemon protocol compatibility, then build a watcher.
37    pub async fn connect(self, client: RuntimeClient) -> Result<EventWatcher, ClientError> {
38        client.check_protocol_version().await?;
39        Ok(self.build(client))
40    }
41}
42
43/// Long poll event consumer with caller visible cursor state.
44#[derive(Clone, Debug)]
45pub struct EventWatcher {
46    client: RuntimeClient,
47    cursor: Option<EventCursor>,
48    wait_ms: Option<u32>,
49}
50
51impl EventWatcher {
52    /// Start building an event watcher.
53    pub fn builder() -> EventWatcherBuilder {
54        EventWatcherBuilder::default()
55    }
56
57    /// Return the cursor from the last successful batch.
58    ///
59    /// Callers persist this value after applying each batch, then pass it to
60    /// [`EventWatcherBuilder::since`] when rebuilding a watcher.
61    pub fn current_cursor(&self) -> Option<&EventCursor> {
62        self.cursor.as_ref()
63    }
64
65    /// Override the next request cursor.
66    pub fn seek(&mut self, cursor: EventCursor) {
67        self.cursor = Some(cursor);
68    }
69
70    /// Fetch the next event batch and advance this watcher's cursor.
71    pub async fn next(&mut self) -> Result<EventBatch, ClientError> {
72        let batch = self
73            .client
74            .events(EventsRequest {
75                since: self.cursor,
76                wait_ms: self.wait_ms,
77            })
78            .await?;
79        self.update_cursor(&batch);
80        Ok(batch)
81    }
82
83    fn update_cursor(&mut self, batch: &EventBatch) {
84        match batch {
85            EventBatch::Events { cursor, .. } => {
86                self.cursor = Some(*cursor);
87            }
88            EventBatch::CursorExpired { oldest } => {
89                self.cursor = Some(*oldest);
90            }
91            _ => {}
92        }
93    }
94}