Skip to main content

haystack_server/ops/
watch.rs

1//! Watch ops — subscribe, poll, and unsubscribe for entity changes.
2
3use actix_web::{HttpMessage, HttpRequest, HttpResponse, web};
4
5use haystack_core::data::{HCol, HDict, HGrid};
6use haystack_core::kinds::Kind;
7
8use crate::auth::AuthUser;
9use crate::content;
10use crate::error::HaystackError;
11use crate::state::AppState;
12
13/// POST /api/watchSub — subscribe to entity changes.
14///
15/// Request grid has `id` column with Ref values for entities to watch.
16/// May also have a `watchId` column to add IDs to an existing watch.
17/// Returns the current state of watched entities with `watchId` in grid meta.
18pub async fn handle_sub(
19    req: HttpRequest,
20    body: String,
21    state: web::Data<AppState>,
22) -> Result<HttpResponse, HaystackError> {
23    let content_type = req
24        .headers()
25        .get("Content-Type")
26        .and_then(|v| v.to_str().ok())
27        .unwrap_or("");
28    let accept = req
29        .headers()
30        .get("Accept")
31        .and_then(|v| v.to_str().ok())
32        .unwrap_or("");
33
34    let request_grid = content::decode_request_grid(&body, content_type)
35        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
36
37    let username = req
38        .extensions()
39        .get::<AuthUser>()
40        .map(|u| u.username.clone())
41        .unwrap_or_else(|| "anonymous".to_string());
42
43    // Collect entity IDs
44    let ids: Vec<String> = request_grid
45        .rows
46        .iter()
47        .filter_map(|row| match row.get("id") {
48            Some(Kind::Ref(r)) => Some(r.val.clone()),
49            _ => None,
50        })
51        .collect();
52
53    if ids.is_empty() {
54        return Err(HaystackError::bad_request(
55            "request must have 'id' column with Ref values",
56        ));
57    }
58
59    // Check for existing watchId in grid meta (add to existing watch)
60    let existing_watch_id = request_grid.meta.get("watchId").and_then(|v| match v {
61        Kind::Str(s) => Some(s.clone()),
62        _ => None,
63    });
64
65    let watch_id = if let Some(ref wid) = existing_watch_id {
66        if state.watches.add_ids(wid, &username, ids.clone()) {
67            wid.clone()
68        } else {
69            return Err(HaystackError::not_found(format!("watch not found: {wid}")));
70        }
71    } else {
72        let graph_version = state.graph.version();
73        state
74            .watches
75            .subscribe(&username, ids.clone(), graph_version)
76            .map_err(HaystackError::bad_request)?
77    };
78
79    // Return current state of watched entities
80    let mut rows: Vec<HDict> = Vec::new();
81    let mut col_set: Vec<String> = Vec::new();
82    let mut seen = std::collections::HashSet::new();
83
84    for id in &ids {
85        if let Some(entity) = state.graph.get(id) {
86            // Local entity — include directly.
87            for name in entity.tag_names() {
88                if seen.insert(name.to_string()) {
89                    col_set.push(name.to_string());
90                }
91            }
92            rows.push(entity);
93        } else if let Some(connector) = state.federation.owner_of(id) {
94            // Federated entity — look up in the connector's cache and register
95            // the ID for remote watch tracking.
96            let cached = connector.cached_entities();
97            if let Some(entity) = cached
98                .iter()
99                .find(|e| matches!(e.get("id"), Some(Kind::Ref(r)) if r.val == *id))
100            {
101                for name in entity.tag_names() {
102                    if seen.insert(name.to_string()) {
103                        col_set.push(name.to_string());
104                    }
105                }
106                rows.push(entity.clone());
107            }
108            connector.add_remote_watch(id);
109            // TODO: For real-time push, establish a WS watch subscription on
110            // the remote server. Currently the background sync loop keeps the
111            // cache fresh and watchPoll detects changes from cache updates.
112        }
113    }
114
115    col_set.sort();
116    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
117
118    let mut meta = HDict::new();
119    meta.set("watchId", Kind::Str(watch_id));
120
121    let grid = HGrid::from_parts(meta, cols, rows);
122    let (encoded, ct) = content::encode_response_grid(&grid, accept)
123        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
124
125    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
126}
127
128/// POST /api/watchPoll — poll for changes since last poll.
129///
130/// Request grid has `watchId` in the grid meta.
131/// Returns changed entities since last poll.
132pub async fn handle_poll(
133    req: HttpRequest,
134    body: String,
135    state: web::Data<AppState>,
136) -> Result<HttpResponse, HaystackError> {
137    let content_type = req
138        .headers()
139        .get("Content-Type")
140        .and_then(|v| v.to_str().ok())
141        .unwrap_or("");
142    let accept = req
143        .headers()
144        .get("Accept")
145        .and_then(|v| v.to_str().ok())
146        .unwrap_or("");
147
148    let request_grid = content::decode_request_grid(&body, content_type)
149        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
150
151    let username = req
152        .extensions()
153        .get::<AuthUser>()
154        .map(|u| u.username.clone())
155        .unwrap_or_else(|| "anonymous".to_string());
156
157    let watch_id = request_grid
158        .meta
159        .get("watchId")
160        .and_then(|v| match v {
161            Kind::Str(s) => Some(s.clone()),
162            _ => None,
163        })
164        .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
165
166    let changed = state
167        .watches
168        .poll(&watch_id, &username, &state.graph)
169        .ok_or_else(|| HaystackError::not_found(format!("watch not found: {watch_id}")))?;
170
171    if changed.is_empty() {
172        return respond_grid(&HGrid::new(), accept);
173    }
174
175    // Build grid from changed entities
176    let mut col_set: Vec<String> = Vec::new();
177    let mut seen = std::collections::HashSet::new();
178    for entity in &changed {
179        for name in entity.tag_names() {
180            if seen.insert(name.to_string()) {
181                col_set.push(name.to_string());
182            }
183        }
184    }
185    col_set.sort();
186    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
187
188    let grid = HGrid::from_parts(HDict::new(), cols, changed);
189    respond_grid(&grid, accept)
190}
191
192/// POST /api/watchUnsub — unsubscribe from a watch.
193///
194/// Request grid has `watchId` in the grid meta.
195/// Request rows may have `id` column to remove specific IDs from the watch.
196/// If no IDs are present in the request rows, the entire watch is removed.
197pub async fn handle_unsub(
198    req: HttpRequest,
199    body: String,
200    state: web::Data<AppState>,
201) -> Result<HttpResponse, HaystackError> {
202    let content_type = req
203        .headers()
204        .get("Content-Type")
205        .and_then(|v| v.to_str().ok())
206        .unwrap_or("");
207    let accept = req
208        .headers()
209        .get("Accept")
210        .and_then(|v| v.to_str().ok())
211        .unwrap_or("");
212
213    let request_grid = content::decode_request_grid(&body, content_type)
214        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
215
216    let username = req
217        .extensions()
218        .get::<AuthUser>()
219        .map(|u| u.username.clone())
220        .unwrap_or_else(|| "anonymous".to_string());
221
222    let watch_id = request_grid
223        .meta
224        .get("watchId")
225        .and_then(|v| match v {
226            Kind::Str(s) => Some(s.clone()),
227            _ => None,
228        })
229        .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
230
231    // Collect entity IDs from request rows for selective removal
232    let ids: Vec<String> = request_grid
233        .rows
234        .iter()
235        .filter_map(|row| match row.get("id") {
236            Some(Kind::Ref(r)) => Some(r.val.clone()),
237            _ => None,
238        })
239        .collect();
240
241    if ids.is_empty() {
242        // No IDs specified — remove the entire watch
243        if !state.watches.unsubscribe(&watch_id, &username) {
244            return Err(HaystackError::not_found(format!(
245                "watch not found: {watch_id}"
246            )));
247        }
248    } else {
249        // Selective removal — remove only the specified IDs from the watch
250        if !state.watches.remove_ids(&watch_id, &username, &ids) {
251            return Err(HaystackError::not_found(format!(
252                "watch not found: {watch_id}"
253            )));
254        }
255
256        // Remove remote watch tracking for any federated IDs being unsubscribed.
257        for id in &ids {
258            if let Some(connector) = state.federation.owner_of(id) {
259                connector.remove_remote_watch(id);
260            }
261        }
262    }
263
264    respond_grid(&HGrid::new(), accept)
265}
266
267fn respond_grid(grid: &HGrid, accept: &str) -> Result<HttpResponse, HaystackError> {
268    let (encoded, ct) = content::encode_response_grid(grid, accept)
269        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
270    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
271}