1use std::collections::HashMap;
4
5use chrono::{DateTime, FixedOffset};
6use parking_lot::RwLock;
7
8use haystack_core::kinds::Kind;
9
/// Upper bound on the number of items retained for a single series.
///
/// `HisStore::write` compares the series length against this value after
/// merging a batch and drains the front of the series when it is exceeded.
const MAX_ITEMS_PER_SERIES: usize = 1_000_000;
11
/// A single history sample: a timestamp paired with a value.
#[derive(Debug, Clone)]
pub struct HisItem {
    /// Timestamp of the sample, carrying its own fixed UTC offset.
    pub ts: DateTime<FixedOffset>,
    /// Value of the sample.
    pub val: Kind,
}
18
19pub struct HisStore {
23 items: RwLock<HashMap<String, Vec<HisItem>>>,
24}
25
26impl Default for HisStore {
27 fn default() -> Self {
28 Self {
29 items: RwLock::new(HashMap::new()),
30 }
31 }
32}
33
34impl HisStore {
35 pub fn new() -> Self {
37 Self::default()
38 }
39
40 pub fn write(&self, id: &str, new_items: Vec<HisItem>) {
45 let mut map = self.items.write();
46 let series = map.entry(id.to_string()).or_default();
47
48 for item in new_items {
49 match series.binary_search_by(|probe| probe.ts.cmp(&item.ts)) {
51 Ok(pos) => {
52 series[pos] = item;
54 }
55 Err(pos) => {
56 series.insert(pos, item);
58 }
59 }
60 }
61
62 if series.len() > MAX_ITEMS_PER_SERIES {
64 let excess = series.len() - MAX_ITEMS_PER_SERIES;
65 series.drain(..excess);
66 }
67 }
68
69 pub fn read(
74 &self,
75 id: &str,
76 start: Option<DateTime<FixedOffset>>,
77 end: Option<DateTime<FixedOffset>>,
78 ) -> Vec<HisItem> {
79 let map = self.items.read();
80 let series = match map.get(id) {
81 Some(s) => s,
82 None => return Vec::new(),
83 };
84
85 series
86 .iter()
87 .filter(|item| {
88 if let Some(ref s) = start
89 && item.ts < *s
90 {
91 return false;
92 }
93 if let Some(ref e) = end
94 && item.ts > *e
95 {
96 return false;
97 }
98 true
99 })
100 .cloned()
101 .collect()
102 }
103
104 pub fn len(&self, id: &str) -> usize {
106 let map = self.items.read();
107 map.get(id).map_or(0, |s| s.len())
108 }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use haystack_core::kinds::Number;

    /// Builds a timestamp at `hour`:00:00 on the given date with a zero offset.
    fn utc_dt(year: i32, month: u32, day: u32, hour: u32) -> DateTime<FixedOffset> {
        let zero_offset = FixedOffset::east_opt(0).unwrap();
        let resolved = zero_offset.with_ymd_and_hms(year, month, day, hour, 0, 0);
        resolved.unwrap()
    }

    /// Wraps `v` as a unitless number item stamped with `ts`.
    fn num_item(ts: DateTime<FixedOffset>, v: f64) -> HisItem {
        let val = Kind::Number(Number::unitless(v));
        HisItem { ts, val }
    }

    /// Extracts the timestamps of `items`, preserving their order.
    fn timestamps(items: &[HisItem]) -> Vec<DateTime<FixedOffset>> {
        items.iter().map(|item| item.ts).collect()
    }

    #[test]
    fn write_and_read_back() {
        let store = HisStore::new();
        let first = utc_dt(2024, 6, 1, 10);
        let second = utc_dt(2024, 6, 1, 11);
        store.write("p1", vec![num_item(first, 72.0), num_item(second, 73.5)]);

        let stored = store.read("p1", None, None);
        assert_eq!(timestamps(&stored), vec![first, second]);
    }

    #[test]
    fn range_query() {
        let store = HisStore::new();
        let stamps = [8, 10, 12, 14].map(|hour| utc_dt(2024, 6, 1, hour));
        let readings = [70.0, 72.0, 74.0, 76.0];
        let batch: Vec<HisItem> = stamps
            .iter()
            .zip(readings)
            .map(|(&ts, reading)| num_item(ts, reading))
            .collect();
        store.write("p1", batch);

        // Both bounds are inclusive, so exactly the two middle items match.
        let hits = store.read("p1", Some(stamps[1]), Some(stamps[2]));
        assert_eq!(timestamps(&hits), vec![stamps[1], stamps[2]]);
    }

    #[test]
    fn empty_read_returns_empty() {
        let store = HisStore::new();

        assert!(store.read("nonexistent", None, None).is_empty());
        assert_eq!(store.len("nonexistent"), 0);
    }

    #[test]
    fn multiple_points_are_independent() {
        let store = HisStore::new();
        let temp_ts = utc_dt(2024, 1, 1, 0);
        let humidity_ts = utc_dt(2024, 1, 2, 0);

        store.write("temp", vec![num_item(temp_ts, 68.0)]);
        store.write("humidity", vec![num_item(humidity_ts, 55.0)]);

        let expected = [("temp", temp_ts), ("humidity", humidity_ts)];

        for (id, _) in expected {
            assert_eq!(store.len(id), 1);
        }

        for (id, ts) in expected {
            let stored = store.read(id, None, None);
            assert_eq!(timestamps(&stored), vec![ts]);
        }
    }

    #[test]
    fn sorted_order_maintained() {
        let store = HisStore::new();
        let noon = utc_dt(2024, 3, 1, 12);
        let morning = utc_dt(2024, 3, 1, 8);
        let afternoon = utc_dt(2024, 3, 1, 16);
        let mid_morning = utc_dt(2024, 3, 1, 10);

        // Two batches whose timestamps interleave once merged.
        store.write("p1", vec![num_item(noon, 1.0), num_item(afternoon, 3.0)]);
        store.write("p1", vec![num_item(morning, 2.0), num_item(mid_morning, 4.0)]);

        let stored = store.read("p1", None, None);
        assert_eq!(stored.len(), 4);
        assert!(stored.windows(2).all(|pair| pair[0].ts < pair[1].ts));
        assert_eq!(
            timestamps(&stored),
            vec![morning, mid_morning, noon, afternoon]
        );
    }

    #[test]
    fn duplicate_timestamp_replaces_value() {
        let store = HisStore::new();
        let ts = utc_dt(2024, 6, 1, 10);

        // Write the same timestamp twice; the second value must win.
        for reading in [72.0, 99.0] {
            store.write("p1", vec![num_item(ts, reading)]);
        }

        let stored = store.read("p1", None, None);
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].val, Kind::Number(Number::unitless(99.0)));
    }
}