1use super::{Client, Context, Error};
2use regex::Regex;
3use serde_json::Value;
4use std::sync::mpsc;
5use std::sync::mpsc::Receiver;
6
7pub struct Consumer {
8 #[allow(unused)]
9 client: Client,
10 #[allow(unused)]
11 context: Context,
12 #[allow(unused)]
13 pub profile: String,
14}
15
16#[derive(Clone, Debug, Default)]
17pub struct ChangeEvent {
18 #[allow(unused)]
19 pub connection: String,
20 #[allow(unused)]
21 pub property: String,
22 #[allow(unused)]
23 pub value: Value,
24}
25
26impl Consumer {
27 pub(crate) fn new(client: Client, context: Context, profile: String) -> Self {
28 Self {
29 client,
30 context,
31 profile,
32 }
33 }
34
35 fn profile_key_prefix(&self) -> String {
36 let system_id = self.context.node.system.id.to_string();
37 let node_id = &self.context.node.id;
38 let context_id = &self.context.id;
39 let profile = &self.profile;
40 format!("cns/{system_id}/nodes/{node_id}/contexts/{context_id}/consumer/{profile}/")
41 }
42
43 fn property_key(&self, property: &str) -> String {
44 let profile_key_prefix = self.profile_key_prefix();
45 format!("{profile_key_prefix}properties/{property}")
46 }
47
48 pub fn get(&self, property: &str, default_value: Option<Value>) -> Result<Option<Value>, Error> {
49 let key = self.property_key(property);
50 self.client.get(&key, default_value)
51 }
52
53 pub fn put(&self, property: &str, value: &str) -> Result<(), Error> {
54 let key = self.property_key(property);
55 let mut client = self.client.clone();
56 client.put(&key, value)
57 }
58
59 pub fn watch(&self) -> Result<Receiver<ChangeEvent>, Error> {
60 let key_prefix = self.profile_key_prefix();
61 let mut client = self.client.clone();
62 let upstream_rx = client.on_update()?;
63 let (tx, rx) = mpsc::channel();
64 let re = Regex::new(r"connections/(\w+)/properties/(\w+)$")?;
65
66 let keys = client.keys()?;
67 std::thread::spawn(move || {
68 for (k, v) in keys.iter() {
70 if !k.starts_with(&key_prefix) {
71 continue;
72 }
73 if let Some(captures) = re.captures(k) {
74 let connection = captures[1].to_string();
75 let property = captures[2].to_string();
76 let value = v.clone();
77 let change_event = ChangeEvent {
78 connection,
79 property,
80 value,
81 };
82 tx.send(change_event).unwrap();
83 };
84 }
85
86 for event in upstream_rx {
88 for (k, v) in event.keys.iter() {
89 if !k.starts_with(&key_prefix) {
90 continue;
91 }
92 if let Some(captures) = re.captures(k) {
93 let connection = captures[1].to_string();
94 let property = captures[2].to_string();
95 let value = v.clone();
96 let change_event = ChangeEvent {
97 connection,
98 property,
99 value,
100 };
101 tx.send(change_event).unwrap();
102 };
103 }
104 }
105 });
106 Ok(rx)
107 }
108}