Skip to main content

haystack_server/ops/
changes.rs

1//! The `changes` op — return graph changelog entries since a given version.
2
3use axum::extract::State;
4use axum::http::HeaderMap;
5use axum::response::{IntoResponse, Response};
6
7use haystack_core::data::{HCol, HDict, HGrid};
8use haystack_core::graph::changelog::DiffOp;
9use haystack_core::kinds::Kind;
10
11use crate::content;
12use crate::error::HaystackError;
13use crate::state::SharedState;
14
15/// Maximum number of change rows returned in a single response.
16const MAX_CHANGE_ROWS: usize = 10_000;
17
18/// POST /api/changes
19pub async fn handle(
20    State(state): State<SharedState>,
21    headers: HeaderMap,
22    body: String,
23) -> Result<Response, HaystackError> {
24    let content_type = headers
25        .get("Content-Type")
26        .and_then(|v| v.to_str().ok())
27        .unwrap_or("");
28    let accept = headers
29        .get("Accept")
30        .and_then(|v| v.to_str().ok())
31        .unwrap_or("");
32
33    let request_grid = content::decode_request_grid(&body, content_type)
34        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
35
36    let since_version = request_grid
37        .row(0)
38        .and_then(|row| row.get("version"))
39        .and_then(|k| {
40            if let Kind::Number(n) = k {
41                Some(n.val as u64)
42            } else {
43                None
44            }
45        })
46        .unwrap_or(0);
47
48    let current_version = state.graph.version();
49    let diffs = match state.graph.changes_since(since_version) {
50        Ok(d) => d,
51        Err(gap) => {
52            let mut err_meta = HDict::new();
53            err_meta.set(
54                "curVer",
55                Kind::Number(haystack_core::kinds::Number::unitless(
56                    current_version as f64,
57                )),
58            );
59            err_meta.set(
60                "err",
61                Kind::Str(format!(
62                    "changelog gap: requested version {}, floor is {}",
63                    gap.subscriber_version, gap.floor_version
64                )),
65            );
66            let grid = HGrid::from_parts(err_meta, Vec::new(), Vec::new());
67            let (encoded, ct) = content::encode_response_grid(&grid, accept)
68                .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
69            return Ok(([(axum::http::header::CONTENT_TYPE, ct)], encoded).into_response());
70        }
71    };
72
73    let mut meta = HDict::new();
74    meta.set(
75        "curVer",
76        Kind::Number(haystack_core::kinds::Number::unitless(
77            current_version as f64,
78        )),
79    );
80
81    if diffs.is_empty() {
82        let grid = HGrid::from_parts(meta, Vec::new(), Vec::new());
83        let (encoded, ct) = content::encode_response_grid(&grid, accept)
84            .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
85        return Ok(([(axum::http::header::CONTENT_TYPE, ct)], encoded).into_response());
86    }
87
88    let cols = vec![
89        HCol::new("version"),
90        HCol::new("op"),
91        HCol::new("ref"),
92        HCol::new("ts"),
93        HCol::new("entity"),
94    ];
95
96    let mut rows: Vec<HDict> = diffs
97        .iter()
98        .map(|diff| {
99            let mut row = HDict::new();
100            row.set(
101                "version",
102                Kind::Number(haystack_core::kinds::Number::unitless(diff.version as f64)),
103            );
104            row.set(
105                "op",
106                Kind::Str(match diff.op {
107                    DiffOp::Add => "add".to_string(),
108                    DiffOp::Update => "update".to_string(),
109                    DiffOp::Remove => "remove".to_string(),
110                }),
111            );
112            row.set("ref", Kind::Str(diff.ref_val.clone()));
113            row.set(
114                "ts",
115                Kind::Number(haystack_core::kinds::Number::unitless(
116                    diff.timestamp as f64,
117                )),
118            );
119            if let Some(entity) = &diff.new {
120                row.set("entity", Kind::Dict(Box::new(entity.clone())));
121            } else if let Some(changed) = &diff.changed_tags {
122                row.set("entity", Kind::Dict(Box::new(changed.clone())));
123            }
124            row
125        })
126        .collect();
127
128    let truncated = rows.len() > MAX_CHANGE_ROWS;
129    if truncated {
130        rows.truncate(MAX_CHANGE_ROWS);
131        meta.set("truncated", Kind::Marker);
132        meta.set(
133            "maxRows",
134            Kind::Number(haystack_core::kinds::Number::unitless(
135                MAX_CHANGE_ROWS as f64,
136            )),
137        );
138    }
139
140    let grid = HGrid::from_parts(meta, cols, rows);
141    let (encoded, ct) = content::encode_response_grid(&grid, accept)
142        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
143
144    Ok(([(axum::http::header::CONTENT_TYPE, ct)], encoded).into_response())
145}