haystack_server/ops/
his.rs1use axum::extract::State;
4use axum::http::HeaderMap;
5use axum::response::{IntoResponse, Response};
6use chrono::{DateTime, FixedOffset, NaiveDate, NaiveTime, TimeZone, Utc};
7
8use haystack_core::data::{HCol, HDict, HGrid};
9use haystack_core::kinds::{HDateTime, HRef, Kind};
10
11use crate::content;
12use crate::error::HaystackError;
13use crate::his_store::HisItem;
14use crate::state::SharedState;
15
16pub async fn handle_read(
22 State(state): State<SharedState>,
23 headers: HeaderMap,
24 body: String,
25) -> Result<Response, HaystackError> {
26 let content_type = headers
27 .get("Content-Type")
28 .and_then(|v| v.to_str().ok())
29 .unwrap_or("");
30 let accept = headers
31 .get("Accept")
32 .and_then(|v| v.to_str().ok())
33 .unwrap_or("");
34
35 let request_grid = content::decode_request_grid(&body, content_type)
36 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
37
38 let row = request_grid
39 .row(0)
40 .ok_or_else(|| HaystackError::bad_request("hisRead request has no rows"))?;
41
42 let id = match row.get("id") {
43 Some(Kind::Ref(r)) => r.val.clone(),
44 _ => {
45 return Err(HaystackError::bad_request(
46 "hisRead: missing or invalid 'id' Ref",
47 ));
48 }
49 };
50
51 let range_str = match row.get("range") {
52 Some(Kind::Str(s)) => s.as_str(),
53 _ => {
54 return Err(HaystackError::bad_request(
55 "hisRead: missing or invalid 'range' Str",
56 ));
57 }
58 };
59
60 let (start, end) = parse_range(range_str)
62 .map_err(|e| HaystackError::bad_request(format!("hisRead: bad range: {e}")))?;
63
64 let items = state.his.his_read(&id, Some(start), Some(end)).await;
66
67 let cols = vec![HCol::new("ts"), HCol::new("val")];
69 let rows: Vec<HDict> = items
70 .into_iter()
71 .map(|item| {
72 let mut d = HDict::new();
73 d.set("ts", Kind::DateTime(HDateTime::new(item.ts, "UTC")));
74 d.set("val", item.val);
75 d
76 })
77 .collect();
78
79 let mut meta = HDict::new();
80 meta.set("id", Kind::Ref(HRef::from_val(&id)));
81 let grid = HGrid::from_parts(meta, cols, rows);
82
83 log::info!("hisRead: returning {} rows for point {}", grid.len(), id);
84 let (encoded, ct) = content::encode_response_grid(&grid, accept)
85 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
86
87 Ok(([(axum::http::header::CONTENT_TYPE, ct)], encoded).into_response())
88}
89
90fn parse_range(range: &str) -> Result<(DateTime<FixedOffset>, DateTime<FixedOffset>), String> {
92 let range = range.trim();
93
94 match range {
95 "today" => {
96 let today = Utc::now().date_naive();
97 Ok(date_range(today, today))
98 }
99 "yesterday" => {
100 let yesterday = Utc::now().date_naive() - chrono::Duration::days(1);
101 Ok(date_range(yesterday, yesterday))
102 }
103 _ => {
104 if range.contains(',') {
105 let parts: Vec<&str> = range.splitn(2, ',').collect();
106 let start_date = parse_date(parts[0].trim())?;
107 let end_date = parse_date(parts[1].trim())?;
108 Ok(date_range(start_date, end_date))
109 } else {
110 let date = parse_date(range)?;
111 Ok(date_range(date, date))
112 }
113 }
114 }
115}
116
117fn parse_date(s: &str) -> Result<NaiveDate, String> {
118 NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|e| format!("invalid date '{s}': {e}"))
119}
120
121fn date_range(
122 start_date: NaiveDate,
123 end_date: NaiveDate,
124) -> (DateTime<FixedOffset>, DateTime<FixedOffset>) {
125 let utc = FixedOffset::east_opt(0).unwrap();
126 let start = utc
127 .from_local_datetime(&start_date.and_time(NaiveTime::MIN))
128 .unwrap();
129 let end = utc
130 .from_local_datetime(&end_date.and_hms_opt(23, 59, 59).unwrap())
131 .unwrap();
132 (start, end)
133}
134
135const MAX_HIS_WRITE_ROWS: usize = 100_000;
140
141pub async fn handle_write(
143 State(state): State<SharedState>,
144 headers: HeaderMap,
145 body: String,
146) -> Result<Response, HaystackError> {
147 let content_type = headers
148 .get("Content-Type")
149 .and_then(|v| v.to_str().ok())
150 .unwrap_or("");
151 let accept = headers
152 .get("Accept")
153 .and_then(|v| v.to_str().ok())
154 .unwrap_or("");
155
156 let request_grid = content::decode_request_grid(&body, content_type)
157 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
158
159 if request_grid.rows.len() > MAX_HIS_WRITE_ROWS {
160 return Err(HaystackError::bad_request("too many history rows"));
161 }
162
163 let id = match request_grid.meta.get("id") {
164 Some(Kind::Ref(r)) => r.val.clone(),
165 _ => {
166 return Err(HaystackError::bad_request(
167 "hisWrite: grid meta must contain 'id' Ref",
168 ));
169 }
170 };
171
172 let mut items = Vec::with_capacity(request_grid.len());
174 for (i, row) in request_grid.iter().enumerate() {
175 let ts = match row.get("ts") {
176 Some(Kind::DateTime(hdt)) => hdt.dt,
177 _ => {
178 return Err(HaystackError::bad_request(format!(
179 "hisWrite: row {i} missing or invalid 'ts' DateTime"
180 )));
181 }
182 };
183 let val = row.get("val").cloned().unwrap_or(Kind::Null);
184
185 items.push(HisItem { ts, val });
186 }
187
188 let count = items.len();
189 state.his.his_write(&id, items).await;
190
191 log::info!("hisWrite: stored {} items for point {}", count, id);
192 let grid = HGrid::new();
193 let (encoded, ct) = content::encode_response_grid(&grid, accept)
194 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
195
196 Ok(([(axum::http::header::CONTENT_TYPE, ct)], encoded).into_response())
197}