Skip to main content

hermod/dispatcher/backend/
datapoint.rs

1//! Datapoint backend — stores named data points for on-demand retrieval
2//!
3//! The `DatapointBackend` stores the most recent machine-JSON value for each
4//! namespace key in a shared [`DataPointStore`].  The forwarder's DataPoint
5//! mini-protocol handler reads from the same store when the acceptor queries a
6//! named data point.
7//!
8//! # Usage
9//!
10//! ```no_run
11//! use hermod::dispatcher::backend::datapoint::{DatapointBackend, DataPointStore};
12//! use hermod::forwarder::TraceForwarder;
13//! use std::sync::Arc;
14//!
15//! let store = DataPointStore::new();
16//! let backend = DatapointBackend::with_store(store.clone());
17//! let forwarder = TraceForwarder::new(Default::default())
18//!     .with_datapoint_store(store);
19//! // Wire `backend` into your Dispatcher and `forwarder` into your app.
20//! ```
21
22use super::{Backend, DispatchMessage};
23use anyhow::Result;
24use async_trait::async_trait;
25use std::collections::HashMap;
26use std::sync::{Arc, RwLock};
27
/// Shared in-memory store for named data points.
///
/// Written by [`DatapointBackend::dispatch`] (keyed by the dot-joined
/// namespace of the trace object) and read by the forwarder's DataPoint
/// mini-protocol handler when the acceptor queries a named data point.
///
/// `DataPointStore` is cheap to clone — all clones share the same underlying
/// `HashMap`.
///
/// # Lock poisoning
///
/// The map is guarded by a [`std::sync::RwLock`].  Every critical section in
/// this type is a single `insert` or `get`, so a panic on another thread
/// cannot leave the map half-updated.  A poisoned lock is therefore recovered
/// instead of being turned into a panic in every later caller.
#[derive(Clone, Default)]
pub struct DataPointStore {
    /// Most recent raw value per data-point name, shared by every clone.
    inner: Arc<RwLock<HashMap<String, Vec<u8>>>>,
}

impl DataPointStore {
    /// Create a new empty store
    pub fn new() -> Self {
        Self::default()
    }

    /// Store a raw JSON value under `name`, replacing any previous value
    /// stored under the same name.
    ///
    /// Never panics on a poisoned lock; see the type-level documentation.
    pub fn put(&self, name: &str, value: Vec<u8>) {
        // Allocate the owned key before taking the write lock so the critical
        // section is only the map insertion.
        let key = name.to_owned();
        self.inner
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .insert(key, value);
    }

