Skip to main content

haystack_server/ops/
watch.rs

1//! Watch ops — subscribe, poll, and unsubscribe for entity changes.
2
3use actix_web::{HttpMessage, HttpRequest, HttpResponse, web};
4
5use haystack_core::data::{HCol, HDict, HGrid};
6use haystack_core::kinds::Kind;
7
8use crate::auth::AuthUser;
9use crate::content;
10use crate::error::HaystackError;
11use crate::state::AppState;
12
13/// POST /api/watchSub — subscribe to entity changes.
14///
15/// Request grid has `id` column with Ref values for entities to watch.
16/// May also have a `watchId` column to add IDs to an existing watch.
17/// Returns the current state of watched entities with `watchId` in grid meta.
18pub async fn handle_sub(
19    req: HttpRequest,
20    body: String,
21    state: web::Data<AppState>,
22) -> Result<HttpResponse, HaystackError> {
23    let content_type = req
24        .headers()
25        .get("Content-Type")
26        .and_then(|v| v.to_str().ok())
27        .unwrap_or("");
28    let accept = req
29        .headers()
30        .get("Accept")
31        .and_then(|v| v.to_str().ok())
32        .unwrap_or("");
33
34    let request_grid = content::decode_request_grid(&body, content_type)
35        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
36
37    let username = req
38        .extensions()
39        .get::<AuthUser>()
40        .map(|u| u.username.clone())
41        .unwrap_or_else(|| "anonymous".to_string());
42
43    // Collect entity IDs
44    let ids: Vec<String> = request_grid
45        .rows
46        .iter()
47        .filter_map(|row| match row.get("id") {
48            Some(Kind::Ref(r)) => Some(r.val.clone()),
49            _ => None,
50        })
51        .collect();
52
53    if ids.is_empty() {
54        return Err(HaystackError::bad_request(
55            "request must have 'id' column with Ref values",
56        ));
57    }
58
59    // Check for existing watchId in grid meta (add to existing watch)
60    let existing_watch_id = request_grid.meta.get("watchId").and_then(|v| match v {
61        Kind::Str(s) => Some(s.clone()),
62        _ => None,
63    });
64
65    let watch_id = if let Some(ref wid) = existing_watch_id {
66        if state.watches.add_ids(wid, &username, ids.clone()) {
67            wid.clone()
68        } else {
69            return Err(HaystackError::not_found(format!("watch not found: {wid}")));
70        }
71    } else {
72        let graph_version = state.graph.version();
73        state
74            .watches
75            .subscribe(&username, ids.clone(), graph_version)
76            .map_err(HaystackError::bad_request)?
77    };
78
79    // Return current state of watched entities
80    let mut rows: Vec<HDict> = Vec::new();
81    let mut col_set: Vec<String> = Vec::new();
82    let mut seen = std::collections::HashSet::new();
83
84    for id in &ids {
85        if let Some(entity) = state.graph.get(id) {
86            // Local entity — include directly.
87            for name in entity.tag_names() {
88                if seen.insert(name.to_string()) {
89                    col_set.push(name.to_string());
90                }
91            }
92            rows.push(entity);
93        } else if let Some(connector) = state.federation.owner_of(id) {
94            // Federated entity — look up in the connector's cache and register
95            // the ID for remote watch tracking.
96            let cached = connector.cached_entities();
97            if let Some(entity) = cached.iter().find(|e| {
98                matches!(e.get("id"), Some(Kind::Ref(r)) if r.val == *id)
99            }) {
100                for name in entity.tag_names() {
101                    if seen.insert(name.to_string()) {
102                        col_set.push(name.to_string());
103                    }
104                }
105                rows.push(entity.clone());
106            }
107            connector.add_remote_watch(id);
108            // TODO: For real-time push, establish a WS watch subscription on
109            // the remote server. Currently the background sync loop keeps the
110            // cache fresh and watchPoll detects changes from cache updates.
111        }
112    }
113
114    col_set.sort();
115    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
116
117    let mut meta = HDict::new();
118    meta.set("watchId", Kind::Str(watch_id));
119
120    let grid = HGrid::from_parts(meta, cols, rows);
121    let (encoded, ct) = content::encode_response_grid(&grid, accept)
122        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
123
124    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
125}
126
127/// POST /api/watchPoll — poll for changes since last poll.
128///
129/// Request grid has `watchId` in the grid meta.
130/// Returns changed entities since last poll.
131pub async fn handle_poll(
132    req: HttpRequest,
133    body: String,
134    state: web::Data<AppState>,
135) -> Result<HttpResponse, HaystackError> {
136    let content_type = req
137        .headers()
138        .get("Content-Type")
139        .and_then(|v| v.to_str().ok())
140        .unwrap_or("");
141    let accept = req
142        .headers()
143        .get("Accept")
144        .and_then(|v| v.to_str().ok())
145        .unwrap_or("");
146
147    let request_grid = content::decode_request_grid(&body, content_type)
148        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
149
150    let username = req
151        .extensions()
152        .get::<AuthUser>()
153        .map(|u| u.username.clone())
154        .unwrap_or_else(|| "anonymous".to_string());
155
156    let watch_id = request_grid
157        .meta
158        .get("watchId")
159        .and_then(|v| match v {
160            Kind::Str(s) => Some(s.clone()),
161            _ => None,
162        })
163        .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
164
165    let changed = state
166        .watches
167        .poll(&watch_id, &username, &state.graph)
168        .ok_or_else(|| HaystackError::not_found(format!("watch not found: {watch_id}")))?;
169
170    if changed.is_empty() {
171        return respond_grid(&HGrid::new(), accept);
172    }
173
174    // Build grid from changed entities
175    let mut col_set: Vec<String> = Vec::new();
176    let mut seen = std::collections::HashSet::new();
177    for entity in &changed {
178        for name in entity.tag_names() {
179            if seen.insert(name.to_string()) {
180                col_set.push(name.to_string());
181            }
182        }
183    }
184    col_set.sort();
185    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
186
187    let grid = HGrid::from_parts(HDict::new(), cols, changed);
188    respond_grid(&grid, accept)
189}
190
191/// POST /api/watchUnsub — unsubscribe from a watch.
192///
193/// Request grid has `watchId` in the grid meta.
194/// Request rows may have `id` column to remove specific IDs from the watch.
195/// If no IDs are present in the request rows, the entire watch is removed.
196pub async fn handle_unsub(
197    req: HttpRequest,
198    body: String,
199    state: web::Data<AppState>,
200) -> Result<HttpResponse, HaystackError> {
201    let content_type = req
202        .headers()
203        .get("Content-Type")
204        .and_then(|v| v.to_str().ok())
205        .unwrap_or("");
206    let accept = req
207        .headers()
208        .get("Accept")
209        .and_then(|v| v.to_str().ok())
210        .unwrap_or("");
211
212    let request_grid = content::decode_request_grid(&body, content_type)
213        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
214
215    let username = req
216        .extensions()
217        .get::<AuthUser>()
218        .map(|u| u.username.clone())
219        .unwrap_or_else(|| "anonymous".to_string());
220
221    let watch_id = request_grid
222        .meta
223        .get("watchId")
224        .and_then(|v| match v {
225            Kind::Str(s) => Some(s.clone()),
226            _ => None,
227        })
228        .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
229
230    // Collect entity IDs from request rows for selective removal
231    let ids: Vec<String> = request_grid
232        .rows
233        .iter()
234        .filter_map(|row| match row.get("id") {
235            Some(Kind::Ref(r)) => Some(r.val.clone()),
236            _ => None,
237        })
238        .collect();
239
240    if ids.is_empty() {
241        // No IDs specified — remove the entire watch
242        if !state.watches.unsubscribe(&watch_id, &username) {
243            return Err(HaystackError::not_found(format!(
244                "watch not found: {watch_id}"
245            )));
246        }
247    } else {
248        // Selective removal — remove only the specified IDs from the watch
249        if !state.watches.remove_ids(&watch_id, &username, &ids) {
250            return Err(HaystackError::not_found(format!(
251                "watch not found: {watch_id}"
252            )));
253        }
254
255        // Remove remote watch tracking for any federated IDs being unsubscribed.
256        for id in &ids {
257            if let Some(connector) = state.federation.owner_of(id) {
258                connector.remove_remote_watch(id);
259            }
260        }
261    }
262
263    respond_grid(&HGrid::new(), accept)
264}
265
266fn respond_grid(grid: &HGrid, accept: &str) -> Result<HttpResponse, HaystackError> {
267    let (encoded, ct) = content::encode_response_grid(grid, accept)
268        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
269    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
270}