haystack_server/ops/
his.rs1use actix_web::{HttpRequest, HttpResponse, web};
6use chrono::{DateTime, FixedOffset, Local, NaiveDate, NaiveTime, TimeZone};
7
8use haystack_core::data::{HCol, HDict, HGrid};
9use haystack_core::kinds::{HDateTime, HRef, Kind};
10
11use crate::content;
12use crate::error::HaystackError;
13use crate::his_store::HisItem;
14use crate::state::AppState;
15
16pub async fn handle_read(
29 req: HttpRequest,
30 body: String,
31 state: web::Data<AppState>,
32) -> Result<HttpResponse, HaystackError> {
33 let content_type = req
34 .headers()
35 .get("Content-Type")
36 .and_then(|v| v.to_str().ok())
37 .unwrap_or("");
38 let accept = req
39 .headers()
40 .get("Accept")
41 .and_then(|v| v.to_str().ok())
42 .unwrap_or("");
43
44 let request_grid = content::decode_request_grid(&body, content_type)
45 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
46
47 let row = request_grid
49 .row(0)
50 .ok_or_else(|| HaystackError::bad_request("hisRead request has no rows"))?;
51
52 let id = match row.get("id") {
54 Some(Kind::Ref(r)) => r.val.clone(),
55 _ => {
56 return Err(HaystackError::bad_request(
57 "hisRead: missing or invalid 'id' Ref",
58 ));
59 }
60 };
61
62 let range_str = match row.get("range") {
64 Some(Kind::Str(s)) => s.as_str(),
65 _ => {
66 return Err(HaystackError::bad_request(
67 "hisRead: missing or invalid 'range' Str",
68 ));
69 }
70 };
71
72 if !state.graph.contains(&id)
74 && let Some(connector) = state.federation.owner_of(&id)
75 {
76 let grid = connector
77 .proxy_his_read(&id, range_str)
78 .await
79 .map_err(|e| HaystackError::internal(format!("federation proxy error: {e}")))?;
80
81 let (encoded, ct) = content::encode_response_grid(&grid, accept)
82 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
83 return Ok(HttpResponse::Ok().content_type(ct).body(encoded));
84 }
85
86 let (start, end) = parse_range(range_str)
88 .map_err(|e| HaystackError::bad_request(format!("hisRead: bad range: {e}")))?;
89
90 let items = state.his.read(&id, Some(start), Some(end));
92
93 let cols = vec![HCol::new("ts"), HCol::new("val")];
95 let rows: Vec<HDict> = items
96 .into_iter()
97 .map(|item| {
98 let mut d = HDict::new();
99 d.set("ts", Kind::DateTime(HDateTime::new(item.ts, "UTC")));
100 d.set("val", item.val);
101 d
102 })
103 .collect();
104
105 let mut meta = HDict::new();
106 meta.set("id", Kind::Ref(HRef::from_val(&id)));
107 let grid = HGrid::from_parts(meta, cols, rows);
108
109 log::info!("hisRead: returning {} rows for point {}", grid.len(), id);
110 let (encoded, ct) = content::encode_response_grid(&grid, accept)
111 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
112
113 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
114}
115
116fn parse_range(range: &str) -> Result<(DateTime<FixedOffset>, DateTime<FixedOffset>), String> {
124 let range = range.trim();
125
126 match range {
127 "today" => {
128 let today = Local::now().date_naive();
129 Ok(date_range(today, today))
130 }
131 "yesterday" => {
132 let yesterday = Local::now().date_naive() - chrono::Duration::days(1);
133 Ok(date_range(yesterday, yesterday))
134 }
135 _ => {
136 if range.contains(',') {
137 let parts: Vec<&str> = range.splitn(2, ',').collect();
138 let start_date = parse_date(parts[0].trim())?;
139 let end_date = parse_date(parts[1].trim())?;
140 Ok(date_range(start_date, end_date))
141 } else {
142 let date = parse_date(range)?;
143 Ok(date_range(date, date))
144 }
145 }
146 }
147}
148
149fn parse_date(s: &str) -> Result<NaiveDate, String> {
151 NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|e| format!("invalid date '{s}': {e}"))
152}
153
154fn date_range(
158 start_date: NaiveDate,
159 end_date: NaiveDate,
160) -> (DateTime<FixedOffset>, DateTime<FixedOffset>) {
161 let utc = FixedOffset::east_opt(0).unwrap();
162 let start = utc
163 .from_local_datetime(&start_date.and_time(NaiveTime::MIN))
164 .unwrap();
165 let end = utc
166 .from_local_datetime(&end_date.and_hms_opt(23, 59, 59).unwrap())
167 .unwrap();
168 (start, end)
169}
170
171pub async fn handle_write(
180 req: HttpRequest,
181 body: String,
182 state: web::Data<AppState>,
183) -> Result<HttpResponse, HaystackError> {
184 let content_type = req
185 .headers()
186 .get("Content-Type")
187 .and_then(|v| v.to_str().ok())
188 .unwrap_or("");
189 let accept = req
190 .headers()
191 .get("Accept")
192 .and_then(|v| v.to_str().ok())
193 .unwrap_or("");
194
195 let request_grid = content::decode_request_grid(&body, content_type)
196 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
197
198 let id = match request_grid.meta.get("id") {
200 Some(Kind::Ref(r)) => r.val.clone(),
201 _ => {
202 return Err(HaystackError::bad_request(
203 "hisWrite: grid meta must contain 'id' Ref",
204 ));
205 }
206 };
207
208 if !state.graph.contains(&id) {
210 if let Some(connector) = state.federation.owner_of(&id) {
211 let items: Vec<HDict> = request_grid.rows.to_vec();
213 let grid = connector
214 .proxy_his_write(&id, items)
215 .await
216 .map_err(|e| HaystackError::internal(format!("federation proxy error: {e}")))?;
217
218 let (encoded, ct) = content::encode_response_grid(&grid, accept)
219 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
220 return Ok(HttpResponse::Ok().content_type(ct).body(encoded));
221 }
222 return Err(HaystackError::not_found(format!("entity not found: {id}")));
223 }
224
225 let mut items = Vec::with_capacity(request_grid.len());
227 for (i, row) in request_grid.iter().enumerate() {
228 let ts = match row.get("ts") {
229 Some(Kind::DateTime(hdt)) => hdt.dt,
230 _ => {
231 return Err(HaystackError::bad_request(format!(
232 "hisWrite: row {i} missing or invalid 'ts' DateTime"
233 )));
234 }
235 };
236 let val = row.get("val").cloned().unwrap_or(Kind::Null);
237
238 items.push(HisItem { ts, val });
239 }
240
241 let count = items.len();
242 state.his.write(&id, items);
243
244 log::info!("hisWrite: stored {} items for point {}", count, id);
245 let grid = HGrid::new();
246 let (encoded, ct) = content::encode_response_grid(&grid, accept)
247 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
248
249 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
250}