arete_sdk/
provider.rs

1use super::{Client, Context, Error};
2use crate::consumer::ChangeEvent;
3use regex::Regex;
4use serde_json::Value;
5use std::sync::mpsc;
6use std::sync::mpsc::Receiver;
7
8pub struct Provider {
9    client: Client,
10    context: Context,
11    pub profile: String,
12}
13
14impl Provider {
15    pub(crate) fn new(client: Client, context: Context, profile: String) -> Self {
16        Self {
17            client,
18            context,
19            profile,
20        }
21    }
22
23    fn profile_key_prefix(&self) -> String {
24        let system_id = self.context.node.system.id.to_string();
25        let node_id = &self.context.node.id;
26        let context_id = &self.context.id;
27        let profile = &self.profile;
28        format!("cns/{system_id}/nodes/{node_id}/contexts/{context_id}/provider/{profile}/")
29    }
30
31    fn property_key(&self, property: &str) -> String {
32        let profile_key_prefix = self.profile_key_prefix();
33        format!("{profile_key_prefix}properties/{property}")
34    }
35
36    pub fn get(&self, property: &str, default_value: Option<Value>) -> Result<Option<Value>, Error> {
37        let key = self.property_key(property);
38        self.client.get(&key, default_value)
39    }
40
41    pub fn put(&self, property: &str, value: &str) -> Result<(), Error> {
42        let key = self.property_key(property);
43        let mut client = self.client.clone();
44        client.put(&key, value)
45    }
46
47    pub fn watch(&self) -> Result<Receiver<ChangeEvent>, Error> {
48        let key_prefix = self.profile_key_prefix();
49        let mut client = self.client.clone();
50        let upstream_rx = client.on_update()?;
51        let (tx, rx) = mpsc::channel();
52        let re = Regex::new(r"connections/(\w+)/properties/(\w+)$")?;
53
54        let keys = client.keys()?;
55        std::thread::spawn(move || {
56            // Start by notifying of existing cached properties
57            for (k, v) in keys.iter() {
58                if !k.starts_with(&key_prefix) {
59                    continue;
60                }
61                if let Some(captures) = re.captures(k) {
62                    let connection = captures[1].to_string();
63                    let property = captures[2].to_string();
64                    let value = v.clone();
65                    let change_event = ChangeEvent {
66                        connection,
67                        property,
68                        value,
69                    };
70                    tx.send(change_event).unwrap();
71                };
72            }
73
74            // Watch for future property changes
75            for event in upstream_rx {
76                for (k, v) in event.keys.iter() {
77                    if !k.starts_with(&key_prefix) {
78                        continue;
79                    }
80                    if let Some(captures) = re.captures(k) {
81                        let connection = captures[1].to_string();
82                        let property = captures[2].to_string();
83                        let value = v.clone();
84                        let change_event = ChangeEvent {
85                            connection,
86                            property,
87                            value,
88                        };
89                        tx.send(change_event).unwrap();
90                    };
91                }
92            }
93        });
94        Ok(rx)
95    }
96}