haystack_server/ops/
watch.rs1use actix_web::{HttpMessage, HttpRequest, HttpResponse, web};
55
56use haystack_core::data::{HCol, HDict, HGrid};
57use haystack_core::kinds::Kind;
58
59use crate::auth::AuthUser;
60use crate::content;
61use crate::error::HaystackError;
62use crate::state::AppState;
63
64pub async fn handle_sub(
70 req: HttpRequest,
71 body: String,
72 state: web::Data<AppState>,
73) -> Result<HttpResponse, HaystackError> {
74 let content_type = req
75 .headers()
76 .get("Content-Type")
77 .and_then(|v| v.to_str().ok())
78 .unwrap_or("");
79 let accept = req
80 .headers()
81 .get("Accept")
82 .and_then(|v| v.to_str().ok())
83 .unwrap_or("");
84
85 let request_grid = content::decode_request_grid(&body, content_type)
86 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
87
88 let username = req
89 .extensions()
90 .get::<AuthUser>()
91 .map(|u| u.username.clone())
92 .unwrap_or_else(|| "anonymous".to_string());
93
94 let ids: Vec<String> = request_grid
96 .rows
97 .iter()
98 .filter_map(|row| match row.get("id") {
99 Some(Kind::Ref(r)) => Some(r.val.clone()),
100 _ => None,
101 })
102 .collect();
103
104 if ids.is_empty() {
105 return Err(HaystackError::bad_request(
106 "request must have 'id' column with Ref values",
107 ));
108 }
109
110 let existing_watch_id = request_grid.meta.get("watchId").and_then(|v| match v {
112 Kind::Str(s) => Some(s.clone()),
113 _ => None,
114 });
115
116 let watch_id = if let Some(ref wid) = existing_watch_id {
117 if state.watches.add_ids(wid, &username, ids.clone()) {
118 wid.clone()
119 } else {
120 return Err(HaystackError::not_found(format!("watch not found: {wid}")));
121 }
122 } else {
123 let graph_version = state.graph.version();
124 state
125 .watches
126 .subscribe(&username, ids.clone(), graph_version)
127 .map_err(HaystackError::bad_request)?
128 };
129
130 let mut rows: Vec<HDict> = Vec::new();
132 let mut col_set: Vec<String> = Vec::new();
133 let mut seen = std::collections::HashSet::new();
134
135 for id in &ids {
136 if let Some(entity) = state.graph.get(id) {
137 for name in entity.tag_names() {
139 if seen.insert(name.to_string()) {
140 col_set.push(name.to_string());
141 }
142 }
143 rows.push(entity);
144 } else if let Some(connector) = state.federation.owner_of(id) {
145 let cached = connector.cached_entities();
148 if let Some(entity) = cached
149 .iter()
150 .find(|e| matches!(e.get("id"), Some(Kind::Ref(r)) if r.val == *id))
151 {
152 for name in entity.tag_names() {
153 if seen.insert(name.to_string()) {
154 col_set.push(name.to_string());
155 }
156 }
157 rows.push(entity.clone());
158 }
159 connector.add_remote_watch(id);
160 }
164 }
165
166 col_set.sort();
167 let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
168
169 let mut meta = HDict::new();
170 meta.set("watchId", Kind::Str(watch_id));
171
172 let grid = HGrid::from_parts(meta, cols, rows);
173 let (encoded, ct) = content::encode_response_grid(&grid, accept)
174 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
175
176 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
177}
178
179pub async fn handle_poll(
184 req: HttpRequest,
185 body: String,
186 state: web::Data<AppState>,
187) -> Result<HttpResponse, HaystackError> {
188 let content_type = req
189 .headers()
190 .get("Content-Type")
191 .and_then(|v| v.to_str().ok())
192 .unwrap_or("");
193 let accept = req
194 .headers()
195 .get("Accept")
196 .and_then(|v| v.to_str().ok())
197 .unwrap_or("");
198
199 let request_grid = content::decode_request_grid(&body, content_type)
200 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
201
202 let username = req
203 .extensions()
204 .get::<AuthUser>()
205 .map(|u| u.username.clone())
206 .unwrap_or_else(|| "anonymous".to_string());
207
208 let watch_id = request_grid
209 .meta
210 .get("watchId")
211 .and_then(|v| match v {
212 Kind::Str(s) => Some(s.clone()),
213 _ => None,
214 })
215 .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
216
217 let changed = state
218 .watches
219 .poll(&watch_id, &username, &state.graph)
220 .ok_or_else(|| HaystackError::not_found(format!("watch not found: {watch_id}")))?;
221
222 if changed.is_empty() {
223 return respond_grid(&HGrid::new(), accept);
224 }
225
226 let mut col_set: Vec<String> = Vec::new();
228 let mut seen = std::collections::HashSet::new();
229 for entity in &changed {
230 for name in entity.tag_names() {
231 if seen.insert(name.to_string()) {
232 col_set.push(name.to_string());
233 }
234 }
235 }
236 col_set.sort();
237 let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
238
239 let grid = HGrid::from_parts(HDict::new(), cols, changed);
240 respond_grid(&grid, accept)
241}
242
243pub async fn handle_unsub(
249 req: HttpRequest,
250 body: String,
251 state: web::Data<AppState>,
252) -> Result<HttpResponse, HaystackError> {
253 let content_type = req
254 .headers()
255 .get("Content-Type")
256 .and_then(|v| v.to_str().ok())
257 .unwrap_or("");
258 let accept = req
259 .headers()
260 .get("Accept")
261 .and_then(|v| v.to_str().ok())
262 .unwrap_or("");
263
264 let request_grid = content::decode_request_grid(&body, content_type)
265 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
266
267 let username = req
268 .extensions()
269 .get::<AuthUser>()
270 .map(|u| u.username.clone())
271 .unwrap_or_else(|| "anonymous".to_string());
272
273 let watch_id = request_grid
274 .meta
275 .get("watchId")
276 .and_then(|v| match v {
277 Kind::Str(s) => Some(s.clone()),
278 _ => None,
279 })
280 .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
281
282 let ids: Vec<String> = request_grid
284 .rows
285 .iter()
286 .filter_map(|row| match row.get("id") {
287 Some(Kind::Ref(r)) => Some(r.val.clone()),
288 _ => None,
289 })
290 .collect();
291
292 if ids.is_empty() {
293 if !state.watches.unsubscribe(&watch_id, &username) {
295 return Err(HaystackError::not_found(format!(
296 "watch not found: {watch_id}"
297 )));
298 }
299 } else {
300 if !state.watches.remove_ids(&watch_id, &username, &ids) {
302 return Err(HaystackError::not_found(format!(
303 "watch not found: {watch_id}"
304 )));
305 }
306
307 for id in &ids {
309 if let Some(connector) = state.federation.owner_of(id) {
310 connector.remove_remote_watch(id);
311 }
312 }
313 }
314
315 respond_grid(&HGrid::new(), accept)
316}
317
318fn respond_grid(grid: &HGrid, accept: &str) -> Result<HttpResponse, HaystackError> {
319 let (encoded, ct) = content::encode_response_grid(grid, accept)
320 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
321 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
322}