arete_sdk/
consumer.rs

1use super::{Client, Context, Error};
2use regex::Regex;
3use serde_json::Value;
4use std::sync::mpsc;
5use std::sync::mpsc::Receiver;
6
7pub struct Consumer {
8    #[allow(unused)]
9    client: Client,
10    #[allow(unused)]
11    context: Context,
12    #[allow(unused)]
13    pub profile: String,
14}
15
16#[derive(Clone, Debug, Default)]
17pub struct ChangeEvent {
18    #[allow(unused)]
19    pub connection: String,
20    #[allow(unused)]
21    pub property: String,
22    #[allow(unused)]
23    pub value: Value,
24}
25
26impl Consumer {
27    pub(crate) fn new(client: Client, context: Context, profile: String) -> Self {
28        Self {
29            client,
30            context,
31            profile,
32        }
33    }
34
35    fn profile_key_prefix(&self) -> String {
36        let system_id = self.context.node.system.id.to_string();
37        let node_id = &self.context.node.id;
38        let context_id = &self.context.id;
39        let profile = &self.profile;
40        format!("cns/{system_id}/nodes/{node_id}/contexts/{context_id}/consumer/{profile}/")
41    }
42
43    fn property_key(&self, property: &str) -> String {
44        let profile_key_prefix = self.profile_key_prefix();
45        format!("{profile_key_prefix}properties/{property}")
46    }
47
48    pub fn get(&self, property: &str, default_value: Option<Value>) -> Result<Option<Value>, Error> {
49        let key = self.property_key(property);
50        self.client.get(&key, default_value)
51    }
52
53    pub fn put(&self, property: &str, value: &str) -> Result<(), Error> {
54        let key = self.property_key(property);
55        let mut client = self.client.clone();
56        client.put(&key, value)
57    }
58
59    pub fn watch(&self) -> Result<Receiver<ChangeEvent>, Error> {
60        let key_prefix = self.profile_key_prefix();
61        let mut client = self.client.clone();
62        let upstream_rx = client.on_update()?;
63        let (tx, rx) = mpsc::channel();
64        let re = Regex::new(r"connections/(\w+)/properties/(\w+)$")?;
65
66        let keys = client.keys()?;
67        std::thread::spawn(move || {
68            // Start by notifying of existing cached properties
69            for (k, v) in keys.iter() {
70                if !k.starts_with(&key_prefix) {
71                    continue;
72                }
73                if let Some(captures) = re.captures(k) {
74                    let connection = captures[1].to_string();
75                    let property = captures[2].to_string();
76                    let value = v.clone();
77                    let change_event = ChangeEvent {
78                        connection,
79                        property,
80                        value,
81                    };
82                    tx.send(change_event).unwrap();
83                };
84            }
85
86            // Watch for future property changes
87            for event in upstream_rx {
88                for (k, v) in event.keys.iter() {
89                    if !k.starts_with(&key_prefix) {
90                        continue;
91                    }
92                    if let Some(captures) = re.captures(k) {
93                        let connection = captures[1].to_string();
94                        let property = captures[2].to_string();
95                        let value = v.clone();
96                        let change_event = ChangeEvent {
97                            connection,
98                            property,
99                            value,
100                        };
101                        tx.send(change_event).unwrap();
102                    };
103                }
104            }
105        });
106        Ok(rx)
107    }
108}