1use core::future::Future;
2use rs_consul::{
3 types::{ReadKeyRequest, ReadKeyResponse},
4 Config, Consul, ConsulError,
5};
6use thiserror::Error;
7use tokio::time::{sleep, Duration};
8
/// Delay, in milliseconds, that [`Watcher::run`] sleeps after a failed Consul
/// read before it retries.
const MIN_ERROR_BACKOFF_MS: u64 = 1_000;
11
12#[derive(Error, Debug)]
13pub enum WatcherError {
14 #[error(transparent)]
15 Consul(#[from] ConsulError),
16}
17
18pub struct Watcher {
19 client: Consul,
20 path: String,
21}
22
23impl Watcher {
24 pub fn new(path: String) -> Self {
25 Self {
26 client: Consul::new(Config::from_env()),
27 path,
28 }
29 }
30 pub async fn run<F, Fut>(&self, callback: F)
31 where
32 F: Fn(Vec<ReadKeyResponse>) -> Fut,
33 Fut: Future<Output = ()>,
34 {
35 let mut query = ReadKeyRequest::default();
36
37 let backoff = Duration::from_millis(MIN_ERROR_BACKOFF_MS);
38
39 loop {
40 query.key = &self.path;
41 match self.client.read_key(query.clone()).await {
42 Ok(responses) => {
43 if let Some(response) = responses.first() {
44 query.index = response.modify_index.try_into().ok();
49 }
50 callback(responses).await;
51 }
52 Err(e) => {
53 tracing::error!("{:?}", e);
54 sleep(backoff).await;
55 }
56 }
57 }
58 }
59}
60
#[cfg(test)]
mod tests {
    /// Placeholder test that only confirms the test harness runs.
    #[test]
    fn it_works() {
        assert_eq!(2 + 2, 4);
    }
}