Skip to main content

haystack_server/
his_store.rs

1//! In-memory time-series store for historical data (hisRead / hisWrite).
2
3use std::collections::HashMap;
4
5use chrono::{DateTime, FixedOffset};
6use parking_lot::RwLock;
7
8use async_trait::async_trait;
9use haystack_core::kinds::Kind;
10
11use crate::his_provider::HistoryProvider;
12
/// Upper bound on the number of items kept in a single series.
///
/// `HisStore::write` trims a series back to this length after merging new
/// items, discarding the entries with the oldest timestamps first.
const MAX_ITEMS_PER_SERIES: usize = 1_000_000;
14
15/// A single historical data point: a timestamp and a value.
16#[derive(Debug, Clone)]
17pub struct HisItem {
18    pub ts: DateTime<FixedOffset>,
19    pub val: Kind,
20}
21
22/// Thread-safe in-memory time-series store.
23///
24/// Maps entity IDs to sorted vectors of `HisItem`, ordered by timestamp.
25pub struct HisStore {
26    items: RwLock<HashMap<String, Vec<HisItem>>>,
27}
28
29impl Default for HisStore {
30    fn default() -> Self {
31        Self {
32            items: RwLock::new(HashMap::new()),
33        }
34    }
35}
36
37impl HisStore {
38    /// Create a new empty history store.
39    pub fn new() -> Self {
40        Self::default()
41    }
42
43    /// Write history items for a given point ID.
44    ///
45    /// Items are merged into the existing series and the result is kept
46    /// sorted by timestamp. Duplicate timestamps are replaced.
47    pub fn write(&self, id: &str, new_items: Vec<HisItem>) {
48        let mut map = self.items.write();
49        let series = map.entry(id.to_string()).or_default();
50
51        for item in new_items {
52            // Check if there is already an entry with this exact timestamp.
53            match series.binary_search_by(|probe| probe.ts.cmp(&item.ts)) {
54                Ok(pos) => {
55                    // Replace existing entry at same timestamp.
56                    series[pos] = item;
57                }
58                Err(pos) => {
59                    // Insert at the correct sorted position.
60                    series.insert(pos, item);
61                }
62            }
63        }
64
65        // Enforce per-series size cap by dropping oldest entries.
66        if series.len() > MAX_ITEMS_PER_SERIES {
67            let excess = series.len() - MAX_ITEMS_PER_SERIES;
68            series.drain(..excess);
69        }
70    }
71
72    /// Read history items for a point, optionally bounded by start/end.
73    ///
74    /// Both bounds are inclusive. If `start` is `None`, reads from the
75    /// beginning. If `end` is `None`, reads to the end.
76    pub fn read(
77        &self,
78        id: &str,
79        start: Option<DateTime<FixedOffset>>,
80        end: Option<DateTime<FixedOffset>>,
81    ) -> Vec<HisItem> {
82        let map = self.items.read();
83        let series = match map.get(id) {
84            Some(s) => s,
85            None => return Vec::new(),
86        };
87
88        series
89            .iter()
90            .filter(|item| {
91                if let Some(ref s) = start
92                    && item.ts < *s
93                {
94                    return false;
95                }
96                if let Some(ref e) = end
97                    && item.ts > *e
98                {
99                    return false;
100                }
101                true
102            })
103            .cloned()
104            .collect()
105    }
106
107    /// Whether the store has no items for the given point.
108    pub fn is_empty(&self, id: &str) -> bool {
109        self.len(id) == 0
110    }
111
112    /// Return the count of history items stored for a given point.
113    pub fn len(&self, id: &str) -> usize {
114        let map = self.items.read();
115        map.get(id).map_or(0, |s| s.len())
116    }
117}
118
119#[async_trait]
120impl HistoryProvider for HisStore {
121    async fn his_read(
122        &self,
123        id: &str,
124        start: Option<DateTime<FixedOffset>>,
125        end: Option<DateTime<FixedOffset>>,
126    ) -> Vec<HisItem> {
127        self.read(id, start, end)
128    }
129
130    async fn his_write(&self, id: &str, items: Vec<HisItem>) {
131        self.write(id, items);
132    }
133}
134
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use haystack_core::kinds::Number;

    /// Helper: build a `DateTime<FixedOffset>` at UTC for the given date/hour.
    fn utc_dt(year: i32, month: u32, day: u32, hour: u32) -> DateTime<FixedOffset> {
        FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(year, month, day, hour, 0, 0)
            .unwrap()
    }

    /// Helper: a unitless numeric value.
    fn num(v: f64) -> Kind {
        Kind::Number(Number::unitless(v))
    }

    /// Helper: a history item carrying a unitless number.
    fn num_item(ts: DateTime<FixedOffset>, v: f64) -> HisItem {
        HisItem { ts, val: num(v) }
    }

    /// Helper: the timestamps of `items`, in order.
    fn stamps_of(items: &[HisItem]) -> Vec<DateTime<FixedOffset>> {
        items.iter().map(|item| item.ts).collect()
    }

    /// Helper: a store whose point "p1" holds samples at 08:00, 10:00, 12:00
    /// and 14:00 on 2024-06-01, together with those four timestamps.
    fn seeded_store() -> (HisStore, [DateTime<FixedOffset>; 4]) {
        let stamps = [8, 10, 12, 14].map(|hour| utc_dt(2024, 6, 1, hour));
        let store = HisStore::new();
        store.write(
            "p1",
            vec![
                num_item(stamps[0], 70.0),
                num_item(stamps[1], 72.0),
                num_item(stamps[2], 74.0),
                num_item(stamps[3], 76.0),
            ],
        );
        (store, stamps)
    }

    #[test]
    fn write_and_read_back() {
        let store = HisStore::new();
        let ts1 = utc_dt(2024, 6, 1, 10);
        let ts2 = utc_dt(2024, 6, 1, 11);
        store.write("p1", vec![num_item(ts1, 72.0), num_item(ts2, 73.5)]);

        let items = store.read("p1", None, None);
        assert_eq!(stamps_of(&items), vec![ts1, ts2]);
    }

    #[test]
    fn range_query() {
        let (store, stamps) = seeded_store();

        // Query for items between 10:00 and 12:00 inclusive.
        let items = store.read("p1", Some(stamps[1]), Some(stamps[2]));
        assert_eq!(stamps_of(&items), stamps[1..3].to_vec());
    }

    #[test]
    fn range_bounds_between_samples() {
        let (store, stamps) = seeded_store();

        // Bounds that do not coincide with any stored timestamp.
        let from = utc_dt(2024, 6, 1, 9);
        let to = utc_dt(2024, 6, 1, 13);
        let items = store.read("p1", Some(from), Some(to));
        assert_eq!(stamps_of(&items), stamps[1..3].to_vec());

        // A window that falls entirely between two samples matches nothing.
        let gap = utc_dt(2024, 6, 1, 11);
        assert!(store.read("p1", Some(gap), Some(gap)).is_empty());
    }

    #[test]
    fn open_ended_bounds() {
        let (store, stamps) = seeded_store();

        // Only a lower bound: everything from 12:00 onwards.
        let tail = store.read("p1", Some(stamps[2]), None);
        assert_eq!(stamps_of(&tail), stamps[2..].to_vec());

        // Only an upper bound: everything up to and including 10:00.
        let head = store.read("p1", None, Some(stamps[1]));
        assert_eq!(stamps_of(&head), stamps[..2].to_vec());
    }

    #[test]
    fn inverted_range_returns_empty() {
        let (store, stamps) = seeded_store();

        // start is later than end: nothing can match, and it must not panic.
        let items = store.read("p1", Some(stamps[3]), Some(stamps[0]));
        assert!(items.is_empty());
    }

    #[test]
    fn empty_read_returns_empty() {
        let store = HisStore::new();

        // Read from a point that was never written.
        let items = store.read("nonexistent", None, None);
        assert!(items.is_empty());
        assert_eq!(store.len("nonexistent"), 0);
    }

    #[test]
    fn is_empty_tracks_writes() {
        let store = HisStore::new();
        assert!(store.is_empty("p1"));

        store.write("p1", vec![num_item(utc_dt(2024, 6, 1, 10), 1.0)]);
        assert!(!store.is_empty("p1"));
        // Other points are unaffected.
        assert!(store.is_empty("p2"));
    }

    #[test]
    fn empty_write_is_noop() {
        let store = HisStore::new();
        store.write("p1", Vec::new());

        assert!(store.is_empty("p1"));
        assert_eq!(store.len("p1"), 0);
        assert!(store.read("p1", None, None).is_empty());
    }

    #[test]
    fn multiple_points_are_independent() {
        let store = HisStore::new();
        let ts1 = utc_dt(2024, 1, 1, 0);
        let ts2 = utc_dt(2024, 1, 2, 0);

        store.write("temp", vec![num_item(ts1, 68.0)]);
        store.write("humidity", vec![num_item(ts2, 55.0)]);

        assert_eq!(store.len("temp"), 1);
        assert_eq!(store.len("humidity"), 1);

        assert_eq!(stamps_of(&store.read("temp", None, None)), vec![ts1]);
        assert_eq!(stamps_of(&store.read("humidity", None, None)), vec![ts2]);
    }

    #[test]
    fn sorted_order_maintained() {
        let store = HisStore::new();
        let ts1 = utc_dt(2024, 3, 1, 12);
        let ts2 = utc_dt(2024, 3, 1, 8);
        let ts3 = utc_dt(2024, 3, 1, 16);
        let ts4 = utc_dt(2024, 3, 1, 10);

        // Write out of order, across two batches.
        store.write("p1", vec![num_item(ts1, 1.0), num_item(ts3, 3.0)]);
        store.write("p1", vec![num_item(ts2, 2.0), num_item(ts4, 4.0)]);

        let items = store.read("p1", None, None);
        assert_eq!(items.len(), 4);
        // Verify strictly ascending order.
        assert!(items.windows(2).all(|pair| pair[0].ts < pair[1].ts));
        // 08:00, 10:00, 12:00, 16:00
        assert_eq!(stamps_of(&items), vec![ts2, ts4, ts1, ts3]);
    }

    #[test]
    fn duplicate_timestamp_replaces_value() {
        let store = HisStore::new();
        let ts = utc_dt(2024, 6, 1, 10);

        store.write("p1", vec![num_item(ts, 72.0)]);
        store.write("p1", vec![num_item(ts, 99.0)]);

        let items = store.read("p1", None, None);
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].val, num(99.0));
    }

    #[test]
    fn duplicate_timestamp_within_batch_keeps_last() {
        let store = HisStore::new();
        let ts = utc_dt(2024, 6, 1, 10);

        // The same timestamp twice in one call: the later entry wins.
        store.write("p1", vec![num_item(ts, 1.0), num_item(ts, 2.0)]);

        let items = store.read("p1", None, None);
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].val, num(2.0));
    }
}
260}