Skip to main content

haystack_server/ops/
his.rs

1//! The `hisRead` and `hisWrite` ops — historical time-series data.
2
3use axum::extract::State;
4use axum::http::HeaderMap;
5use axum::response::{IntoResponse, Response};
6use chrono::{DateTime, FixedOffset, NaiveDate, NaiveTime, TimeZone, Utc};
7
8use haystack_core::data::{HCol, HDict, HGrid};
9use haystack_core::kinds::{HDateTime, HRef, Kind};
10
11use crate::content;
12use crate::error::HaystackError;
13use crate::his_store::HisItem;
14use crate::state::SharedState;
15
16// ---------------------------------------------------------------------------
17// hisRead
18// ---------------------------------------------------------------------------
19
20/// POST /api/hisRead
21pub async fn handle_read(
22    State(state): State<SharedState>,
23    headers: HeaderMap,
24    body: String,
25) -> Result<Response, HaystackError> {
26    let content_type = headers
27        .get("Content-Type")
28        .and_then(|v| v.to_str().ok())
29        .unwrap_or("");
30    let accept = headers
31        .get("Accept")
32        .and_then(|v| v.to_str().ok())
33        .unwrap_or("");
34
35    let request_grid = content::decode_request_grid(&body, content_type)
36        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
37
38    let row = request_grid
39        .row(0)
40        .ok_or_else(|| HaystackError::bad_request("hisRead request has no rows"))?;
41
42    let id = match row.get("id") {
43        Some(Kind::Ref(r)) => r.val.clone(),
44        _ => {
45            return Err(HaystackError::bad_request(
46                "hisRead: missing or invalid 'id' Ref",
47            ));
48        }
49    };
50
51    let range_str = match row.get("range") {
52        Some(Kind::Str(s)) => s.as_str(),
53        _ => {
54            return Err(HaystackError::bad_request(
55                "hisRead: missing or invalid 'range' Str",
56            ));
57        }
58    };
59
60    // Parse range into (start, end) pair.
61    let (start, end) = parse_range(range_str)
62        .map_err(|e| HaystackError::bad_request(format!("hisRead: bad range: {e}")))?;
63
64    // Query the store.
65    let items = state.his.his_read(&id, Some(start), Some(end)).await;
66
67    // Build response grid.
68    let cols = vec![HCol::new("ts"), HCol::new("val")];
69    let rows: Vec<HDict> = items
70        .into_iter()
71        .map(|item| {
72            let mut d = HDict::new();
73            d.set("ts", Kind::DateTime(HDateTime::new(item.ts, "UTC")));
74            d.set("val", item.val);
75            d
76        })
77        .collect();
78
79    let mut meta = HDict::new();
80    meta.set("id", Kind::Ref(HRef::from_val(&id)));
81    let grid = HGrid::from_parts(meta, cols, rows);
82
83    log::info!("hisRead: returning {} rows for point {}", grid.len(), id);
84    let (encoded, ct) = content::encode_response_grid(&grid, accept)
85        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
86
87    Ok(([(axum::http::header::CONTENT_TYPE, ct)], encoded).into_response())
88}
89
90/// Parse a range string into a (start, end) pair of `DateTime<FixedOffset>`.
91fn parse_range(range: &str) -> Result<(DateTime<FixedOffset>, DateTime<FixedOffset>), String> {
92    let range = range.trim();
93
94    match range {
95        "today" => {
96            let today = Utc::now().date_naive();
97            Ok(date_range(today, today))
98        }
99        "yesterday" => {
100            let yesterday = Utc::now().date_naive() - chrono::Duration::days(1);
101            Ok(date_range(yesterday, yesterday))
102        }
103        _ => {
104            if range.contains(',') {
105                let parts: Vec<&str> = range.splitn(2, ',').collect();
106                let start_date = parse_date(parts[0].trim())?;
107                let end_date = parse_date(parts[1].trim())?;
108                Ok(date_range(start_date, end_date))
109            } else {
110                let date = parse_date(range)?;
111                Ok(date_range(date, date))
112            }
113        }
114    }
115}
116
117fn parse_date(s: &str) -> Result<NaiveDate, String> {
118    NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|e| format!("invalid date '{s}': {e}"))
119}
120
121fn date_range(
122    start_date: NaiveDate,
123    end_date: NaiveDate,
124) -> (DateTime<FixedOffset>, DateTime<FixedOffset>) {
125    let utc = FixedOffset::east_opt(0).unwrap();
126    let start = utc
127        .from_local_datetime(&start_date.and_time(NaiveTime::MIN))
128        .unwrap();
129    let end = utc
130        .from_local_datetime(&end_date.and_hms_opt(23, 59, 59).unwrap())
131        .unwrap();
132    (start, end)
133}
134
135// ---------------------------------------------------------------------------
136// hisWrite
137// ---------------------------------------------------------------------------
138
139const MAX_HIS_WRITE_ROWS: usize = 100_000;
140
141/// POST /api/hisWrite
142pub async fn handle_write(
143    State(state): State<SharedState>,
144    headers: HeaderMap,
145    body: String,
146) -> Result<Response, HaystackError> {
147    let content_type = headers
148        .get("Content-Type")
149        .and_then(|v| v.to_str().ok())
150        .unwrap_or("");
151    let accept = headers
152        .get("Accept")
153        .and_then(|v| v.to_str().ok())
154        .unwrap_or("");
155
156    let request_grid = content::decode_request_grid(&body, content_type)
157        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
158
159    if request_grid.rows.len() > MAX_HIS_WRITE_ROWS {
160        return Err(HaystackError::bad_request("too many history rows"));
161    }
162
163    let id = match request_grid.meta.get("id") {
164        Some(Kind::Ref(r)) => r.val.clone(),
165        _ => {
166            return Err(HaystackError::bad_request(
167                "hisWrite: grid meta must contain 'id' Ref",
168            ));
169        }
170    };
171
172    // Parse rows into HisItems.
173    let mut items = Vec::with_capacity(request_grid.len());
174    for (i, row) in request_grid.iter().enumerate() {
175        let ts = match row.get("ts") {
176            Some(Kind::DateTime(hdt)) => hdt.dt,
177            _ => {
178                return Err(HaystackError::bad_request(format!(
179                    "hisWrite: row {i} missing or invalid 'ts' DateTime"
180                )));
181            }
182        };
183        let val = row.get("val").cloned().unwrap_or(Kind::Null);
184
185        items.push(HisItem { ts, val });
186    }
187
188    let count = items.len();
189    state.his.his_write(&id, items).await;
190
191    log::info!("hisWrite: stored {} items for point {}", count, id);
192    let grid = HGrid::new();
193    let (encoded, ct) = content::encode_response_grid(&grid, accept)
194        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
195
196    Ok(([(axum::http::header::CONTENT_TYPE, ct)], encoded).into_response())
197}