haystack_server/ops/
his.rs1use actix_web::{HttpRequest, HttpResponse, web};
53use chrono::{DateTime, FixedOffset, Local, NaiveDate, NaiveTime, TimeZone};
54
55use haystack_core::data::{HCol, HDict, HGrid};
56use haystack_core::kinds::{HDateTime, HRef, Kind};
57
58use crate::content;
59use crate::error::HaystackError;
60use crate::his_store::HisItem;
61use crate::state::AppState;
62
63pub async fn handle_read(
76 req: HttpRequest,
77 body: String,
78 state: web::Data<AppState>,
79) -> Result<HttpResponse, HaystackError> {
80 let content_type = req
81 .headers()
82 .get("Content-Type")
83 .and_then(|v| v.to_str().ok())
84 .unwrap_or("");
85 let accept = req
86 .headers()
87 .get("Accept")
88 .and_then(|v| v.to_str().ok())
89 .unwrap_or("");
90
91 let request_grid = content::decode_request_grid(&body, content_type)
92 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
93
94 let row = request_grid
96 .row(0)
97 .ok_or_else(|| HaystackError::bad_request("hisRead request has no rows"))?;
98
99 let id = match row.get("id") {
101 Some(Kind::Ref(r)) => r.val.clone(),
102 _ => {
103 return Err(HaystackError::bad_request(
104 "hisRead: missing or invalid 'id' Ref",
105 ));
106 }
107 };
108
109 let range_str = match row.get("range") {
111 Some(Kind::Str(s)) => s.as_str(),
112 _ => {
113 return Err(HaystackError::bad_request(
114 "hisRead: missing or invalid 'range' Str",
115 ));
116 }
117 };
118
119 if !state.graph.contains(&id)
121 && let Some(connector) = state.federation.owner_of(&id)
122 {
123 let grid = connector
124 .proxy_his_read(&id, range_str)
125 .await
126 .map_err(|e| HaystackError::internal(format!("federation proxy error: {e}")))?;
127
128 let (encoded, ct) = content::encode_response_grid(&grid, accept)
129 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
130 return Ok(HttpResponse::Ok().content_type(ct).body(encoded));
131 }
132
133 let (start, end) = parse_range(range_str)
135 .map_err(|e| HaystackError::bad_request(format!("hisRead: bad range: {e}")))?;
136
137 let items = state.his.his_read(&id, Some(start), Some(end)).await;
139
140 let cols = vec![HCol::new("ts"), HCol::new("val")];
142 let rows: Vec<HDict> = items
143 .into_iter()
144 .map(|item| {
145 let mut d = HDict::new();
146 d.set("ts", Kind::DateTime(HDateTime::new(item.ts, "UTC")));
147 d.set("val", item.val);
148 d
149 })
150 .collect();
151
152 let mut meta = HDict::new();
153 meta.set("id", Kind::Ref(HRef::from_val(&id)));
154 let grid = HGrid::from_parts(meta, cols, rows);
155
156 log::info!("hisRead: returning {} rows for point {}", grid.len(), id);
157 let (encoded, ct) = content::encode_response_grid(&grid, accept)
158 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
159
160 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
161}
162
163fn parse_range(range: &str) -> Result<(DateTime<FixedOffset>, DateTime<FixedOffset>), String> {
171 let range = range.trim();
172
173 match range {
174 "today" => {
175 let today = Local::now().date_naive();
176 Ok(date_range(today, today))
177 }
178 "yesterday" => {
179 let yesterday = Local::now().date_naive() - chrono::Duration::days(1);
180 Ok(date_range(yesterday, yesterday))
181 }
182 _ => {
183 if range.contains(',') {
184 let parts: Vec<&str> = range.splitn(2, ',').collect();
185 let start_date = parse_date(parts[0].trim())?;
186 let end_date = parse_date(parts[1].trim())?;
187 Ok(date_range(start_date, end_date))
188 } else {
189 let date = parse_date(range)?;
190 Ok(date_range(date, date))
191 }
192 }
193 }
194}
195
196fn parse_date(s: &str) -> Result<NaiveDate, String> {
198 NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|e| format!("invalid date '{s}': {e}"))
199}
200
201fn date_range(
205 start_date: NaiveDate,
206 end_date: NaiveDate,
207) -> (DateTime<FixedOffset>, DateTime<FixedOffset>) {
208 let utc = FixedOffset::east_opt(0).unwrap();
209 let start = utc
210 .from_local_datetime(&start_date.and_time(NaiveTime::MIN))
211 .unwrap();
212 let end = utc
213 .from_local_datetime(&end_date.and_hms_opt(23, 59, 59).unwrap())
214 .unwrap();
215 (start, end)
216}
217
218pub async fn handle_write(
227 req: HttpRequest,
228 body: String,
229 state: web::Data<AppState>,
230) -> Result<HttpResponse, HaystackError> {
231 let content_type = req
232 .headers()
233 .get("Content-Type")
234 .and_then(|v| v.to_str().ok())
235 .unwrap_or("");
236 let accept = req
237 .headers()
238 .get("Accept")
239 .and_then(|v| v.to_str().ok())
240 .unwrap_or("");
241
242 let request_grid = content::decode_request_grid(&body, content_type)
243 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
244
245 let id = match request_grid.meta.get("id") {
247 Some(Kind::Ref(r)) => r.val.clone(),
248 _ => {
249 return Err(HaystackError::bad_request(
250 "hisWrite: grid meta must contain 'id' Ref",
251 ));
252 }
253 };
254
255 if !state.graph.contains(&id) {
257 if let Some(connector) = state.federation.owner_of(&id) {
258 let items: Vec<HDict> = request_grid.rows.to_vec();
260 let grid = connector
261 .proxy_his_write(&id, items)
262 .await
263 .map_err(|e| HaystackError::internal(format!("federation proxy error: {e}")))?;
264
265 let (encoded, ct) = content::encode_response_grid(&grid, accept)
266 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
267 return Ok(HttpResponse::Ok().content_type(ct).body(encoded));
268 }
269 return Err(HaystackError::not_found(format!("entity not found: {id}")));
270 }
271
272 let mut items = Vec::with_capacity(request_grid.len());
274 for (i, row) in request_grid.iter().enumerate() {
275 let ts = match row.get("ts") {
276 Some(Kind::DateTime(hdt)) => hdt.dt,
277 _ => {
278 return Err(HaystackError::bad_request(format!(
279 "hisWrite: row {i} missing or invalid 'ts' DateTime"
280 )));
281 }
282 };
283 let val = row.get("val").cloned().unwrap_or(Kind::Null);
284
285 items.push(HisItem { ts, val });
286 }
287
288 let count = items.len();
289 state.his.his_write(&id, items).await;
290
291 log::info!("hisWrite: stored {} items for point {}", count, id);
292 let grid = HGrid::new();
293 let (encoded, ct) = content::encode_response_grid(&grid, accept)
294 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
295
296 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
297}