use lilo_rm_core::{EventBatch, EventCursor, EventsRequest};
use crate::{ClientError, RuntimeClient};
/// Wait value, in milliseconds, that `EventWatcherBuilder::build` falls back to
/// when the caller never set one explicitly (30 seconds).
const DEFAULT_WAIT_MS: u32 = 30 * 1_000;
/// Collects the optional settings for an `EventWatcher` before it is bound to
/// a `RuntimeClient`.
///
/// Both fields start out as `None` (via the derived `Default`).
#[derive(Clone, Debug, Default)]
pub struct EventWatcherBuilder {
/// Cursor the watcher should start from; `None` means no starting cursor
/// is sent with the first request.
cursor: Option<EventCursor>,
/// Caller-chosen wait value in milliseconds; `None` means "use the default"
/// when the watcher is built.
wait_ms: Option<u32>,
}
impl EventWatcherBuilder {
    /// Sets the cursor the watcher will start from.
    ///
    /// Returns the builder so calls can be chained.
    pub fn since(self, cursor: EventCursor) -> Self {
        Self {
            cursor: Some(cursor),
            ..self
        }
    }

    /// Sets the `wait_ms` value (milliseconds) that the watcher forwards with
    /// every events request.
    ///
    /// Returns the builder so calls can be chained.
    pub fn wait_ms(self, ms: u32) -> Self {
        Self {
            wait_ms: Some(ms),
            ..self
        }
    }

    /// Binds the collected settings to `client` and produces the watcher.
    ///
    /// No request is made here. If no wait value was configured, the watcher
    /// uses `DEFAULT_WAIT_MS`, so the resulting `wait_ms` is always `Some`.
    pub fn build(self, client: RuntimeClient) -> EventWatcher {
        let effective_wait = self.wait_ms.unwrap_or(DEFAULT_WAIT_MS);
        EventWatcher {
            client,
            cursor: self.cursor,
            wait_ms: Some(effective_wait),
        }
    }

    /// Like [`build`](Self::build), but first awaits
    /// `client.check_protocol_version()`.
    ///
    /// # Errors
    ///
    /// Propagates the error from the protocol-version check; in that case no
    /// watcher is created.
    pub async fn connect(self, client: RuntimeClient) -> Result<EventWatcher, ClientError> {
        client.check_protocol_version().await?;
        let watcher = self.build(client);
        Ok(watcher)
    }
}
/// Fetches event batches through a `RuntimeClient` while remembering the
/// cursor to resume from on the next request.
#[derive(Clone, Debug)]
pub struct EventWatcher {
/// Client used to issue the events requests.
client: RuntimeClient,
/// Cursor sent as `since` on the next request; `None` until a cursor has
/// been supplied or received.
cursor: Option<EventCursor>,
/// Value sent as `wait_ms` on every request (milliseconds).
wait_ms: Option<u32>,
}
impl EventWatcher {
    /// Returns a builder with no cursor and no wait value configured.
    pub fn builder() -> EventWatcherBuilder {
        Default::default()
    }

    /// Returns the cursor that will be sent with the next request, if any.
    pub fn current_cursor(&self) -> Option<&EventCursor> {
        self.cursor.as_ref()
    }

    /// Overrides the stored cursor so the next request resumes from `cursor`.
    pub fn seek(&mut self, cursor: EventCursor) {
        self.cursor = Some(cursor);
    }

    /// Requests the next batch from the client, sending the stored cursor as
    /// `since` together with the configured `wait_ms`.
    ///
    /// On success the stored cursor is updated from the batch before the batch
    /// is handed back to the caller.
    ///
    /// # Errors
    ///
    /// Returns the error from `RuntimeClient::events`; the stored cursor is
    /// left unchanged in that case.
    pub async fn next(&mut self) -> Result<EventBatch, ClientError> {
        let request = EventsRequest {
            since: self.cursor,
            wait_ms: self.wait_ms,
        };
        let batch = self.client.events(request).await?;
        self.update_cursor(&batch);
        Ok(batch)
    }

    /// Moves the stored cursor according to `batch`.
    ///
    /// An `Events` batch advances to the cursor it carries; a `CursorExpired`
    /// batch jumps to its `oldest` cursor. Any other variant leaves the stored
    /// cursor as it was.
    fn update_cursor(&mut self, batch: &EventBatch) {
        let replacement = match batch {
            EventBatch::Events { cursor, .. } => Some(*cursor),
            EventBatch::CursorExpired { oldest } => Some(*oldest),
            _ => None,
        };
        if let Some(position) = replacement {
            self.cursor = Some(position);
        }
    }
}