    /// Retrieve a copy of the value stored under `name`, or `None` when no
    /// value has been stored under that name.
    ///
    /// Never panics on a poisoned lock; see the type-level documentation.
    pub fn get(&self, name: &str) -> Option<Vec<u8>> {
        self.inner
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .get(name)
            .cloned()
    }
}
57
58/// Backend for the DataPoint protocol.
59///
60/// Each dispatched message is stored in the [`DataPointStore`] under its
61/// dot-joined namespace key, overwriting any previous value for that key.
62///
63/// When no store is configured (the default created by
64/// [`DispatcherBuilder::with_default_backends`]), messages are silently
65/// discarded.  Use [`DatapointBackend::with_store`] with a shared
66/// [`DataPointStore`] to enable full DataPoint support.
67pub struct DatapointBackend {
68    store: Option<DataPointStore>,
69}
70
71impl DatapointBackend {
72    /// Create a no-op backend (all messages are silently discarded)
73    pub fn new() -> Self {
74        Self { store: None }
75    }
76
77    /// Create a backend that stores messages in `store`
78    pub fn with_store(store: DataPointStore) -> Self {
79        Self { store: Some(store) }
80    }
81}
82
83impl Default for DatapointBackend {
84    fn default() -> Self {
85        Self::new()
86    }
87}
88
89#[async_trait]
90impl Backend for DatapointBackend {
91    async fn dispatch(&self, msg: &DispatchMessage) -> Result<()> {
92        if let Some(store) = &self.store {
93            let key = msg.trace_object.to_namespace.join(".");
94            let value = msg.trace_object.to_machine.as_bytes().to_vec();
95            store.put(&key, value);
96        }
97        Ok(())
98    }
99}
100
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::types::{DetailLevel, Severity, TraceObject};
    use chrono::Utc;
    use serde_json::json;

    /// Build a `DispatchMessage` whose trace object carries the given
    /// namespace segments and machine payload; every other field is filled
    /// with a fixed placeholder value.
    fn make_msg(namespace: Vec<&str>, machine: &str) -> DispatchMessage {
        let trace_object = TraceObject {
            to_human: None,
            to_machine: machine.to_owned(),
            to_namespace: namespace.into_iter().map(String::from).collect(),
            to_severity: Severity::Info,
            to_details: DetailLevel::DNormal,
            to_timestamp: Utc::now(),
            to_hostname: "host".to_string(),
            to_thread_id: "1".to_string(),
        };
        DispatchMessage {
            trace_object,
            human: String::new(),
            machine: json!({}),
            metrics: Vec::new(),
            detail: DetailLevel::DNormal,
        }
    }

    // --- DataPointStore -----------------------------------------------------

    #[test]
    fn store_put_get_round_trip() {
        let store = DataPointStore::new();
        store.put("Foo.Bar", b"hello".to_vec());
        assert_eq!(store.get("Foo.Bar").as_deref(), Some(&b"hello"[..]));
    }

    #[test]
    fn store_missing_key_returns_none() {
        let store = DataPointStore::new();
        assert!(store.get("Missing").is_none());
    }

    #[test]
    fn store_overwrite_replaces_value() {
        let store = DataPointStore::new();
        for value in [&b"first"[..], &b"second"[..]] {
            store.put("k", value.to_vec());
        }
        assert_eq!(store.get("k").as_deref(), Some(&b"second"[..]));
    }

    #[test]
    fn store_clone_shares_underlying_data() {
        let original = DataPointStore::new();
        let shared = original.clone();
        original.put("k", b"v".to_vec());
        assert_eq!(shared.get("k").as_deref(), Some(&b"v"[..]));
    }

    // --- DatapointBackend ---------------------------------------------------

    #[tokio::test]
    async fn no_op_backend_discards_messages() {
        let backend = DatapointBackend::new();
        let outcome = backend
            .dispatch(&make_msg(vec!["Foo", "Bar"], r#"{"x":1}"#))
            .await;
        // Nothing to inspect without a store: the test only confirms that the
        // call neither panics nor reports an error.
        outcome.unwrap();
    }

    #[tokio::test]
    async fn with_store_saves_under_dotjoined_namespace() {
        let payload = r#"{"x":1}"#;
        let store = DataPointStore::new();
        let backend = DatapointBackend::with_store(store.clone());
        backend
            .dispatch(&make_msg(vec!["Foo", "Bar"], payload))
            .await
            .unwrap();
        assert_eq!(store.get("Foo.Bar").as_deref(), Some(payload.as_bytes()));
    }

    #[tokio::test]
    async fn with_store_single_segment_namespace() {
        let payload = r#"{"niName":"n1"}"#;
        let store = DataPointStore::new();
        let backend = DatapointBackend::with_store(store.clone());
        backend
            .dispatch(&make_msg(vec!["NodeInfo"], payload))
            .await
            .unwrap();
        assert_eq!(store.get("NodeInfo").as_deref(), Some(payload.as_bytes()));
    }

    #[tokio::test]
    async fn with_store_overwrites_previous_value() {
        let (older, newer) = (r#"{"v":1}"#, r#"{"v":2}"#);
        let store = DataPointStore::new();
        let backend = DatapointBackend::with_store(store.clone());
        backend.dispatch(&make_msg(vec!["A"], older)).await.unwrap();
        backend.dispatch(&make_msg(vec!["A"], newer)).await.unwrap();
        assert_eq!(store.get("A").as_deref(), Some(newer.as_bytes()));
    }
}