1use std::collections::HashMap;
4
5use chrono::{DateTime, FixedOffset};
6use parking_lot::RwLock;
7
8use haystack_core::kinds::Kind;
9
10use crate::his_provider::HistoryProvider;
11
/// Upper bound on the number of items kept for a single series.
///
/// `HisStore::write` drops items from the front of a series (the earliest
/// timestamps, since series are kept sorted ascending) once its length
/// exceeds this value.
const MAX_ITEMS_PER_SERIES: usize = 1_000_000;
13
/// A single history sample: a value paired with the timestamp it applies to.
#[derive(Debug, Clone)]
pub struct HisItem {
    /// Timestamp of the sample, carrying its own fixed UTC offset.
    pub ts: DateTime<FixedOffset>,
    /// Value recorded at `ts`.
    pub val: Kind,
}
20
/// In-memory history store keyed by a string id.
///
/// Each id maps to its own series of [`HisItem`]s. `write` keeps every series
/// sorted by ascending timestamp with at most one item per timestamp.
pub struct HisStore {
    /// Series per id, guarded by a reader/writer lock.
    items: RwLock<HashMap<String, Vec<HisItem>>>,
}
27
28impl Default for HisStore {
29 fn default() -> Self {
30 Self {
31 items: RwLock::new(HashMap::new()),
32 }
33 }
34}
35
36impl HisStore {
37 pub fn new() -> Self {
39 Self::default()
40 }
41
42 pub fn write(&self, id: &str, new_items: Vec<HisItem>) {
47 let mut map = self.items.write();
48 let series = map.entry(id.to_string()).or_default();
49
50 for item in new_items {
51 match series.binary_search_by(|probe| probe.ts.cmp(&item.ts)) {
53 Ok(pos) => {
54 series[pos] = item;
56 }
57 Err(pos) => {
58 series.insert(pos, item);
60 }
61 }
62 }
63
64 if series.len() > MAX_ITEMS_PER_SERIES {
66 let excess = series.len() - MAX_ITEMS_PER_SERIES;
67 series.drain(..excess);
68 }
69 }
70
71 pub fn read(
76 &self,
77 id: &str,
78 start: Option<DateTime<FixedOffset>>,
79 end: Option<DateTime<FixedOffset>>,
80 ) -> Vec<HisItem> {
81 let map = self.items.read();
82 let series = match map.get(id) {
83 Some(s) => s,
84 None => return Vec::new(),
85 };
86
87 series
88 .iter()
89 .filter(|item| {
90 if let Some(ref s) = start
91 && item.ts < *s
92 {
93 return false;
94 }
95 if let Some(ref e) = end
96 && item.ts > *e
97 {
98 return false;
99 }
100 true
101 })
102 .cloned()
103 .collect()
104 }
105
106 pub fn is_empty(&self, id: &str) -> bool {
108 self.len(id) == 0
109 }
110
111 pub fn len(&self, id: &str) -> usize {
113 let map = self.items.read();
114 map.get(id).map_or(0, |s| s.len())
115 }
116}
117
118impl HistoryProvider for HisStore {
119 fn his_read(
120 &self,
121 id: &str,
122 start: Option<DateTime<FixedOffset>>,
123 end: Option<DateTime<FixedOffset>>,
124 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Vec<HisItem>> + Send + '_>> {
125 let result = self.read(id, start, end);
126 Box::pin(async move { result })
127 }
128
129 fn his_write(
130 &self,
131 id: &str,
132 items: Vec<HisItem>,
133 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
134 self.write(id, items);
135 Box::pin(async {})
136 }
137}
138
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use haystack_core::kinds::Number;

    /// Builds a zero-offset timestamp on the hour for the given date.
    fn utc_dt(year: i32, month: u32, day: u32, hour: u32) -> DateTime<FixedOffset> {
        FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(year, month, day, hour, 0, 0)
            .unwrap()
    }

    /// Builds a history item holding a unitless number.
    fn num_item(ts: DateTime<FixedOffset>, v: f64) -> HisItem {
        let val = Kind::Number(Number::unitless(v));
        HisItem { ts, val }
    }

    #[test]
    fn write_and_read_back() {
        let store = HisStore::new();
        let first = utc_dt(2024, 6, 1, 10);
        let second = utc_dt(2024, 6, 1, 11);
        store.write("p1", vec![num_item(first, 72.0), num_item(second, 73.5)]);

        let got = store.read("p1", None, None);
        assert_eq!(got.len(), 2);
        assert_eq!(got[0].ts, first);
        assert_eq!(got[1].ts, second);
    }

    #[test]
    fn range_query() {
        let store = HisStore::new();
        let ts1 = utc_dt(2024, 6, 1, 8);
        let ts2 = utc_dt(2024, 6, 1, 10);
        let ts3 = utc_dt(2024, 6, 1, 12);
        let ts4 = utc_dt(2024, 6, 1, 14);

        let samples: Vec<HisItem> = [(ts1, 70.0), (ts2, 72.0), (ts3, 74.0), (ts4, 76.0)]
            .into_iter()
            .map(|(ts, v)| num_item(ts, v))
            .collect();
        store.write("p1", samples);

        // Both bounds are inclusive, so ts2 and ts3 are expected back.
        let got = store.read("p1", Some(ts2), Some(ts3));
        assert_eq!(got.len(), 2);
        assert_eq!(got[0].ts, ts2);
        assert_eq!(got[1].ts, ts3);
    }

    #[test]
    fn empty_read_returns_empty() {
        let store = HisStore::new();

        let got = store.read("nonexistent", None, None);
        assert!(got.is_empty());
        assert_eq!(store.len("nonexistent"), 0);
    }

    #[test]
    fn multiple_points_are_independent() {
        let store = HisStore::new();
        let temp_ts = utc_dt(2024, 1, 1, 0);
        let hum_ts = utc_dt(2024, 1, 2, 0);

        store.write("temp", vec![num_item(temp_ts, 68.0)]);
        store.write("humidity", vec![num_item(hum_ts, 55.0)]);

        assert_eq!(store.len("temp"), 1);
        assert_eq!(store.len("humidity"), 1);

        let temp_items = store.read("temp", None, None);
        assert_eq!(temp_items.len(), 1);
        assert_eq!(temp_items[0].ts, temp_ts);

        let hum_items = store.read("humidity", None, None);
        assert_eq!(hum_items.len(), 1);
        assert_eq!(hum_items[0].ts, hum_ts);
    }

    #[test]
    fn sorted_order_maintained() {
        let store = HisStore::new();
        let ts1 = utc_dt(2024, 3, 1, 12);
        let ts2 = utc_dt(2024, 3, 1, 8);
        let ts3 = utc_dt(2024, 3, 1, 16);
        let ts4 = utc_dt(2024, 3, 1, 10);

        // Two batches whose timestamps interleave once merged.
        store.write("p1", vec![num_item(ts1, 1.0), num_item(ts3, 3.0)]);
        store.write("p1", vec![num_item(ts2, 2.0), num_item(ts4, 4.0)]);

        let got = store.read("p1", None, None);
        assert_eq!(got.len(), 4);
        assert!(got.windows(2).all(|pair| pair[0].ts < pair[1].ts));

        let stamps: Vec<_> = got.iter().map(|item| item.ts).collect();
        assert_eq!(stamps, [ts2, ts4, ts1, ts3]);
    }

    #[test]
    fn duplicate_timestamp_replaces_value() {
        let store = HisStore::new();
        let ts = utc_dt(2024, 6, 1, 10);

        store.write("p1", vec![num_item(ts, 72.0)]);
        store.write("p1", vec![num_item(ts, 99.0)]);

        let got = store.read("p1", None, None);
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].val, Kind::Number(Number::unitless(99.0)));
    }
}