1use super::{Client, Context, Error};
2use crate::consumer::ChangeEvent;
3use regex::Regex;
4use serde_json::Value;
5use std::sync::mpsc;
6use std::sync::mpsc::Receiver;
7
/// Provider-side handle for one profile within a context.
///
/// Keys are built from the ids found in `context` plus `profile`, and are
/// read and written through `client`.
pub struct Provider {
    /// Client used for key reads, writes and update subscriptions.
    client: Client,
    /// Supplies the system, node and context ids used to build key paths.
    context: Context,
    /// Profile name; forms the last segment of this provider's key prefix.
    pub profile: String,
}
13
14impl Provider {
15 pub(crate) fn new(client: Client, context: Context, profile: String) -> Self {
16 Self {
17 client,
18 context,
19 profile,
20 }
21 }
22
23 fn profile_key_prefix(&self) -> String {
24 let system_id = self.context.node.system.id.to_string();
25 let node_id = &self.context.node.id;
26 let context_id = &self.context.id;
27 let profile = &self.profile;
28 format!("cns/{system_id}/nodes/{node_id}/contexts/{context_id}/provider/{profile}/")
29 }
30
31 fn property_key(&self, property: &str) -> String {
32 let profile_key_prefix = self.profile_key_prefix();
33 format!("{profile_key_prefix}properties/{property}")
34 }
35
36 pub fn get(&self, property: &str, default_value: Option<Value>) -> Result<Option<Value>, Error> {
37 let key = self.property_key(property);
38 self.client.get(&key, default_value)
39 }
40
41 pub fn put(&self, property: &str, value: &str) -> Result<(), Error> {
42 let key = self.property_key(property);
43 let mut client = self.client.clone();
44 client.put(&key, value)
45 }
46
47 pub fn watch(&self) -> Result<Receiver<ChangeEvent>, Error> {
48 let key_prefix = self.profile_key_prefix();
49 let mut client = self.client.clone();
50 let upstream_rx = client.on_update()?;
51 let (tx, rx) = mpsc::channel();
52 let re = Regex::new(r"connections/(\w+)/properties/(\w+)$")?;
53
54 let keys = client.keys()?;
55 std::thread::spawn(move || {
56 for (k, v) in keys.iter() {
58 if !k.starts_with(&key_prefix) {
59 continue;
60 }
61 if let Some(captures) = re.captures(k) {
62 let connection = captures[1].to_string();
63 let property = captures[2].to_string();
64 let value = v.clone();
65 let change_event = ChangeEvent {
66 connection,
67 property,
68 value,
69 };
70 tx.send(change_event).unwrap();
71 };
72 }
73
74 for event in upstream_rx {
76 for (k, v) in event.keys.iter() {
77 if !k.starts_with(&key_prefix) {
78 continue;
79 }
80 if let Some(captures) = re.captures(k) {
81 let connection = captures[1].to_string();
82 let property = captures[2].to_string();
83 let value = v.clone();
84 let change_event = ChangeEvent {
85 connection,
86 property,
87 value,
88 };
89 tx.send(change_event).unwrap();
90 };
91 }
92 }
93 });
94 Ok(rx)
95 }
96}