Skip to main content

haystack_server/ops/
watch.rs

1//! Watch ops — subscribe, poll, and unsubscribe for entity changes.
2//!
3//! # Overview
4//!
5//! The watch ops allow clients to track changes to a set of entities.
6//! A watch is created via `watchSub`, polled via `watchPoll`, and
7//! torn down via `watchUnsub`.
8//!
9//! # watchSub (`POST /api/watchSub`)
10//!
11//! ## Request
12//!
13//! Grid meta may contain `watchId` (Str) to add IDs to an existing watch.
14//! Rows contain:
15//!
16//! | Column | Kind | Description            |
17//! |--------|------|------------------------|
18//! | `id`   | Ref  | Entity to subscribe to |
19//!
20//! ## Response
21//!
22//! Grid meta contains `watchId` (Str). Rows are the current state of
23//! each subscribed entity (one column per unique tag).
24//!
25//! # watchPoll (`POST /api/watchPoll`)
26//!
27//! ## Request
28//!
29//! Grid meta must contain `watchId` (Str). No rows required.
30//!
31//! ## Response
32//!
33//! Rows are entities that changed since the last poll. Empty grid if
34//! no changes occurred.
35//!
36//! # watchUnsub (`POST /api/watchUnsub`)
37//!
38//! ## Request
39//!
40//! Grid meta must contain `watchId` (Str). Rows may contain `id` (Ref)
41//! to selectively remove entities. If no rows are present, the entire
42//! watch is deleted.
43//!
44//! ## Response
45//!
46//! Empty grid on success.
47//!
48//! # Errors
49//!
50//! - **400 Bad Request** — missing `id` rows, missing `watchId` in meta, or
51//!   request decode failure.
52//! - **404 Not Found** — `watchId` does not exist or caller is not the owner.
53
54use actix_web::{HttpMessage, HttpRequest, HttpResponse, web};
55
56use haystack_core::data::{HCol, HDict, HGrid};
57use haystack_core::kinds::Kind;
58
59use crate::auth::AuthUser;
60use crate::content;
61use crate::error::HaystackError;
62use crate::state::AppState;
63
64/// POST /api/watchSub — subscribe to entity changes.
65///
66/// Request grid has `id` column with Ref values for entities to watch.
67/// May also have a `watchId` column to add IDs to an existing watch.
68/// Returns the current state of watched entities with `watchId` in grid meta.
69pub async fn handle_sub(
70    req: HttpRequest,
71    body: String,
72    state: web::Data<AppState>,
73) -> Result<HttpResponse, HaystackError> {
74    let content_type = req
75        .headers()
76        .get("Content-Type")
77        .and_then(|v| v.to_str().ok())
78        .unwrap_or("");
79    let accept = req
80        .headers()
81        .get("Accept")
82        .and_then(|v| v.to_str().ok())
83        .unwrap_or("");
84
85    let request_grid = content::decode_request_grid(&body, content_type)
86        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
87
88    let username = req
89        .extensions()
90        .get::<AuthUser>()
91        .map(|u| u.username.clone())
92        .unwrap_or_else(|| "anonymous".to_string());
93
94    // Collect entity IDs
95    let ids: Vec<String> = request_grid
96        .rows
97        .iter()
98        .filter_map(|row| match row.get("id") {
99            Some(Kind::Ref(r)) => Some(r.val.clone()),
100            _ => None,
101        })
102        .collect();
103
104    if ids.is_empty() {
105        return Err(HaystackError::bad_request(
106            "request must have 'id' column with Ref values",
107        ));
108    }
109
110    // Check for existing watchId in grid meta (add to existing watch)
111    let existing_watch_id = request_grid.meta.get("watchId").and_then(|v| match v {
112        Kind::Str(s) => Some(s.clone()),
113        _ => None,
114    });
115
116    let watch_id = if let Some(ref wid) = existing_watch_id {
117        if state.watches.add_ids(wid, &username, ids.clone()) {
118            wid.clone()
119        } else {
120            return Err(HaystackError::not_found(format!("watch not found: {wid}")));
121        }
122    } else {
123        let graph_version = state.graph.version();
124        state
125            .watches
126            .subscribe(&username, ids.clone(), graph_version)
127            .map_err(HaystackError::bad_request)?
128    };
129
130    // Return current state of watched entities
131    let mut rows: Vec<HDict> = Vec::new();
132    let mut col_set: Vec<String> = Vec::new();
133    let mut seen = std::collections::HashSet::new();
134
135    for id in &ids {
136        if let Some(entity) = state.graph.get(id) {
137            // Local entity — include directly.
138            for name in entity.tag_names() {
139                if seen.insert(name.to_string()) {
140                    col_set.push(name.to_string());
141                }
142            }
143            rows.push(entity);
144        } else if let Some(connector) = state.federation.owner_of(id) {
145            // Federated entity — look up in the connector's cache and register
146            // the ID for remote watch tracking.
147            if let Some(entity) = connector.get_cached_entity(id) {
148                for name in entity.tag_names() {
149                    if seen.insert(name.to_string()) {
150                        col_set.push(name.to_string());
151                    }
152                }
153                rows.push((*entity).clone());
154            }
155            connector.add_remote_watch(id);
156            // TODO: For real-time push, establish a WS watch subscription on
157            // the remote server. Currently the background sync loop keeps the
158            // cache fresh and watchPoll detects changes from cache updates.
159        }
160    }
161
162    col_set.sort();
163    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
164
165    let mut meta = HDict::new();
166    meta.set("watchId", Kind::Str(watch_id));
167
168    let grid = HGrid::from_parts(meta, cols, rows);
169    let (encoded, ct) = content::encode_response_grid(&grid, accept)
170        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
171
172    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
173}
174
175/// POST /api/watchPoll — poll for changes since last poll.
176///
177/// Request grid has `watchId` in the grid meta.
178/// Returns changed entities since last poll.
179pub async fn handle_poll(
180    req: HttpRequest,
181    body: String,
182    state: web::Data<AppState>,
183) -> Result<HttpResponse, HaystackError> {
184    let content_type = req
185        .headers()
186        .get("Content-Type")
187        .and_then(|v| v.to_str().ok())
188        .unwrap_or("");
189    let accept = req
190        .headers()
191        .get("Accept")
192        .and_then(|v| v.to_str().ok())
193        .unwrap_or("");
194
195    let request_grid = content::decode_request_grid(&body, content_type)
196        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
197
198    let username = req
199        .extensions()
200        .get::<AuthUser>()
201        .map(|u| u.username.clone())
202        .unwrap_or_else(|| "anonymous".to_string());
203
204    let watch_id = request_grid
205        .meta
206        .get("watchId")
207        .and_then(|v| match v {
208            Kind::Str(s) => Some(s.clone()),
209            _ => None,
210        })
211        .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
212
213    let changed = state
214        .watches
215        .poll(&watch_id, &username, &state.graph)
216        .ok_or_else(|| HaystackError::not_found(format!("watch not found: {watch_id}")))?;
217
218    if changed.is_empty() {
219        return respond_grid(&HGrid::new(), accept);
220    }
221
222    // Build grid from changed entities
223    let mut col_set: Vec<String> = Vec::new();
224    let mut seen = std::collections::HashSet::new();
225    for entity in &changed {
226        for name in entity.tag_names() {
227            if seen.insert(name.to_string()) {
228                col_set.push(name.to_string());
229            }
230        }
231    }
232    col_set.sort();
233    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
234
235    let grid = HGrid::from_parts(HDict::new(), cols, changed);
236    respond_grid(&grid, accept)
237}
238
239/// POST /api/watchUnsub — unsubscribe from a watch.
240///
241/// Request grid has `watchId` in the grid meta.
242/// Request rows may have `id` column to remove specific IDs from the watch.
243/// If no IDs are present in the request rows, the entire watch is removed.
244pub async fn handle_unsub(
245    req: HttpRequest,
246    body: String,
247    state: web::Data<AppState>,
248) -> Result<HttpResponse, HaystackError> {
249    let content_type = req
250        .headers()
251        .get("Content-Type")
252        .and_then(|v| v.to_str().ok())
253        .unwrap_or("");
254    let accept = req
255        .headers()
256        .get("Accept")
257        .and_then(|v| v.to_str().ok())
258        .unwrap_or("");
259
260    let request_grid = content::decode_request_grid(&body, content_type)
261        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
262
263    let username = req
264        .extensions()
265        .get::<AuthUser>()
266        .map(|u| u.username.clone())
267        .unwrap_or_else(|| "anonymous".to_string());
268
269    let watch_id = request_grid
270        .meta
271        .get("watchId")
272        .and_then(|v| match v {
273            Kind::Str(s) => Some(s.clone()),
274            _ => None,
275        })
276        .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
277
278    // Collect entity IDs from request rows for selective removal
279    let ids: Vec<String> = request_grid
280        .rows
281        .iter()
282        .filter_map(|row| match row.get("id") {
283            Some(Kind::Ref(r)) => Some(r.val.clone()),
284            _ => None,
285        })
286        .collect();
287
288    if ids.is_empty() {
289        // No IDs specified — remove the entire watch
290        if !state.watches.unsubscribe(&watch_id, &username) {
291            return Err(HaystackError::not_found(format!(
292                "watch not found: {watch_id}"
293            )));
294        }
295    } else {
296        // Selective removal — remove only the specified IDs from the watch
297        if !state.watches.remove_ids(&watch_id, &username, &ids) {
298            return Err(HaystackError::not_found(format!(
299                "watch not found: {watch_id}"
300            )));
301        }
302
303        // Remove remote watch tracking for any federated IDs being unsubscribed.
304        for id in &ids {
305            if let Some(connector) = state.federation.owner_of(id) {
306                connector.remove_remote_watch(id);
307            }
308        }
309    }
310
311    respond_grid(&HGrid::new(), accept)
312}
313
314fn respond_grid(grid: &HGrid, accept: &str) -> Result<HttpResponse, HaystackError> {
315    let (encoded, ct) = content::encode_response_grid(grid, accept)
316        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
317    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
318}