haystack_server/ops/
changes.rs1use actix_web::{HttpRequest, HttpResponse, web};
29
30use haystack_core::data::{HCol, HDict, HGrid};
31use haystack_core::graph::changelog::DiffOp;
32use haystack_core::kinds::Kind;
33
34use crate::content;
35use crate::error::HaystackError;
36use crate::state::AppState;
37
38const MAX_CHANGE_ROWS: usize = 10_000;
40
41pub async fn handle(
53 req: HttpRequest,
54 body: String,
55 state: web::Data<AppState>,
56) -> Result<HttpResponse, HaystackError> {
57 let content_type = req
58 .headers()
59 .get("Content-Type")
60 .and_then(|v| v.to_str().ok())
61 .unwrap_or("");
62 let accept = req
63 .headers()
64 .get("Accept")
65 .and_then(|v| v.to_str().ok())
66 .unwrap_or("");
67
68 let request_grid = content::decode_request_grid(&body, content_type)
69 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
70
71 let since_version = request_grid
72 .row(0)
73 .and_then(|row| row.get("version"))
74 .and_then(|k| {
75 if let Kind::Number(n) = k {
76 Some(n.val as u64)
77 } else {
78 None
79 }
80 })
81 .unwrap_or(0);
82
83 let current_version = state.graph.version();
84 let diffs = match state.graph.changes_since(since_version) {
85 Ok(d) => d,
86 Err(gap) => {
87 let mut err_meta = HDict::new();
90 err_meta.set(
91 "curVer",
92 Kind::Number(haystack_core::kinds::Number::unitless(
93 current_version as f64,
94 )),
95 );
96 err_meta.set(
97 "err",
98 Kind::Str(format!(
99 "changelog gap: requested version {}, floor is {}",
100 gap.subscriber_version, gap.floor_version
101 )),
102 );
103 let grid = HGrid::from_parts(err_meta, Vec::new(), Vec::new());
104 let (encoded, ct) = content::encode_response_grid(&grid, accept)
105 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
106 return Ok(HttpResponse::Ok().content_type(ct).body(encoded));
107 }
108 };
109
110 let mut meta = HDict::new();
111 meta.set(
112 "curVer",
113 Kind::Number(haystack_core::kinds::Number::unitless(
114 current_version as f64,
115 )),
116 );
117
118 if diffs.is_empty() {
119 let grid = HGrid::from_parts(meta, Vec::new(), Vec::new());
120 let (encoded, ct) = content::encode_response_grid(&grid, accept)
121 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
122 return Ok(HttpResponse::Ok().content_type(ct).body(encoded));
123 }
124
125 let cols = vec![
126 HCol::new("version"),
127 HCol::new("op"),
128 HCol::new("ref"),
129 HCol::new("ts"),
130 HCol::new("entity"),
131 ];
132
133 let mut rows: Vec<HDict> = diffs
134 .iter()
135 .map(|diff| {
136 let mut row = HDict::new();
137 row.set(
138 "version",
139 Kind::Number(haystack_core::kinds::Number::unitless(diff.version as f64)),
140 );
141 row.set(
142 "op",
143 Kind::Str(match diff.op {
144 DiffOp::Add => "add".to_string(),
145 DiffOp::Update => "update".to_string(),
146 DiffOp::Remove => "remove".to_string(),
147 }),
148 );
149 row.set("ref", Kind::Str(diff.ref_val.clone()));
150 row.set(
151 "ts",
152 Kind::Number(haystack_core::kinds::Number::unitless(
153 diff.timestamp as f64,
154 )),
155 );
156 if let Some(entity) = &diff.new {
158 row.set("entity", Kind::Dict(Box::new(entity.clone())));
159 } else if let Some(changed) = &diff.changed_tags {
160 row.set("entity", Kind::Dict(Box::new(changed.clone())));
161 }
162 row
163 })
164 .collect();
165
166 let truncated = rows.len() > MAX_CHANGE_ROWS;
167 if truncated {
168 rows.truncate(MAX_CHANGE_ROWS);
169 meta.set("truncated", Kind::Marker);
170 meta.set(
171 "maxRows",
172 Kind::Number(haystack_core::kinds::Number::unitless(
173 MAX_CHANGE_ROWS as f64,
174 )),
175 );
176 }
177
178 let grid = HGrid::from_parts(meta, cols, rows);
179 let (encoded, ct) = content::encode_response_grid(&grid, accept)
180 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
181
182 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
183}