haystack_server/ops/
changes.rs1use axum::extract::State;
4use axum::http::HeaderMap;
5use axum::response::{IntoResponse, Response};
6
7use haystack_core::data::{HCol, HDict, HGrid};
8use haystack_core::graph::changelog::DiffOp;
9use haystack_core::kinds::Kind;
10
11use crate::content;
12use crate::error::HaystackError;
13use crate::state::SharedState;
14
/// Maximum number of change rows returned in a single response.
///
/// When more changes than this are pending, the response grid is cut to this
/// many rows and its meta carries a `truncated` marker plus a `maxRows` number.
const MAX_CHANGE_ROWS: usize = 10_000;
17
18pub async fn handle(
20 State(state): State<SharedState>,
21 headers: HeaderMap,
22 body: String,
23) -> Result<Response, HaystackError> {
24 let content_type = headers
25 .get("Content-Type")
26 .and_then(|v| v.to_str().ok())
27 .unwrap_or("");
28 let accept = headers
29 .get("Accept")
30 .and_then(|v| v.to_str().ok())
31 .unwrap_or("");
32
33 let request_grid = content::decode_request_grid(&body, content_type)
34 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
35
36 let since_version = request_grid
37 .row(0)
38 .and_then(|row| row.get("version"))
39 .and_then(|k| {
40 if let Kind::Number(n) = k {
41 Some(n.val as u64)
42 } else {
43 None
44 }
45 })
46 .unwrap_or(0);
47
48 let current_version = state.graph.version();
49 let diffs = match state.graph.changes_since(since_version) {
50 Ok(d) => d,
51 Err(gap) => {
52 let mut err_meta = HDict::new();
53 err_meta.set(
54 "curVer",
55 Kind::Number(haystack_core::kinds::Number::unitless(
56 current_version as f64,
57 )),
58 );
59 err_meta.set(
60 "err",
61 Kind::Str(format!(
62 "changelog gap: requested version {}, floor is {}",
63 gap.subscriber_version, gap.floor_version
64 )),
65 );
66 let grid = HGrid::from_parts(err_meta, Vec::new(), Vec::new());
67 let (encoded, ct) = content::encode_response_grid(&grid, accept)
68 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
69 return Ok(([(axum::http::header::CONTENT_TYPE, ct)], encoded).into_response());
70 }
71 };
72
73 let mut meta = HDict::new();
74 meta.set(
75 "curVer",
76 Kind::Number(haystack_core::kinds::Number::unitless(
77 current_version as f64,
78 )),
79 );
80
81 if diffs.is_empty() {
82 let grid = HGrid::from_parts(meta, Vec::new(), Vec::new());
83 let (encoded, ct) = content::encode_response_grid(&grid, accept)
84 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
85 return Ok(([(axum::http::header::CONTENT_TYPE, ct)], encoded).into_response());
86 }
87
88 let cols = vec![
89 HCol::new("version"),
90 HCol::new("op"),
91 HCol::new("ref"),
92 HCol::new("ts"),
93 HCol::new("entity"),
94 ];
95
96 let mut rows: Vec<HDict> = diffs
97 .iter()
98 .map(|diff| {
99 let mut row = HDict::new();
100 row.set(
101 "version",
102 Kind::Number(haystack_core::kinds::Number::unitless(diff.version as f64)),
103 );
104 row.set(
105 "op",
106 Kind::Str(match diff.op {
107 DiffOp::Add => "add".to_string(),
108 DiffOp::Update => "update".to_string(),
109 DiffOp::Remove => "remove".to_string(),
110 }),
111 );
112 row.set("ref", Kind::Str(diff.ref_val.clone()));
113 row.set(
114 "ts",
115 Kind::Number(haystack_core::kinds::Number::unitless(
116 diff.timestamp as f64,
117 )),
118 );
119 if let Some(entity) = &diff.new {
120 row.set("entity", Kind::Dict(Box::new(entity.clone())));
121 } else if let Some(changed) = &diff.changed_tags {
122 row.set("entity", Kind::Dict(Box::new(changed.clone())));
123 }
124 row
125 })
126 .collect();
127
128 let truncated = rows.len() > MAX_CHANGE_ROWS;
129 if truncated {
130 rows.truncate(MAX_CHANGE_ROWS);
131 meta.set("truncated", Kind::Marker);
132 meta.set(
133 "maxRows",
134 Kind::Number(haystack_core::kinds::Number::unitless(
135 MAX_CHANGE_ROWS as f64,
136 )),
137 );
138 }
139
140 let grid = HGrid::from_parts(meta, cols, rows);
141 let (encoded, ct) = content::encode_response_grid(&grid, accept)
142 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
143
144 Ok(([(axum::http::header::CONTENT_TYPE, ct)], encoded).into_response())
145}