haystack_server/ops/
watch.rs1use actix_web::{HttpMessage, HttpRequest, HttpResponse, web};
4
5use haystack_core::data::{HCol, HDict, HGrid};
6use haystack_core::kinds::Kind;
7
8use crate::auth::AuthUser;
9use crate::content;
10use crate::error::HaystackError;
11use crate::state::AppState;
12
13pub async fn handle_sub(
19 req: HttpRequest,
20 body: String,
21 state: web::Data<AppState>,
22) -> Result<HttpResponse, HaystackError> {
23 let content_type = req
24 .headers()
25 .get("Content-Type")
26 .and_then(|v| v.to_str().ok())
27 .unwrap_or("");
28 let accept = req
29 .headers()
30 .get("Accept")
31 .and_then(|v| v.to_str().ok())
32 .unwrap_or("");
33
34 let request_grid = content::decode_request_grid(&body, content_type)
35 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
36
37 let username = req
38 .extensions()
39 .get::<AuthUser>()
40 .map(|u| u.username.clone())
41 .unwrap_or_else(|| "anonymous".to_string());
42
43 let ids: Vec<String> = request_grid
45 .rows
46 .iter()
47 .filter_map(|row| match row.get("id") {
48 Some(Kind::Ref(r)) => Some(r.val.clone()),
49 _ => None,
50 })
51 .collect();
52
53 if ids.is_empty() {
54 return Err(HaystackError::bad_request(
55 "request must have 'id' column with Ref values",
56 ));
57 }
58
59 let existing_watch_id = request_grid.meta.get("watchId").and_then(|v| match v {
61 Kind::Str(s) => Some(s.clone()),
62 _ => None,
63 });
64
65 let watch_id = if let Some(ref wid) = existing_watch_id {
66 if state.watches.add_ids(wid, &username, ids.clone()) {
67 wid.clone()
68 } else {
69 return Err(HaystackError::not_found(format!("watch not found: {wid}")));
70 }
71 } else {
72 let graph_version = state.graph.version();
73 state
74 .watches
75 .subscribe(&username, ids.clone(), graph_version)
76 .map_err(HaystackError::bad_request)?
77 };
78
79 let mut rows: Vec<HDict> = Vec::new();
81 let mut col_set: Vec<String> = Vec::new();
82 let mut seen = std::collections::HashSet::new();
83
84 for id in &ids {
85 if let Some(entity) = state.graph.get(id) {
86 for name in entity.tag_names() {
88 if seen.insert(name.to_string()) {
89 col_set.push(name.to_string());
90 }
91 }
92 rows.push(entity);
93 } else if let Some(connector) = state.federation.owner_of(id) {
94 let cached = connector.cached_entities();
97 if let Some(entity) = cached.iter().find(|e| {
98 matches!(e.get("id"), Some(Kind::Ref(r)) if r.val == *id)
99 }) {
100 for name in entity.tag_names() {
101 if seen.insert(name.to_string()) {
102 col_set.push(name.to_string());
103 }
104 }
105 rows.push(entity.clone());
106 }
107 connector.add_remote_watch(id);
108 }
112 }
113
114 col_set.sort();
115 let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
116
117 let mut meta = HDict::new();
118 meta.set("watchId", Kind::Str(watch_id));
119
120 let grid = HGrid::from_parts(meta, cols, rows);
121 let (encoded, ct) = content::encode_response_grid(&grid, accept)
122 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
123
124 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
125}
126
127pub async fn handle_poll(
132 req: HttpRequest,
133 body: String,
134 state: web::Data<AppState>,
135) -> Result<HttpResponse, HaystackError> {
136 let content_type = req
137 .headers()
138 .get("Content-Type")
139 .and_then(|v| v.to_str().ok())
140 .unwrap_or("");
141 let accept = req
142 .headers()
143 .get("Accept")
144 .and_then(|v| v.to_str().ok())
145 .unwrap_or("");
146
147 let request_grid = content::decode_request_grid(&body, content_type)
148 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
149
150 let username = req
151 .extensions()
152 .get::<AuthUser>()
153 .map(|u| u.username.clone())
154 .unwrap_or_else(|| "anonymous".to_string());
155
156 let watch_id = request_grid
157 .meta
158 .get("watchId")
159 .and_then(|v| match v {
160 Kind::Str(s) => Some(s.clone()),
161 _ => None,
162 })
163 .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
164
165 let changed = state
166 .watches
167 .poll(&watch_id, &username, &state.graph)
168 .ok_or_else(|| HaystackError::not_found(format!("watch not found: {watch_id}")))?;
169
170 if changed.is_empty() {
171 return respond_grid(&HGrid::new(), accept);
172 }
173
174 let mut col_set: Vec<String> = Vec::new();
176 let mut seen = std::collections::HashSet::new();
177 for entity in &changed {
178 for name in entity.tag_names() {
179 if seen.insert(name.to_string()) {
180 col_set.push(name.to_string());
181 }
182 }
183 }
184 col_set.sort();
185 let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
186
187 let grid = HGrid::from_parts(HDict::new(), cols, changed);
188 respond_grid(&grid, accept)
189}
190
191pub async fn handle_unsub(
197 req: HttpRequest,
198 body: String,
199 state: web::Data<AppState>,
200) -> Result<HttpResponse, HaystackError> {
201 let content_type = req
202 .headers()
203 .get("Content-Type")
204 .and_then(|v| v.to_str().ok())
205 .unwrap_or("");
206 let accept = req
207 .headers()
208 .get("Accept")
209 .and_then(|v| v.to_str().ok())
210 .unwrap_or("");
211
212 let request_grid = content::decode_request_grid(&body, content_type)
213 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
214
215 let username = req
216 .extensions()
217 .get::<AuthUser>()
218 .map(|u| u.username.clone())
219 .unwrap_or_else(|| "anonymous".to_string());
220
221 let watch_id = request_grid
222 .meta
223 .get("watchId")
224 .and_then(|v| match v {
225 Kind::Str(s) => Some(s.clone()),
226 _ => None,
227 })
228 .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
229
230 let ids: Vec<String> = request_grid
232 .rows
233 .iter()
234 .filter_map(|row| match row.get("id") {
235 Some(Kind::Ref(r)) => Some(r.val.clone()),
236 _ => None,
237 })
238 .collect();
239
240 if ids.is_empty() {
241 if !state.watches.unsubscribe(&watch_id, &username) {
243 return Err(HaystackError::not_found(format!(
244 "watch not found: {watch_id}"
245 )));
246 }
247 } else {
248 if !state.watches.remove_ids(&watch_id, &username, &ids) {
250 return Err(HaystackError::not_found(format!(
251 "watch not found: {watch_id}"
252 )));
253 }
254
255 for id in &ids {
257 if let Some(connector) = state.federation.owner_of(id) {
258 connector.remove_remote_watch(id);
259 }
260 }
261 }
262
263 respond_grid(&HGrid::new(), accept)
264}
265
266fn respond_grid(grid: &HGrid, accept: &str) -> Result<HttpResponse, HaystackError> {
267 let (encoded, ct) = content::encode_response_grid(grid, accept)
268 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
269 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
270}