haystack_server/ops/
watch.rs1use actix_web::{HttpMessage, HttpRequest, HttpResponse, web};
4
5use haystack_core::data::{HCol, HDict, HGrid};
6use haystack_core::kinds::Kind;
7
8use crate::auth::AuthUser;
9use crate::content;
10use crate::error::HaystackError;
11use crate::state::AppState;
12
13pub async fn handle_sub(
19 req: HttpRequest,
20 body: String,
21 state: web::Data<AppState>,
22) -> Result<HttpResponse, HaystackError> {
23 let content_type = req
24 .headers()
25 .get("Content-Type")
26 .and_then(|v| v.to_str().ok())
27 .unwrap_or("");
28 let accept = req
29 .headers()
30 .get("Accept")
31 .and_then(|v| v.to_str().ok())
32 .unwrap_or("");
33
34 let request_grid = content::decode_request_grid(&body, content_type)
35 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
36
37 let username = req
38 .extensions()
39 .get::<AuthUser>()
40 .map(|u| u.username.clone())
41 .unwrap_or_else(|| "anonymous".to_string());
42
43 let ids: Vec<String> = request_grid
45 .rows
46 .iter()
47 .filter_map(|row| match row.get("id") {
48 Some(Kind::Ref(r)) => Some(r.val.clone()),
49 _ => None,
50 })
51 .collect();
52
53 if ids.is_empty() {
54 return Err(HaystackError::bad_request(
55 "request must have 'id' column with Ref values",
56 ));
57 }
58
59 let existing_watch_id = request_grid.meta.get("watchId").and_then(|v| match v {
61 Kind::Str(s) => Some(s.clone()),
62 _ => None,
63 });
64
65 let watch_id = if let Some(ref wid) = existing_watch_id {
66 if state.watches.add_ids(wid, &username, ids.clone()) {
67 wid.clone()
68 } else {
69 return Err(HaystackError::not_found(format!("watch not found: {wid}")));
70 }
71 } else {
72 let graph_version = state.graph.version();
73 state
74 .watches
75 .subscribe(&username, ids.clone(), graph_version)
76 .map_err(HaystackError::bad_request)?
77 };
78
79 let mut rows: Vec<HDict> = Vec::new();
81 let mut col_set: Vec<String> = Vec::new();
82 let mut seen = std::collections::HashSet::new();
83
84 for id in &ids {
85 if let Some(entity) = state.graph.get(id) {
86 for name in entity.tag_names() {
88 if seen.insert(name.to_string()) {
89 col_set.push(name.to_string());
90 }
91 }
92 rows.push(entity);
93 } else if let Some(connector) = state.federation.owner_of(id) {
94 let cached = connector.cached_entities();
97 if let Some(entity) = cached
98 .iter()
99 .find(|e| matches!(e.get("id"), Some(Kind::Ref(r)) if r.val == *id))
100 {
101 for name in entity.tag_names() {
102 if seen.insert(name.to_string()) {
103 col_set.push(name.to_string());
104 }
105 }
106 rows.push(entity.clone());
107 }
108 connector.add_remote_watch(id);
109 }
113 }
114
115 col_set.sort();
116 let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
117
118 let mut meta = HDict::new();
119 meta.set("watchId", Kind::Str(watch_id));
120
121 let grid = HGrid::from_parts(meta, cols, rows);
122 let (encoded, ct) = content::encode_response_grid(&grid, accept)
123 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
124
125 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
126}
127
128pub async fn handle_poll(
133 req: HttpRequest,
134 body: String,
135 state: web::Data<AppState>,
136) -> Result<HttpResponse, HaystackError> {
137 let content_type = req
138 .headers()
139 .get("Content-Type")
140 .and_then(|v| v.to_str().ok())
141 .unwrap_or("");
142 let accept = req
143 .headers()
144 .get("Accept")
145 .and_then(|v| v.to_str().ok())
146 .unwrap_or("");
147
148 let request_grid = content::decode_request_grid(&body, content_type)
149 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
150
151 let username = req
152 .extensions()
153 .get::<AuthUser>()
154 .map(|u| u.username.clone())
155 .unwrap_or_else(|| "anonymous".to_string());
156
157 let watch_id = request_grid
158 .meta
159 .get("watchId")
160 .and_then(|v| match v {
161 Kind::Str(s) => Some(s.clone()),
162 _ => None,
163 })
164 .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
165
166 let changed = state
167 .watches
168 .poll(&watch_id, &username, &state.graph)
169 .ok_or_else(|| HaystackError::not_found(format!("watch not found: {watch_id}")))?;
170
171 if changed.is_empty() {
172 return respond_grid(&HGrid::new(), accept);
173 }
174
175 let mut col_set: Vec<String> = Vec::new();
177 let mut seen = std::collections::HashSet::new();
178 for entity in &changed {
179 for name in entity.tag_names() {
180 if seen.insert(name.to_string()) {
181 col_set.push(name.to_string());
182 }
183 }
184 }
185 col_set.sort();
186 let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
187
188 let grid = HGrid::from_parts(HDict::new(), cols, changed);
189 respond_grid(&grid, accept)
190}
191
192pub async fn handle_unsub(
198 req: HttpRequest,
199 body: String,
200 state: web::Data<AppState>,
201) -> Result<HttpResponse, HaystackError> {
202 let content_type = req
203 .headers()
204 .get("Content-Type")
205 .and_then(|v| v.to_str().ok())
206 .unwrap_or("");
207 let accept = req
208 .headers()
209 .get("Accept")
210 .and_then(|v| v.to_str().ok())
211 .unwrap_or("");
212
213 let request_grid = content::decode_request_grid(&body, content_type)
214 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
215
216 let username = req
217 .extensions()
218 .get::<AuthUser>()
219 .map(|u| u.username.clone())
220 .unwrap_or_else(|| "anonymous".to_string());
221
222 let watch_id = request_grid
223 .meta
224 .get("watchId")
225 .and_then(|v| match v {
226 Kind::Str(s) => Some(s.clone()),
227 _ => None,
228 })
229 .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
230
231 let ids: Vec<String> = request_grid
233 .rows
234 .iter()
235 .filter_map(|row| match row.get("id") {
236 Some(Kind::Ref(r)) => Some(r.val.clone()),
237 _ => None,
238 })
239 .collect();
240
241 if ids.is_empty() {
242 if !state.watches.unsubscribe(&watch_id, &username) {
244 return Err(HaystackError::not_found(format!(
245 "watch not found: {watch_id}"
246 )));
247 }
248 } else {
249 if !state.watches.remove_ids(&watch_id, &username, &ids) {
251 return Err(HaystackError::not_found(format!(
252 "watch not found: {watch_id}"
253 )));
254 }
255
256 for id in &ids {
258 if let Some(connector) = state.federation.owner_of(id) {
259 connector.remove_remote_watch(id);
260 }
261 }
262 }
263
264 respond_grid(&HGrid::new(), accept)
265}
266
267fn respond_grid(grid: &HGrid, accept: &str) -> Result<HttpResponse, HaystackError> {
268 let (encoded, ct) = content::encode_response_grid(grid, accept)
269 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
270 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
271}