hermod/dispatcher/backend/
datapoint.rs1use super::{Backend, DispatchMessage};
23use anyhow::Result;
24use async_trait::async_trait;
25use std::collections::HashMap;
26use std::sync::{Arc, RwLock};
27
28#[derive(Clone, Default)]
37pub struct DataPointStore {
38 inner: Arc<RwLock<HashMap<String, Vec<u8>>>>,
39}
40
41impl DataPointStore {
42 pub fn new() -> Self {
44 Self::default()
45 }
46
47 pub fn put(&self, name: &str, value: Vec<u8>) {
49 self.inner.write().unwrap().insert(name.to_string(), value);
50 }
51
52 pub fn get(&self, name: &str) -> Option<Vec<u8>> {
54 self.inner.read().unwrap().get(name).cloned()
55 }
56}
57
58pub struct DatapointBackend {
68 store: Option<DataPointStore>,
69}
70
71impl DatapointBackend {
72 pub fn new() -> Self {
74 Self { store: None }
75 }
76
77 pub fn with_store(store: DataPointStore) -> Self {
79 Self { store: Some(store) }
80 }
81}
82
83impl Default for DatapointBackend {
84 fn default() -> Self {
85 Self::new()
86 }
87}
88
89#[async_trait]
90impl Backend for DatapointBackend {
91 async fn dispatch(&self, msg: &DispatchMessage) -> Result<()> {
92 if let Some(store) = &self.store {
93 let key = msg.trace_object.to_namespace.join(".");
94 let value = msg.trace_object.to_machine.as_bytes().to_vec();
95 store.put(&key, value);
96 }
97 Ok(())
98 }
99}
100
101#[cfg(test)]
102mod tests {
103 use super::*;
104 use crate::protocol::types::{DetailLevel, Severity, TraceObject};
105 use chrono::Utc;
106 use serde_json::json;
107
108 fn make_msg(namespace: Vec<&str>, machine: &str) -> DispatchMessage {
109 DispatchMessage {
110 trace_object: TraceObject {
111 to_human: None,
112 to_machine: machine.to_string(),
113 to_namespace: namespace.into_iter().map(str::to_string).collect(),
114 to_severity: Severity::Info,
115 to_details: DetailLevel::DNormal,
116 to_timestamp: Utc::now(),
117 to_hostname: "host".to_string(),
118 to_thread_id: "1".to_string(),
119 },
120 human: String::new(),
121 machine: json!({}),
122 metrics: vec![],
123 detail: DetailLevel::DNormal,
124 }
125 }
126
127 #[test]
128 fn store_put_get_round_trip() {
129 let store = DataPointStore::new();
130 store.put("Foo.Bar", b"hello".to_vec());
131 assert_eq!(store.get("Foo.Bar"), Some(b"hello".to_vec()));
132 }
133
134 #[test]
135 fn store_missing_key_returns_none() {
136 let store = DataPointStore::new();
137 assert_eq!(store.get("Missing"), None);
138 }
139
140 #[test]
141 fn store_overwrite_replaces_value() {
142 let store = DataPointStore::new();
143 store.put("k", b"first".to_vec());
144 store.put("k", b"second".to_vec());
145 assert_eq!(store.get("k"), Some(b"second".to_vec()));
146 }
147
148 #[test]
149 fn store_clone_shares_underlying_data() {
150 let store = DataPointStore::new();
151 let clone = store.clone();
152 store.put("k", b"v".to_vec());
153 assert_eq!(clone.get("k"), Some(b"v".to_vec()));
154 }
155
156 #[tokio::test]
157 async fn no_op_backend_discards_messages() {
158 let backend = DatapointBackend::new();
159 let msg = make_msg(vec!["Foo", "Bar"], r#"{"x":1}"#);
160 backend.dispatch(&msg).await.unwrap();
161 }
163
164 #[tokio::test]
165 async fn with_store_saves_under_dotjoined_namespace() {
166 let store = DataPointStore::new();
167 let backend = DatapointBackend::with_store(store.clone());
168 let msg = make_msg(vec!["Foo", "Bar"], r#"{"x":1}"#);
169 backend.dispatch(&msg).await.unwrap();
170 assert_eq!(store.get("Foo.Bar"), Some(r#"{"x":1}"#.as_bytes().to_vec()));
171 }
172
173 #[tokio::test]
174 async fn with_store_single_segment_namespace() {
175 let store = DataPointStore::new();
176 let backend = DatapointBackend::with_store(store.clone());
177 backend
178 .dispatch(&make_msg(vec!["NodeInfo"], r#"{"niName":"n1"}"#))
179 .await
180 .unwrap();
181 assert_eq!(
182 store.get("NodeInfo"),
183 Some(r#"{"niName":"n1"}"#.as_bytes().to_vec())
184 );
185 }
186
187 #[tokio::test]
188 async fn with_store_overwrites_previous_value() {
189 let store = DataPointStore::new();
190 let backend = DatapointBackend::with_store(store.clone());
191 backend
192 .dispatch(&make_msg(vec!["A"], r#"{"v":1}"#))
193 .await
194 .unwrap();
195 backend
196 .dispatch(&make_msg(vec!["A"], r#"{"v":2}"#))
197 .await
198 .unwrap();
199 assert_eq!(store.get("A"), Some(r#"{"v":2}"#.as_bytes().to_vec()));
200 }
201}