1use std::collections::HashMap;
4
5use chrono::{DateTime, FixedOffset};
6use parking_lot::RwLock;
7
8use async_trait::async_trait;
9use haystack_core::kinds::Kind;
10
11use crate::his_provider::HistoryProvider;
12
/// Upper bound on the number of items retained for a single series.
///
/// After a write, a series longer than this is trimmed from the front
/// (earliest timestamps) until it fits.
const MAX_ITEMS_PER_SERIES: usize = 1_000_000;
14
15#[derive(Debug, Clone)]
17pub struct HisItem {
18 pub ts: DateTime<FixedOffset>,
19 pub val: Kind,
20}
21
22pub struct HisStore {
26 items: RwLock<HashMap<String, Vec<HisItem>>>,
27}
28
29impl Default for HisStore {
30 fn default() -> Self {
31 Self {
32 items: RwLock::new(HashMap::new()),
33 }
34 }
35}
36
37impl HisStore {
38 pub fn new() -> Self {
40 Self::default()
41 }
42
43 pub fn write(&self, id: &str, new_items: Vec<HisItem>) {
48 let mut map = self.items.write();
49 let series = map.entry(id.to_string()).or_default();
50
51 for item in new_items {
52 match series.binary_search_by(|probe| probe.ts.cmp(&item.ts)) {
54 Ok(pos) => {
55 series[pos] = item;
57 }
58 Err(pos) => {
59 series.insert(pos, item);
61 }
62 }
63 }
64
65 if series.len() > MAX_ITEMS_PER_SERIES {
67 let excess = series.len() - MAX_ITEMS_PER_SERIES;
68 series.drain(..excess);
69 }
70 }
71
72 pub fn read(
77 &self,
78 id: &str,
79 start: Option<DateTime<FixedOffset>>,
80 end: Option<DateTime<FixedOffset>>,
81 ) -> Vec<HisItem> {
82 let map = self.items.read();
83 let series = match map.get(id) {
84 Some(s) => s,
85 None => return Vec::new(),
86 };
87
88 series
89 .iter()
90 .filter(|item| {
91 if let Some(ref s) = start
92 && item.ts < *s
93 {
94 return false;
95 }
96 if let Some(ref e) = end
97 && item.ts > *e
98 {
99 return false;
100 }
101 true
102 })
103 .cloned()
104 .collect()
105 }
106
107 pub fn is_empty(&self, id: &str) -> bool {
109 self.len(id) == 0
110 }
111
112 pub fn len(&self, id: &str) -> usize {
114 let map = self.items.read();
115 map.get(id).map_or(0, |s| s.len())
116 }
117}
118
119#[async_trait]
120impl HistoryProvider for HisStore {
121 async fn his_read(
122 &self,
123 id: &str,
124 start: Option<DateTime<FixedOffset>>,
125 end: Option<DateTime<FixedOffset>>,
126 ) -> Vec<HisItem> {
127 self.read(id, start, end)
128 }
129
130 async fn his_write(&self, id: &str, items: Vec<HisItem>) {
131 self.write(id, items);
132 }
133}
134
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use haystack_core::kinds::Number;

    /// Builds a zero-offset timestamp at the top of the given hour.
    fn utc_dt(year: i32, month: u32, day: u32, hour: u32) -> DateTime<FixedOffset> {
        FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(year, month, day, hour, 0, 0)
            .unwrap()
    }

    /// Builds a history item holding a unitless number.
    fn num_item(ts: DateTime<FixedOffset>, v: f64) -> HisItem {
        let val = Kind::Number(Number::unitless(v));
        HisItem { ts, val }
    }

    /// Extracts the timestamps of `items`, preserving their order.
    fn timestamps(items: &[HisItem]) -> Vec<DateTime<FixedOffset>> {
        items.iter().map(|item| item.ts).collect()
    }

    #[test]
    fn write_and_read_back() {
        let store = HisStore::new();
        let first = utc_dt(2024, 6, 1, 10);
        let second = utc_dt(2024, 6, 1, 11);
        store.write("p1", vec![num_item(first, 72.0), num_item(second, 73.5)]);

        let got = store.read("p1", None, None);
        assert_eq!(timestamps(&got), vec![first, second]);
    }

    #[test]
    fn range_query() {
        let store = HisStore::new();
        let hours = [8, 10, 12, 14].map(|h| utc_dt(2024, 6, 1, h));
        let values = [70.0, 72.0, 74.0, 76.0];
        let batch: Vec<HisItem> = hours
            .iter()
            .zip(values)
            .map(|(ts, v)| num_item(*ts, v))
            .collect();
        store.write("p1", batch);

        // Both bounds are inclusive, so the two middle samples come back.
        let got = store.read("p1", Some(hours[1]), Some(hours[2]));
        assert_eq!(timestamps(&got), vec![hours[1], hours[2]]);
    }

    #[test]
    fn empty_read_returns_empty() {
        let store = HisStore::new();

        assert!(store.read("nonexistent", None, None).is_empty());
        assert_eq!(store.len("nonexistent"), 0);
    }

    #[test]
    fn multiple_points_are_independent() {
        let store = HisStore::new();
        let temp_ts = utc_dt(2024, 1, 1, 0);
        let hum_ts = utc_dt(2024, 1, 2, 0);

        store.write("temp", vec![num_item(temp_ts, 68.0)]);
        store.write("humidity", vec![num_item(hum_ts, 55.0)]);

        assert_eq!(store.len("temp"), 1);
        assert_eq!(store.len("humidity"), 1);

        let temp_items = store.read("temp", None, None);
        assert_eq!(timestamps(&temp_items), vec![temp_ts]);

        let hum_items = store.read("humidity", None, None);
        assert_eq!(timestamps(&hum_items), vec![hum_ts]);
    }

    #[test]
    fn sorted_order_maintained() {
        let store = HisStore::new();
        let noon = utc_dt(2024, 3, 1, 12);
        let eight = utc_dt(2024, 3, 1, 8);
        let sixteen = utc_dt(2024, 3, 1, 16);
        let ten = utc_dt(2024, 3, 1, 10);

        // Two batches whose timestamps interleave once merged.
        store.write("p1", vec![num_item(noon, 1.0), num_item(sixteen, 3.0)]);
        store.write("p1", vec![num_item(eight, 2.0), num_item(ten, 4.0)]);

        let got = store.read("p1", None, None);
        assert_eq!(got.len(), 4);
        assert!(got.windows(2).all(|pair| pair[0].ts < pair[1].ts));
        assert_eq!(timestamps(&got), vec![eight, ten, noon, sixteen]);
    }

    #[test]
    fn duplicate_timestamp_replaces_value() {
        let store = HisStore::new();
        let ts = utc_dt(2024, 6, 1, 10);

        store.write("p1", vec![num_item(ts, 72.0)]);
        store.write("p1", vec![num_item(ts, 99.0)]);

        let got = store.read("p1", None, None);
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].val, Kind::Number(Number::unitless(99.0)));
    }
}