haystack_server/ops/
his.rs1use actix_web::{HttpRequest, HttpResponse, web};
52use chrono::{DateTime, FixedOffset, Local, NaiveDate, NaiveTime, TimeZone};
53
54use haystack_core::data::{HCol, HDict, HGrid};
55use haystack_core::kinds::{HDateTime, HRef, Kind};
56
57use crate::content;
58use crate::error::HaystackError;
59use crate::his_store::HisItem;
60use crate::state::AppState;
61
62pub async fn handle_read(
75 req: HttpRequest,
76 body: String,
77 state: web::Data<AppState>,
78) -> Result<HttpResponse, HaystackError> {
79 let content_type = req
80 .headers()
81 .get("Content-Type")
82 .and_then(|v| v.to_str().ok())
83 .unwrap_or("");
84 let accept = req
85 .headers()
86 .get("Accept")
87 .and_then(|v| v.to_str().ok())
88 .unwrap_or("");
89
90 let request_grid = content::decode_request_grid(&body, content_type)
91 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
92
93 let row = request_grid
95 .row(0)
96 .ok_or_else(|| HaystackError::bad_request("hisRead request has no rows"))?;
97
98 let id = match row.get("id") {
100 Some(Kind::Ref(r)) => r.val.clone(),
101 _ => {
102 return Err(HaystackError::bad_request(
103 "hisRead: missing or invalid 'id' Ref",
104 ));
105 }
106 };
107
108 let range_str = match row.get("range") {
110 Some(Kind::Str(s)) => s.as_str(),
111 _ => {
112 return Err(HaystackError::bad_request(
113 "hisRead: missing or invalid 'range' Str",
114 ));
115 }
116 };
117
118 if !state.graph.contains(&id)
120 && let Some(connector) = state.federation.owner_of(&id)
121 {
122 let grid = connector
123 .proxy_his_read(&id, range_str)
124 .await
125 .map_err(|e| HaystackError::internal(format!("federation proxy error: {e}")))?;
126
127 let (encoded, ct) = content::encode_response_grid(&grid, accept)
128 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
129 return Ok(HttpResponse::Ok().content_type(ct).body(encoded));
130 }
131
132 let (start, end) = parse_range(range_str)
134 .map_err(|e| HaystackError::bad_request(format!("hisRead: bad range: {e}")))?;
135
136 let items = state.his.read(&id, Some(start), Some(end));
138
139 let cols = vec![HCol::new("ts"), HCol::new("val")];
141 let rows: Vec<HDict> = items
142 .into_iter()
143 .map(|item| {
144 let mut d = HDict::new();
145 d.set("ts", Kind::DateTime(HDateTime::new(item.ts, "UTC")));
146 d.set("val", item.val);
147 d
148 })
149 .collect();
150
151 let mut meta = HDict::new();
152 meta.set("id", Kind::Ref(HRef::from_val(&id)));
153 let grid = HGrid::from_parts(meta, cols, rows);
154
155 log::info!("hisRead: returning {} rows for point {}", grid.len(), id);
156 let (encoded, ct) = content::encode_response_grid(&grid, accept)
157 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
158
159 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
160}
161
162fn parse_range(range: &str) -> Result<(DateTime<FixedOffset>, DateTime<FixedOffset>), String> {
170 let range = range.trim();
171
172 match range {
173 "today" => {
174 let today = Local::now().date_naive();
175 Ok(date_range(today, today))
176 }
177 "yesterday" => {
178 let yesterday = Local::now().date_naive() - chrono::Duration::days(1);
179 Ok(date_range(yesterday, yesterday))
180 }
181 _ => {
182 if range.contains(',') {
183 let parts: Vec<&str> = range.splitn(2, ',').collect();
184 let start_date = parse_date(parts[0].trim())?;
185 let end_date = parse_date(parts[1].trim())?;
186 Ok(date_range(start_date, end_date))
187 } else {
188 let date = parse_date(range)?;
189 Ok(date_range(date, date))
190 }
191 }
192 }
193}
194
195fn parse_date(s: &str) -> Result<NaiveDate, String> {
197 NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|e| format!("invalid date '{s}': {e}"))
198}
199
200fn date_range(
204 start_date: NaiveDate,
205 end_date: NaiveDate,
206) -> (DateTime<FixedOffset>, DateTime<FixedOffset>) {
207 let utc = FixedOffset::east_opt(0).unwrap();
208 let start = utc
209 .from_local_datetime(&start_date.and_time(NaiveTime::MIN))
210 .unwrap();
211 let end = utc
212 .from_local_datetime(&end_date.and_hms_opt(23, 59, 59).unwrap())
213 .unwrap();
214 (start, end)
215}
216
217pub async fn handle_write(
226 req: HttpRequest,
227 body: String,
228 state: web::Data<AppState>,
229) -> Result<HttpResponse, HaystackError> {
230 let content_type = req
231 .headers()
232 .get("Content-Type")
233 .and_then(|v| v.to_str().ok())
234 .unwrap_or("");
235 let accept = req
236 .headers()
237 .get("Accept")
238 .and_then(|v| v.to_str().ok())
239 .unwrap_or("");
240
241 let request_grid = content::decode_request_grid(&body, content_type)
242 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
243
244 let id = match request_grid.meta.get("id") {
246 Some(Kind::Ref(r)) => r.val.clone(),
247 _ => {
248 return Err(HaystackError::bad_request(
249 "hisWrite: grid meta must contain 'id' Ref",
250 ));
251 }
252 };
253
254 if !state.graph.contains(&id) {
256 if let Some(connector) = state.federation.owner_of(&id) {
257 let items: Vec<HDict> = request_grid.rows.to_vec();
259 let grid = connector
260 .proxy_his_write(&id, items)
261 .await
262 .map_err(|e| HaystackError::internal(format!("federation proxy error: {e}")))?;
263
264 let (encoded, ct) = content::encode_response_grid(&grid, accept)
265 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
266 return Ok(HttpResponse::Ok().content_type(ct).body(encoded));
267 }
268 return Err(HaystackError::not_found(format!("entity not found: {id}")));
269 }
270
271 let mut items = Vec::with_capacity(request_grid.len());
273 for (i, row) in request_grid.iter().enumerate() {
274 let ts = match row.get("ts") {
275 Some(Kind::DateTime(hdt)) => hdt.dt,
276 _ => {
277 return Err(HaystackError::bad_request(format!(
278 "hisWrite: row {i} missing or invalid 'ts' DateTime"
279 )));
280 }
281 };
282 let val = row.get("val").cloned().unwrap_or(Kind::Null);
283
284 items.push(HisItem { ts, val });
285 }
286
287 let count = items.len();
288 state.his.write(&id, items);
289
290 log::info!("hisWrite: stored {} items for point {}", count, id);
291 let grid = HGrid::new();
292 let (encoded, ct) = content::encode_response_grid(&grid, accept)
293 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
294
295 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
296}