Skip to main content

haystack_server/ops/
his.rs

1//! The `hisRead` and `hisWrite` ops — historical time-series data.
2//!
3//! Backed by the in-memory `HisStore` stored in `AppState`.
4
5use actix_web::{HttpRequest, HttpResponse, web};
6use chrono::{DateTime, FixedOffset, Local, NaiveDate, NaiveTime, TimeZone};
7
8use haystack_core::data::{HCol, HDict, HGrid};
9use haystack_core::kinds::{HDateTime, HRef, Kind};
10
11use crate::content;
12use crate::error::HaystackError;
13use crate::his_store::HisItem;
14use crate::state::AppState;
15
16// ---------------------------------------------------------------------------
17// hisRead
18// ---------------------------------------------------------------------------
19
20/// POST /api/hisRead
21///
22/// Request grid has one row with `id` (Ref) and `range` (Str) columns.
23///
24/// Supported `range` formats:
25///   - `"today"` / `"yesterday"` — date ranges based on local time
26///   - `"YYYY-MM-DD"` — a single date (midnight to midnight)
27///   - `"YYYY-MM-DD,YYYY-MM-DD"` — explicit start,end dates (start inclusive, end exclusive midnight)
28pub async fn handle_read(
29    req: HttpRequest,
30    body: String,
31    state: web::Data<AppState>,
32) -> Result<HttpResponse, HaystackError> {
33    let content_type = req
34        .headers()
35        .get("Content-Type")
36        .and_then(|v| v.to_str().ok())
37        .unwrap_or("");
38    let accept = req
39        .headers()
40        .get("Accept")
41        .and_then(|v| v.to_str().ok())
42        .unwrap_or("");
43
44    let request_grid = content::decode_request_grid(&body, content_type)
45        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
46
47    // Extract the first row.
48    let row = request_grid
49        .row(0)
50        .ok_or_else(|| HaystackError::bad_request("hisRead request has no rows"))?;
51
52    // Extract id.
53    let id = match row.get("id") {
54        Some(Kind::Ref(r)) => r.val.clone(),
55        _ => {
56            return Err(HaystackError::bad_request(
57                "hisRead: missing or invalid 'id' Ref",
58            ));
59        }
60    };
61
62    // Extract range string.
63    let range_str = match row.get("range") {
64        Some(Kind::Str(s)) => s.as_str(),
65        _ => {
66            return Err(HaystackError::bad_request(
67                "hisRead: missing or invalid 'range' Str",
68            ));
69        }
70    };
71
72    // Check if entity is local; if not, try federation proxy.
73    if !state.graph.contains(&id)
74        && let Some(connector) = state.federation.owner_of(&id)
75    {
76        let grid = connector
77            .proxy_his_read(&id, range_str)
78            .await
79            .map_err(|e| HaystackError::internal(format!("federation proxy error: {e}")))?;
80
81        let (encoded, ct) = content::encode_response_grid(&grid, accept)
82            .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
83        return Ok(HttpResponse::Ok().content_type(ct).body(encoded));
84    }
85
86    // Parse range into (start, end) pair of DateTime<FixedOffset>.
87    let (start, end) = parse_range(range_str)
88        .map_err(|e| HaystackError::bad_request(format!("hisRead: bad range: {e}")))?;
89
90    // Query the store.
91    let items = state.his.read(&id, Some(start), Some(end));
92
93    // Build response grid.
94    let cols = vec![HCol::new("ts"), HCol::new("val")];
95    let rows: Vec<HDict> = items
96        .into_iter()
97        .map(|item| {
98            let mut d = HDict::new();
99            d.set("ts", Kind::DateTime(HDateTime::new(item.ts, "UTC")));
100            d.set("val", item.val);
101            d
102        })
103        .collect();
104
105    let mut meta = HDict::new();
106    meta.set("id", Kind::Ref(HRef::from_val(&id)));
107    let grid = HGrid::from_parts(meta, cols, rows);
108
109    log::info!("hisRead: returning {} rows for point {}", grid.len(), id);
110    let (encoded, ct) = content::encode_response_grid(&grid, accept)
111        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
112
113    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
114}
115
116/// Parse a range string into a (start, end) pair of `DateTime<FixedOffset>`.
117///
118/// Supported formats:
119///   - `"today"` — midnight-to-midnight of the current local date
120///   - `"yesterday"` — midnight-to-midnight of yesterday's local date
121///   - `"YYYY-MM-DD"` — a single date
122///   - `"YYYY-MM-DD,YYYY-MM-DD"` — explicit start,end
123fn parse_range(range: &str) -> Result<(DateTime<FixedOffset>, DateTime<FixedOffset>), String> {
124    let range = range.trim();
125
126    match range {
127        "today" => {
128            let today = Local::now().date_naive();
129            Ok(date_range(today, today))
130        }
131        "yesterday" => {
132            let yesterday = Local::now().date_naive() - chrono::Duration::days(1);
133            Ok(date_range(yesterday, yesterday))
134        }
135        _ => {
136            if range.contains(',') {
137                let parts: Vec<&str> = range.splitn(2, ',').collect();
138                let start_date = parse_date(parts[0].trim())?;
139                let end_date = parse_date(parts[1].trim())?;
140                Ok(date_range(start_date, end_date))
141            } else {
142                let date = parse_date(range)?;
143                Ok(date_range(date, date))
144            }
145        }
146    }
147}
148
149/// Parse a "YYYY-MM-DD" string into a NaiveDate.
150fn parse_date(s: &str) -> Result<NaiveDate, String> {
151    NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|e| format!("invalid date '{s}': {e}"))
152}
153
154/// Build a (start, end) DateTime pair from date(s).
155///
156/// Start is midnight on `start_date`, end is 23:59:59 on `end_date`, both at UTC.
157fn date_range(
158    start_date: NaiveDate,
159    end_date: NaiveDate,
160) -> (DateTime<FixedOffset>, DateTime<FixedOffset>) {
161    let utc = FixedOffset::east_opt(0).unwrap();
162    let start = utc
163        .from_local_datetime(&start_date.and_time(NaiveTime::MIN))
164        .unwrap();
165    let end = utc
166        .from_local_datetime(&end_date.and_hms_opt(23, 59, 59).unwrap())
167        .unwrap();
168    (start, end)
169}
170
171// ---------------------------------------------------------------------------
172// hisWrite
173// ---------------------------------------------------------------------------
174
175/// POST /api/hisWrite
176///
177/// Request grid meta must contain `id` (Ref). Rows contain `ts` and `val`
178/// columns. Data is stored in the in-memory `HisStore`.
179pub async fn handle_write(
180    req: HttpRequest,
181    body: String,
182    state: web::Data<AppState>,
183) -> Result<HttpResponse, HaystackError> {
184    let content_type = req
185        .headers()
186        .get("Content-Type")
187        .and_then(|v| v.to_str().ok())
188        .unwrap_or("");
189    let accept = req
190        .headers()
191        .get("Accept")
192        .and_then(|v| v.to_str().ok())
193        .unwrap_or("");
194
195    let request_grid = content::decode_request_grid(&body, content_type)
196        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
197
198    // Extract point id from grid meta.
199    let id = match request_grid.meta.get("id") {
200        Some(Kind::Ref(r)) => r.val.clone(),
201        _ => {
202            return Err(HaystackError::bad_request(
203                "hisWrite: grid meta must contain 'id' Ref",
204            ));
205        }
206    };
207
208    // Check federation: if entity is not in local graph, proxy to remote.
209    if !state.graph.contains(&id) {
210        if let Some(connector) = state.federation.owner_of(&id) {
211            // Forward the raw rows (with ts/val) to the remote server.
212            let items: Vec<HDict> = request_grid.rows.to_vec();
213            let grid = connector
214                .proxy_his_write(&id, items)
215                .await
216                .map_err(|e| {
217                    HaystackError::internal(format!("federation proxy error: {e}"))
218                })?;
219
220            let (encoded, ct) = content::encode_response_grid(&grid, accept)
221                .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
222            return Ok(HttpResponse::Ok().content_type(ct).body(encoded));
223        }
224        return Err(HaystackError::not_found(format!(
225            "entity not found: {id}"
226        )));
227    }
228
229    // Parse rows into HisItems.
230    let mut items = Vec::with_capacity(request_grid.len());
231    for (i, row) in request_grid.iter().enumerate() {
232        let ts = match row.get("ts") {
233            Some(Kind::DateTime(hdt)) => hdt.dt,
234            _ => {
235                return Err(HaystackError::bad_request(format!(
236                    "hisWrite: row {i} missing or invalid 'ts' DateTime"
237                )));
238            }
239        };
240        let val = row.get("val").cloned().unwrap_or(Kind::Null);
241
242        items.push(HisItem { ts, val });
243    }
244
245    let count = items.len();
246    state.his.write(&id, items);
247
248    log::info!("hisWrite: stored {} items for point {}", count, id);
249    let grid = HGrid::new();
250    let (encoded, ct) = content::encode_response_grid(&grid, accept)
251        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
252
253    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
254}