Skip to main content

haystack_server/
his_store.rs

1//! In-memory time-series store for historical data (hisRead / hisWrite).
2
3use std::collections::HashMap;
4
5use chrono::{DateTime, FixedOffset};
6use parking_lot::RwLock;
7
8use haystack_core::kinds::Kind;
9
10use crate::his_provider::HistoryProvider;
11
12const MAX_ITEMS_PER_SERIES: usize = 1_000_000;
13
14/// A single historical data point: a timestamp and a value.
15#[derive(Debug, Clone)]
16pub struct HisItem {
17    pub ts: DateTime<FixedOffset>,
18    pub val: Kind,
19}
20
21/// Thread-safe in-memory time-series store.
22///
23/// Maps entity IDs to sorted vectors of `HisItem`, ordered by timestamp.
24pub struct HisStore {
25    items: RwLock<HashMap<String, Vec<HisItem>>>,
26}
27
28impl Default for HisStore {
29    fn default() -> Self {
30        Self {
31            items: RwLock::new(HashMap::new()),
32        }
33    }
34}
35
36impl HisStore {
37    /// Create a new empty history store.
38    pub fn new() -> Self {
39        Self::default()
40    }
41
42    /// Write history items for a given point ID.
43    ///
44    /// Items are merged into the existing series and the result is kept
45    /// sorted by timestamp. Duplicate timestamps are replaced.
46    pub fn write(&self, id: &str, new_items: Vec<HisItem>) {
47        let mut map = self.items.write();
48        let series = map.entry(id.to_string()).or_default();
49
50        for item in new_items {
51            // Check if there is already an entry with this exact timestamp.
52            match series.binary_search_by(|probe| probe.ts.cmp(&item.ts)) {
53                Ok(pos) => {
54                    // Replace existing entry at same timestamp.
55                    series[pos] = item;
56                }
57                Err(pos) => {
58                    // Insert at the correct sorted position.
59                    series.insert(pos, item);
60                }
61            }
62        }
63
64        // Enforce per-series size cap by dropping oldest entries.
65        if series.len() > MAX_ITEMS_PER_SERIES {
66            let excess = series.len() - MAX_ITEMS_PER_SERIES;
67            series.drain(..excess);
68        }
69    }
70
71    /// Read history items for a point, optionally bounded by start/end.
72    ///
73    /// Both bounds are inclusive. If `start` is `None`, reads from the
74    /// beginning. If `end` is `None`, reads to the end.
75    pub fn read(
76        &self,
77        id: &str,
78        start: Option<DateTime<FixedOffset>>,
79        end: Option<DateTime<FixedOffset>>,
80    ) -> Vec<HisItem> {
81        let map = self.items.read();
82        let series = match map.get(id) {
83            Some(s) => s,
84            None => return Vec::new(),
85        };
86
87        series
88            .iter()
89            .filter(|item| {
90                if let Some(ref s) = start
91                    && item.ts < *s
92                {
93                    return false;
94                }
95                if let Some(ref e) = end
96                    && item.ts > *e
97                {
98                    return false;
99                }
100                true
101            })
102            .cloned()
103            .collect()
104    }
105
106    /// Whether the store has no items for the given point.
107    pub fn is_empty(&self, id: &str) -> bool {
108        self.len(id) == 0
109    }
110
111    /// Return the count of history items stored for a given point.
112    pub fn len(&self, id: &str) -> usize {
113        let map = self.items.read();
114        map.get(id).map_or(0, |s| s.len())
115    }
116}
117
118impl HistoryProvider for HisStore {
119    fn his_read(
120        &self,
121        id: &str,
122        start: Option<DateTime<FixedOffset>>,
123        end: Option<DateTime<FixedOffset>>,
124    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Vec<HisItem>> + Send + '_>> {
125        let result = self.read(id, start, end);
126        Box::pin(async move { result })
127    }
128
129    fn his_write(
130        &self,
131        id: &str,
132        items: Vec<HisItem>,
133    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
134        self.write(id, items);
135        Box::pin(async {})
136    }
137}
138
139#[cfg(test)]
140mod tests {
141    use super::*;
142    use chrono::TimeZone;
143    use haystack_core::kinds::Number;
144
145    /// Helper: build a DateTime<FixedOffset> at UTC for the given date/hour.
146    fn utc_dt(year: i32, month: u32, day: u32, hour: u32) -> DateTime<FixedOffset> {
147        let offset = FixedOffset::east_opt(0).unwrap();
148        offset
149            .with_ymd_and_hms(year, month, day, hour, 0, 0)
150            .unwrap()
151    }
152
153    fn num_item(ts: DateTime<FixedOffset>, v: f64) -> HisItem {
154        HisItem {
155            ts,
156            val: Kind::Number(Number::unitless(v)),
157        }
158    }
159
160    #[test]
161    fn write_and_read_back() {
162        let store = HisStore::new();
163        let ts1 = utc_dt(2024, 6, 1, 10);
164        let ts2 = utc_dt(2024, 6, 1, 11);
165        store.write("p1", vec![num_item(ts1, 72.0), num_item(ts2, 73.5)]);
166
167        let items = store.read("p1", None, None);
168        assert_eq!(items.len(), 2);
169        assert_eq!(items[0].ts, ts1);
170        assert_eq!(items[1].ts, ts2);
171    }
172
173    #[test]
174    fn range_query() {
175        let store = HisStore::new();
176        let ts1 = utc_dt(2024, 6, 1, 8);
177        let ts2 = utc_dt(2024, 6, 1, 10);
178        let ts3 = utc_dt(2024, 6, 1, 12);
179        let ts4 = utc_dt(2024, 6, 1, 14);
180        store.write(
181            "p1",
182            vec![
183                num_item(ts1, 70.0),
184                num_item(ts2, 72.0),
185                num_item(ts3, 74.0),
186                num_item(ts4, 76.0),
187            ],
188        );
189
190        // Query for items between 10:00 and 12:00 inclusive.
191        let items = store.read("p1", Some(ts2), Some(ts3));
192        assert_eq!(items.len(), 2);
193        assert_eq!(items[0].ts, ts2);
194        assert_eq!(items[1].ts, ts3);
195    }
196
197    #[test]
198    fn empty_read_returns_empty() {
199        let store = HisStore::new();
200
201        // Read from a point that was never written.
202        let items = store.read("nonexistent", None, None);
203        assert!(items.is_empty());
204        assert_eq!(store.len("nonexistent"), 0);
205    }
206
207    #[test]
208    fn multiple_points_are_independent() {
209        let store = HisStore::new();
210        let ts1 = utc_dt(2024, 1, 1, 0);
211        let ts2 = utc_dt(2024, 1, 2, 0);
212
213        store.write("temp", vec![num_item(ts1, 68.0)]);
214        store.write("humidity", vec![num_item(ts2, 55.0)]);
215
216        assert_eq!(store.len("temp"), 1);
217        assert_eq!(store.len("humidity"), 1);
218
219        let temp_items = store.read("temp", None, None);
220        assert_eq!(temp_items.len(), 1);
221        assert_eq!(temp_items[0].ts, ts1);
222
223        let hum_items = store.read("humidity", None, None);
224        assert_eq!(hum_items.len(), 1);
225        assert_eq!(hum_items[0].ts, ts2);
226    }
227
228    #[test]
229    fn sorted_order_maintained() {
230        let store = HisStore::new();
231        let ts1 = utc_dt(2024, 3, 1, 12);
232        let ts2 = utc_dt(2024, 3, 1, 8);
233        let ts3 = utc_dt(2024, 3, 1, 16);
234        let ts4 = utc_dt(2024, 3, 1, 10);
235
236        // Write out of order.
237        store.write("p1", vec![num_item(ts1, 1.0), num_item(ts3, 3.0)]);
238        store.write("p1", vec![num_item(ts2, 2.0), num_item(ts4, 4.0)]);
239
240        let items = store.read("p1", None, None);
241        assert_eq!(items.len(), 4);
242        // Verify strictly ascending order.
243        for window in items.windows(2) {
244            assert!(window[0].ts < window[1].ts);
245        }
246        assert_eq!(items[0].ts, ts2); // 08:00
247        assert_eq!(items[1].ts, ts4); // 10:00
248        assert_eq!(items[2].ts, ts1); // 12:00
249        assert_eq!(items[3].ts, ts3); // 16:00
250    }
251
252    #[test]
253    fn duplicate_timestamp_replaces_value() {
254        let store = HisStore::new();
255        let ts = utc_dt(2024, 6, 1, 10);
256
257        store.write("p1", vec![num_item(ts, 72.0)]);
258        store.write("p1", vec![num_item(ts, 99.0)]);
259
260        let items = store.read("p1", None, None);
261        assert_eq!(items.len(), 1);
262        assert_eq!(items[0].val, Kind::Number(Number::unitless(99.0)));
263    }
264}