Skip to main content

haystack_server/
his_store.rs

1//! In-memory time-series store for historical data (hisRead / hisWrite).
2
3use std::collections::HashMap;
4
5use chrono::{DateTime, FixedOffset};
6use parking_lot::RwLock;
7
8use haystack_core::kinds::Kind;
9
/// Upper bound on the number of items retained for a single series.
///
/// When a write grows a series past this limit, the oldest entries are
/// dropped until the series holds exactly this many items.
const MAX_ITEMS_PER_SERIES: usize = 1_000_000;
11
12/// A single historical data point: a timestamp and a value.
13#[derive(Debug, Clone)]
14pub struct HisItem {
15    pub ts: DateTime<FixedOffset>,
16    pub val: Kind,
17}
18
19/// Thread-safe in-memory time-series store.
20///
21/// Maps entity IDs to sorted vectors of `HisItem`, ordered by timestamp.
22pub struct HisStore {
23    items: RwLock<HashMap<String, Vec<HisItem>>>,
24}
25
26impl Default for HisStore {
27    fn default() -> Self {
28        Self {
29            items: RwLock::new(HashMap::new()),
30        }
31    }
32}
33
34impl HisStore {
35    /// Create a new empty history store.
36    pub fn new() -> Self {
37        Self::default()
38    }
39
40    /// Write history items for a given point ID.
41    ///
42    /// Items are merged into the existing series and the result is kept
43    /// sorted by timestamp. Duplicate timestamps are replaced.
44    pub fn write(&self, id: &str, new_items: Vec<HisItem>) {
45        let mut map = self.items.write();
46        let series = map.entry(id.to_string()).or_default();
47
48        for item in new_items {
49            // Check if there is already an entry with this exact timestamp.
50            match series.binary_search_by(|probe| probe.ts.cmp(&item.ts)) {
51                Ok(pos) => {
52                    // Replace existing entry at same timestamp.
53                    series[pos] = item;
54                }
55                Err(pos) => {
56                    // Insert at the correct sorted position.
57                    series.insert(pos, item);
58                }
59            }
60        }
61
62        // Enforce per-series size cap by dropping oldest entries.
63        if series.len() > MAX_ITEMS_PER_SERIES {
64            let excess = series.len() - MAX_ITEMS_PER_SERIES;
65            series.drain(..excess);
66        }
67    }
68
69    /// Read history items for a point, optionally bounded by start/end.
70    ///
71    /// Both bounds are inclusive. If `start` is `None`, reads from the
72    /// beginning. If `end` is `None`, reads to the end.
73    pub fn read(
74        &self,
75        id: &str,
76        start: Option<DateTime<FixedOffset>>,
77        end: Option<DateTime<FixedOffset>>,
78    ) -> Vec<HisItem> {
79        let map = self.items.read();
80        let series = match map.get(id) {
81            Some(s) => s,
82            None => return Vec::new(),
83        };
84
85        series
86            .iter()
87            .filter(|item| {
88                if let Some(ref s) = start
89                    && item.ts < *s
90                {
91                    return false;
92                }
93                if let Some(ref e) = end
94                    && item.ts > *e
95                {
96                    return false;
97                }
98                true
99            })
100            .cloned()
101            .collect()
102    }
103
104    /// Return the count of history items stored for a given point.
105    pub fn len(&self, id: &str) -> usize {
106        let map = self.items.read();
107        map.get(id).map_or(0, |s| s.len())
108    }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use haystack_core::kinds::Number;

    /// Helper: build a DateTime<FixedOffset> at UTC for the given date/hour.
    fn utc_dt(year: i32, month: u32, day: u32, hour: u32) -> DateTime<FixedOffset> {
        let offset = FixedOffset::east_opt(0).unwrap();
        offset
            .with_ymd_and_hms(year, month, day, hour, 0, 0)
            .unwrap()
    }

    /// Helper: build a history item holding a unitless number.
    fn num_item(ts: DateTime<FixedOffset>, v: f64) -> HisItem {
        HisItem {
            ts,
            val: Kind::Number(Number::unitless(v)),
        }
    }

    #[test]
    fn write_and_read_back() {
        let store = HisStore::new();
        let ts1 = utc_dt(2024, 6, 1, 10);
        let ts2 = utc_dt(2024, 6, 1, 11);
        store.write("p1", vec![num_item(ts1, 72.0), num_item(ts2, 73.5)]);

        let items = store.read("p1", None, None);
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].ts, ts1);
        assert_eq!(items[1].ts, ts2);
    }

    #[test]
    fn range_query() {
        let store = HisStore::new();
        let ts1 = utc_dt(2024, 6, 1, 8);
        let ts2 = utc_dt(2024, 6, 1, 10);
        let ts3 = utc_dt(2024, 6, 1, 12);
        let ts4 = utc_dt(2024, 6, 1, 14);
        store.write(
            "p1",
            vec![
                num_item(ts1, 70.0),
                num_item(ts2, 72.0),
                num_item(ts3, 74.0),
                num_item(ts4, 76.0),
            ],
        );

        // Query for items between 10:00 and 12:00 inclusive.
        let items = store.read("p1", Some(ts2), Some(ts3));
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].ts, ts2);
        assert_eq!(items[1].ts, ts3);
    }

    #[test]
    fn one_sided_ranges() {
        let store = HisStore::new();
        let ts1 = utc_dt(2024, 6, 1, 8);
        let ts2 = utc_dt(2024, 6, 1, 10);
        let ts3 = utc_dt(2024, 6, 1, 12);
        store.write(
            "p1",
            vec![
                num_item(ts1, 70.0),
                num_item(ts2, 72.0),
                num_item(ts3, 74.0),
            ],
        );

        // Only a lower bound: everything from 10:00 onwards.
        let from_ts2 = store.read("p1", Some(ts2), None);
        assert_eq!(from_ts2.len(), 2);
        assert_eq!(from_ts2[0].ts, ts2);
        assert_eq!(from_ts2[1].ts, ts3);

        // Only an upper bound: everything up to and including 10:00.
        let up_to_ts2 = store.read("p1", None, Some(ts2));
        assert_eq!(up_to_ts2.len(), 2);
        assert_eq!(up_to_ts2[0].ts, ts1);
        assert_eq!(up_to_ts2[1].ts, ts2);
    }

    #[test]
    fn bounds_between_samples() {
        let store = HisStore::new();
        let ts1 = utc_dt(2024, 6, 1, 8);
        let ts2 = utc_dt(2024, 6, 1, 10);
        let ts3 = utc_dt(2024, 6, 1, 12);
        store.write(
            "p1",
            vec![
                num_item(ts1, 70.0),
                num_item(ts2, 72.0),
                num_item(ts3, 74.0),
            ],
        );

        // Neither bound coincides with a stored timestamp.
        let items = store.read(
            "p1",
            Some(utc_dt(2024, 6, 1, 9)),
            Some(utc_dt(2024, 6, 1, 11)),
        );
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].ts, ts2);

        // A window that lies entirely after the last sample is empty.
        let after = store.read("p1", Some(utc_dt(2024, 6, 1, 13)), None);
        assert!(after.is_empty());

        // A window that lies entirely before the first sample is empty.
        let before = store.read("p1", None, Some(utc_dt(2024, 6, 1, 7)));
        assert!(before.is_empty());
    }

    #[test]
    fn inverted_range_returns_empty() {
        let store = HisStore::new();
        let ts1 = utc_dt(2024, 6, 1, 8);
        let ts2 = utc_dt(2024, 6, 1, 10);
        let ts3 = utc_dt(2024, 6, 1, 12);
        store.write(
            "p1",
            vec![
                num_item(ts1, 70.0),
                num_item(ts2, 72.0),
                num_item(ts3, 74.0),
            ],
        );

        // Start later than end selects nothing.
        let items = store.read("p1", Some(ts3), Some(ts1));
        assert!(items.is_empty());
    }

    #[test]
    fn empty_read_returns_empty() {
        let store = HisStore::new();

        // Read from a point that was never written.
        let items = store.read("nonexistent", None, None);
        assert!(items.is_empty());
        assert_eq!(store.len("nonexistent"), 0);
    }

    #[test]
    fn empty_write_stores_nothing() {
        let store = HisStore::new();

        store.write("p1", Vec::new());

        assert_eq!(store.len("p1"), 0);
        assert!(store.read("p1", None, None).is_empty());
    }

    #[test]
    fn multiple_points_are_independent() {
        let store = HisStore::new();
        let ts1 = utc_dt(2024, 1, 1, 0);
        let ts2 = utc_dt(2024, 1, 2, 0);

        store.write("temp", vec![num_item(ts1, 68.0)]);
        store.write("humidity", vec![num_item(ts2, 55.0)]);

        assert_eq!(store.len("temp"), 1);
        assert_eq!(store.len("humidity"), 1);

        let temp_items = store.read("temp", None, None);
        assert_eq!(temp_items.len(), 1);
        assert_eq!(temp_items[0].ts, ts1);

        let hum_items = store.read("humidity", None, None);
        assert_eq!(hum_items.len(), 1);
        assert_eq!(hum_items[0].ts, ts2);
    }

    #[test]
    fn sorted_order_maintained() {
        let store = HisStore::new();
        let ts1 = utc_dt(2024, 3, 1, 12);
        let ts2 = utc_dt(2024, 3, 1, 8);
        let ts3 = utc_dt(2024, 3, 1, 16);
        let ts4 = utc_dt(2024, 3, 1, 10);

        // Write out of order.
        store.write("p1", vec![num_item(ts1, 1.0), num_item(ts3, 3.0)]);
        store.write("p1", vec![num_item(ts2, 2.0), num_item(ts4, 4.0)]);

        let items = store.read("p1", None, None);
        assert_eq!(items.len(), 4);
        // Verify strictly ascending order.
        for window in items.windows(2) {
            assert!(window[0].ts < window[1].ts);
        }
        assert_eq!(items[0].ts, ts2); // 08:00
        assert_eq!(items[1].ts, ts4); // 10:00
        assert_eq!(items[2].ts, ts1); // 12:00
        assert_eq!(items[3].ts, ts3); // 16:00
    }

    #[test]
    fn duplicate_timestamp_replaces_value() {
        let store = HisStore::new();
        let ts = utc_dt(2024, 6, 1, 10);

        store.write("p1", vec![num_item(ts, 72.0)]);
        store.write("p1", vec![num_item(ts, 99.0)]);

        let items = store.read("p1", None, None);
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].val, Kind::Number(Number::unitless(99.0)));
    }

    #[test]
    fn duplicate_timestamp_within_one_batch_keeps_last() {
        let store = HisStore::new();
        let ts = utc_dt(2024, 6, 1, 10);

        // Items are merged in order, so the later one overwrites the earlier.
        store.write("p1", vec![num_item(ts, 1.0), num_item(ts, 2.0)]);

        let items = store.read("p1", None, None);
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].val, Kind::Number(Number::unitless(2.0)));
    }
}
236}