Skip to main content

haystack_server/ops/
watch.rs

1//! Watch ops — subscribe, poll, and unsubscribe for entity changes.
2//!
3//! # Overview
4//!
5//! The watch ops allow clients to track changes to a set of entities.
6//! A watch is created via `watchSub`, polled via `watchPoll`, and
7//! torn down via `watchUnsub`.
8//!
9//! # watchSub (`POST /api/watchSub`)
10//!
11//! ## Request
12//!
13//! Grid meta may contain `watchId` (Str) to add IDs to an existing watch.
14//! Rows contain:
15//!
16//! | Column | Kind | Description            |
17//! |--------|------|------------------------|
18//! | `id`   | Ref  | Entity to subscribe to |
19//!
20//! ## Response
21//!
22//! Grid meta contains `watchId` (Str). Rows are the current state of
23//! each subscribed entity (one column per unique tag).
24//!
25//! # watchPoll (`POST /api/watchPoll`)
26//!
27//! ## Request
28//!
29//! Grid meta must contain `watchId` (Str). No rows required.
30//!
31//! ## Response
32//!
33//! Rows are entities that changed since the last poll. Empty grid if
34//! no changes occurred.
35//!
36//! # watchUnsub (`POST /api/watchUnsub`)
37//!
38//! ## Request
39//!
40//! Grid meta must contain `watchId` (Str). Rows may contain `id` (Ref)
41//! to selectively remove entities. If no rows are present, the entire
42//! watch is deleted.
43//!
44//! ## Response
45//!
46//! Empty grid on success.
47//!
48//! # Errors
49//!
50//! - **400 Bad Request** — missing `id` rows, missing `watchId` in meta, or
51//!   request decode failure.
52//! - **404 Not Found** — `watchId` does not exist or caller is not the owner.
53
54use actix_web::{HttpMessage, HttpRequest, HttpResponse, web};
55
56use haystack_core::data::{HCol, HDict, HGrid};
57use haystack_core::kinds::Kind;
58
59use crate::auth::AuthUser;
60use crate::content;
61use crate::error::HaystackError;
62use crate::state::AppState;
63
64/// POST /api/watchSub — subscribe to entity changes.
65///
66/// Request grid has `id` column with Ref values for entities to watch.
67/// May also have a `watchId` column to add IDs to an existing watch.
68/// Returns the current state of watched entities with `watchId` in grid meta.
69pub async fn handle_sub(
70    req: HttpRequest,
71    body: String,
72    state: web::Data<AppState>,
73) -> Result<HttpResponse, HaystackError> {
74    let content_type = req
75        .headers()
76        .get("Content-Type")
77        .and_then(|v| v.to_str().ok())
78        .unwrap_or("");
79    let accept = req
80        .headers()
81        .get("Accept")
82        .and_then(|v| v.to_str().ok())
83        .unwrap_or("");
84
85    let request_grid = content::decode_request_grid(&body, content_type)
86        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
87
88    let username = req
89        .extensions()
90        .get::<AuthUser>()
91        .map(|u| u.username.clone())
92        .unwrap_or_else(|| "anonymous".to_string());
93
94    // Collect entity IDs
95    let ids: Vec<String> = request_grid
96        .rows
97        .iter()
98        .filter_map(|row| match row.get("id") {
99            Some(Kind::Ref(r)) => Some(r.val.clone()),
100            _ => None,
101        })
102        .collect();
103
104    if ids.is_empty() {
105        return Err(HaystackError::bad_request(
106            "request must have 'id' column with Ref values",
107        ));
108    }
109
110    // Check for existing watchId in grid meta (add to existing watch)
111    let existing_watch_id = request_grid.meta.get("watchId").and_then(|v| match v {
112        Kind::Str(s) => Some(s.clone()),
113        _ => None,
114    });
115
116    let watch_id = if let Some(ref wid) = existing_watch_id {
117        if state.watches.add_ids(wid, &username, ids.clone()) {
118            wid.clone()
119        } else {
120            return Err(HaystackError::not_found(format!("watch not found: {wid}")));
121        }
122    } else {
123        let graph_version = state.graph.version();
124        state
125            .watches
126            .subscribe(&username, ids.clone(), graph_version)
127            .map_err(HaystackError::bad_request)?
128    };
129
130    // Return current state of watched entities
131    let mut rows: Vec<HDict> = Vec::new();
132    let mut col_set: Vec<String> = Vec::new();
133    let mut seen = std::collections::HashSet::new();
134
135    for id in &ids {
136        if let Some(entity) = state.graph.get(id) {
137            // Local entity — include directly.
138            for name in entity.tag_names() {
139                if seen.insert(name.to_string()) {
140                    col_set.push(name.to_string());
141                }
142            }
143            rows.push(entity);
144        } else if let Some(connector) = state.federation.owner_of(id) {
145            // Federated entity — look up in the connector's cache and register
146            // the ID for remote watch tracking.
147            let cached = connector.cached_entities();
148            if let Some(entity) = cached
149                .iter()
150                .find(|e| matches!(e.get("id"), Some(Kind::Ref(r)) if r.val == *id))
151            {
152                for name in entity.tag_names() {
153                    if seen.insert(name.to_string()) {
154                        col_set.push(name.to_string());
155                    }
156                }
157                rows.push(entity.clone());
158            }
159            connector.add_remote_watch(id);
160            // TODO: For real-time push, establish a WS watch subscription on
161            // the remote server. Currently the background sync loop keeps the
162            // cache fresh and watchPoll detects changes from cache updates.
163        }
164    }
165
166    col_set.sort();
167    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
168
169    let mut meta = HDict::new();
170    meta.set("watchId", Kind::Str(watch_id));
171
172    let grid = HGrid::from_parts(meta, cols, rows);
173    let (encoded, ct) = content::encode_response_grid(&grid, accept)
174        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
175
176    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
177}
178
179/// POST /api/watchPoll — poll for changes since last poll.
180///
181/// Request grid has `watchId` in the grid meta.
182/// Returns changed entities since last poll.
183pub async fn handle_poll(
184    req: HttpRequest,
185    body: String,
186    state: web::Data<AppState>,
187) -> Result<HttpResponse, HaystackError> {
188    let content_type = req
189        .headers()
190        .get("Content-Type")
191        .and_then(|v| v.to_str().ok())
192        .unwrap_or("");
193    let accept = req
194        .headers()
195        .get("Accept")
196        .and_then(|v| v.to_str().ok())
197        .unwrap_or("");
198
199    let request_grid = content::decode_request_grid(&body, content_type)
200        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
201
202    let username = req
203        .extensions()
204        .get::<AuthUser>()
205        .map(|u| u.username.clone())
206        .unwrap_or_else(|| "anonymous".to_string());
207
208    let watch_id = request_grid
209        .meta
210        .get("watchId")
211        .and_then(|v| match v {
212            Kind::Str(s) => Some(s.clone()),
213            _ => None,
214        })
215        .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
216
217    let changed = state
218        .watches
219        .poll(&watch_id, &username, &state.graph)
220        .ok_or_else(|| HaystackError::not_found(format!("watch not found: {watch_id}")))?;
221
222    if changed.is_empty() {
223        return respond_grid(&HGrid::new(), accept);
224    }
225
226    // Build grid from changed entities
227    let mut col_set: Vec<String> = Vec::new();
228    let mut seen = std::collections::HashSet::new();
229    for entity in &changed {
230        for name in entity.tag_names() {
231            if seen.insert(name.to_string()) {
232                col_set.push(name.to_string());
233            }
234        }
235    }
236    col_set.sort();
237    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
238
239    let grid = HGrid::from_parts(HDict::new(), cols, changed);
240    respond_grid(&grid, accept)
241}
242
243/// POST /api/watchUnsub — unsubscribe from a watch.
244///
245/// Request grid has `watchId` in the grid meta.
246/// Request rows may have `id` column to remove specific IDs from the watch.
247/// If no IDs are present in the request rows, the entire watch is removed.
248pub async fn handle_unsub(
249    req: HttpRequest,
250    body: String,
251    state: web::Data<AppState>,
252) -> Result<HttpResponse, HaystackError> {
253    let content_type = req
254        .headers()
255        .get("Content-Type")
256        .and_then(|v| v.to_str().ok())
257        .unwrap_or("");
258    let accept = req
259        .headers()
260        .get("Accept")
261        .and_then(|v| v.to_str().ok())
262        .unwrap_or("");
263
264    let request_grid = content::decode_request_grid(&body, content_type)
265        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
266
267    let username = req
268        .extensions()
269        .get::<AuthUser>()
270        .map(|u| u.username.clone())
271        .unwrap_or_else(|| "anonymous".to_string());
272
273    let watch_id = request_grid
274        .meta
275        .get("watchId")
276        .and_then(|v| match v {
277            Kind::Str(s) => Some(s.clone()),
278            _ => None,
279        })
280        .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
281
282    // Collect entity IDs from request rows for selective removal
283    let ids: Vec<String> = request_grid
284        .rows
285        .iter()
286        .filter_map(|row| match row.get("id") {
287            Some(Kind::Ref(r)) => Some(r.val.clone()),
288            _ => None,
289        })
290        .collect();
291
292    if ids.is_empty() {
293        // No IDs specified — remove the entire watch
294        if !state.watches.unsubscribe(&watch_id, &username) {
295            return Err(HaystackError::not_found(format!(
296                "watch not found: {watch_id}"
297            )));
298        }
299    } else {
300        // Selective removal — remove only the specified IDs from the watch
301        if !state.watches.remove_ids(&watch_id, &username, &ids) {
302            return Err(HaystackError::not_found(format!(
303                "watch not found: {watch_id}"
304            )));
305        }
306
307        // Remove remote watch tracking for any federated IDs being unsubscribed.
308        for id in &ids {
309            if let Some(connector) = state.federation.owner_of(id) {
310                connector.remove_remote_watch(id);
311            }
312        }
313    }
314
315    respond_grid(&HGrid::new(), accept)
316}
317
318fn respond_grid(grid: &HGrid, accept: &str) -> Result<HttpResponse, HaystackError> {
319    let (encoded, ct) = content::encode_response_grid(grid, accept)
320        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
321    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
322}