use std::sync::Arc;
use std::time::Duration;
use engine::Observer;
use sources_core::cdc::ChangeCapture;
use tokio::time::{MissedTickBehavior, interval};
/// Periodically samples the lag reported by `source` and forwards it to `observer`.
///
/// On every tick of a `period`-spaced interval the function awaits `source.lag()` and:
///
/// * `Ok(Some(bytes))` — passes the value to `observer.on_slot_lag`
///   (presumably a byte count, judging by the binding name — confirm against
///   the `ChangeCapture` trait);
/// * `Ok(None)` — emits a trace-level log and reports nothing;
/// * `Err(error)` — emits a warning and keeps polling, so a failed sample
///   never stops the loop.
///
/// Once polling has started the loop has no exit, so the future only ends when
/// it is dropped or its task is aborted. The one exception is a zero `period`:
/// `tokio::time::interval` panics on a zero duration, so in that case a warning
/// is logged and the function returns immediately without polling.
pub(crate) async fn poll(
    source: Arc<dyn ChangeCapture>,
    observer: Arc<dyn Observer>,
    period: Duration,
) {
    // `interval` panics when handed a zero period; bail out with a diagnostic
    // instead of crashing whichever task is driving this future.
    if period.is_zero() {
        tracing::warn!("source lag polling disabled: poll period is zero");
        return;
    }

    let mut ticker = interval(period);
    // If a sample takes longer than `period`, skip the missed ticks rather than
    // firing them back-to-back to catch up.
    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

    loop {
        // The first tick of a tokio interval completes immediately, so the
        // first sample is taken right away.
        ticker.tick().await;
        match source.lag().await {
            Ok(Some(bytes)) => observer.on_slot_lag(bytes),
            Ok(None) => tracing::trace!("source reports no lag yet"),
            Err(error) => tracing::warn!(%error, "failed to poll source lag"),
        }
    }
}