consul_kv_trigger/
lib.rs

1use core::future::Future;
2use rs_consul::{
3    types::{ReadKeyRequest, ReadKeyResponse},
4    Config, Consul, ConsulError,
5};
6use thiserror::Error;
7use tokio::time::{sleep, Duration};
8
9// TODO make configurable
10const MIN_ERROR_BACKOFF_MS: u64 = 1000;
11
12#[derive(Error, Debug)]
13pub enum WatcherError {
14    #[error(transparent)]
15    Consul(#[from] ConsulError),
16}
17
18pub struct Watcher {
19    client: Consul,
20    path: String,
21}
22
23impl Watcher {
24    pub fn new(path: String) -> Self {
25        Self {
26            client: Consul::new(Config::from_env()),
27            path,
28        }
29    }
30    pub async fn run<F, Fut>(&self, callback: F)
31    where
32        F: Fn(Vec<ReadKeyResponse>) -> Fut,
33        Fut: Future<Output = ()>,
34    {
35        let mut query = ReadKeyRequest::default();
36
37        let backoff = Duration::from_millis(MIN_ERROR_BACKOFF_MS);
38
39        loop {
40            query.key = &self.path;
41            match self.client.read_key(query.clone()).await {
42                Ok(responses) => {
43                    if let Some(response) = responses.first() {
44                        // this should be the largest for the entire
45                        // prefix or a recursive query acconding to
46                        // documentation, so no need to take a max
47                        // over the vector
48                        query.index = response.modify_index.try_into().ok();
49                    }
50                    callback(responses).await;
51                }
52                Err(e) => {
53                    tracing::error!("{:?}", e);
54                    sleep(backoff).await;
55                }
56            }
57        }
58    }
59}
60
61#[cfg(test)]
62mod tests {
63    #[test]
64    fn it_works() {
65        let result = 2 + 2;
66        assert_eq!(result, 4);
67    }
68}