haystack_server/ops/
watch.rs1use actix_web::{HttpMessage, HttpRequest, HttpResponse, web};
55
56use haystack_core::data::{HCol, HDict, HGrid};
57use haystack_core::kinds::Kind;
58
59use crate::auth::AuthUser;
60use crate::content;
61use crate::error::HaystackError;
62use crate::state::AppState;
63
64pub async fn handle_sub(
70 req: HttpRequest,
71 body: String,
72 state: web::Data<AppState>,
73) -> Result<HttpResponse, HaystackError> {
74 let content_type = req
75 .headers()
76 .get("Content-Type")
77 .and_then(|v| v.to_str().ok())
78 .unwrap_or("");
79 let accept = req
80 .headers()
81 .get("Accept")
82 .and_then(|v| v.to_str().ok())
83 .unwrap_or("");
84
85 let request_grid = content::decode_request_grid(&body, content_type)
86 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
87
88 let username = req
89 .extensions()
90 .get::<AuthUser>()
91 .map(|u| u.username.clone())
92 .unwrap_or_else(|| "anonymous".to_string());
93
94 let ids: Vec<String> = request_grid
96 .rows
97 .iter()
98 .filter_map(|row| match row.get("id") {
99 Some(Kind::Ref(r)) => Some(r.val.clone()),
100 _ => None,
101 })
102 .collect();
103
104 if ids.is_empty() {
105 return Err(HaystackError::bad_request(
106 "request must have 'id' column with Ref values",
107 ));
108 }
109
110 let existing_watch_id = request_grid.meta.get("watchId").and_then(|v| match v {
112 Kind::Str(s) => Some(s.clone()),
113 _ => None,
114 });
115
116 let watch_id = if let Some(ref wid) = existing_watch_id {
117 if state.watches.add_ids(wid, &username, ids.clone()) {
118 wid.clone()
119 } else {
120 return Err(HaystackError::not_found(format!("watch not found: {wid}")));
121 }
122 } else {
123 let graph_version = state.graph.version();
124 state
125 .watches
126 .subscribe(&username, ids.clone(), graph_version)
127 .map_err(HaystackError::bad_request)?
128 };
129
130 let mut rows: Vec<HDict> = Vec::new();
132 let mut col_set: Vec<String> = Vec::new();
133 let mut seen = std::collections::HashSet::new();
134
135 for id in &ids {
136 if let Some(entity) = state.graph.get(id) {
137 for name in entity.tag_names() {
139 if seen.insert(name.to_string()) {
140 col_set.push(name.to_string());
141 }
142 }
143 rows.push(entity);
144 } else if let Some(connector) = state.federation.owner_of(id) {
145 if let Some(entity) = connector.get_cached_entity(id) {
148 for name in entity.tag_names() {
149 if seen.insert(name.to_string()) {
150 col_set.push(name.to_string());
151 }
152 }
153 rows.push((*entity).clone());
154 }
155 connector.add_remote_watch(id);
156 }
160 }
161
162 col_set.sort();
163 let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
164
165 let mut meta = HDict::new();
166 meta.set("watchId", Kind::Str(watch_id));
167
168 let grid = HGrid::from_parts(meta, cols, rows);
169 let (encoded, ct) = content::encode_response_grid(&grid, accept)
170 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
171
172 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
173}
174
175pub async fn handle_poll(
180 req: HttpRequest,
181 body: String,
182 state: web::Data<AppState>,
183) -> Result<HttpResponse, HaystackError> {
184 let content_type = req
185 .headers()
186 .get("Content-Type")
187 .and_then(|v| v.to_str().ok())
188 .unwrap_or("");
189 let accept = req
190 .headers()
191 .get("Accept")
192 .and_then(|v| v.to_str().ok())
193 .unwrap_or("");
194
195 let request_grid = content::decode_request_grid(&body, content_type)
196 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
197
198 let username = req
199 .extensions()
200 .get::<AuthUser>()
201 .map(|u| u.username.clone())
202 .unwrap_or_else(|| "anonymous".to_string());
203
204 let watch_id = request_grid
205 .meta
206 .get("watchId")
207 .and_then(|v| match v {
208 Kind::Str(s) => Some(s.clone()),
209 _ => None,
210 })
211 .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
212
213 let changed = state
214 .watches
215 .poll(&watch_id, &username, &state.graph)
216 .ok_or_else(|| HaystackError::not_found(format!("watch not found: {watch_id}")))?;
217
218 if changed.is_empty() {
219 return respond_grid(&HGrid::new(), accept);
220 }
221
222 let mut col_set: Vec<String> = Vec::new();
224 let mut seen = std::collections::HashSet::new();
225 for entity in &changed {
226 for name in entity.tag_names() {
227 if seen.insert(name.to_string()) {
228 col_set.push(name.to_string());
229 }
230 }
231 }
232 col_set.sort();
233 let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
234
235 let grid = HGrid::from_parts(HDict::new(), cols, changed);
236 respond_grid(&grid, accept)
237}
238
239pub async fn handle_unsub(
245 req: HttpRequest,
246 body: String,
247 state: web::Data<AppState>,
248) -> Result<HttpResponse, HaystackError> {
249 let content_type = req
250 .headers()
251 .get("Content-Type")
252 .and_then(|v| v.to_str().ok())
253 .unwrap_or("");
254 let accept = req
255 .headers()
256 .get("Accept")
257 .and_then(|v| v.to_str().ok())
258 .unwrap_or("");
259
260 let request_grid = content::decode_request_grid(&body, content_type)
261 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
262
263 let username = req
264 .extensions()
265 .get::<AuthUser>()
266 .map(|u| u.username.clone())
267 .unwrap_or_else(|| "anonymous".to_string());
268
269 let watch_id = request_grid
270 .meta
271 .get("watchId")
272 .and_then(|v| match v {
273 Kind::Str(s) => Some(s.clone()),
274 _ => None,
275 })
276 .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
277
278 let ids: Vec<String> = request_grid
280 .rows
281 .iter()
282 .filter_map(|row| match row.get("id") {
283 Some(Kind::Ref(r)) => Some(r.val.clone()),
284 _ => None,
285 })
286 .collect();
287
288 if ids.is_empty() {
289 if !state.watches.unsubscribe(&watch_id, &username) {
291 return Err(HaystackError::not_found(format!(
292 "watch not found: {watch_id}"
293 )));
294 }
295 } else {
296 if !state.watches.remove_ids(&watch_id, &username, &ids) {
298 return Err(HaystackError::not_found(format!(
299 "watch not found: {watch_id}"
300 )));
301 }
302
303 for id in &ids {
305 if let Some(connector) = state.federation.owner_of(id) {
306 connector.remove_remote_watch(id);
307 }
308 }
309 }
310
311 respond_grid(&HGrid::new(), accept)
312}
313
314fn respond_grid(grid: &HGrid, accept: &str) -> Result<HttpResponse, HaystackError> {
315 let (encoded, ct) = content::encode_response_grid(grid, accept)
316 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
317 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
318}