Skip to main content

haystack_server/ops/
watch.rs

1//! Watch ops — subscribe, poll, and unsubscribe for entity changes.
2
3use actix_web::{HttpMessage, HttpRequest, HttpResponse, web};
4
5use haystack_core::data::{HCol, HDict, HGrid};
6use haystack_core::kinds::Kind;
7
8use crate::auth::AuthUser;
9use crate::content;
10use crate::error::HaystackError;
11use crate::state::AppState;
12
13/// POST /api/watchSub — subscribe to entity changes.
14///
15/// Request grid has `id` column with Ref values for entities to watch.
16/// May also have a `watchId` column to add IDs to an existing watch.
17/// Returns the current state of watched entities with `watchId` in grid meta.
18pub async fn handle_sub(
19    req: HttpRequest,
20    body: String,
21    state: web::Data<AppState>,
22) -> Result<HttpResponse, HaystackError> {
23    let content_type = req
24        .headers()
25        .get("Content-Type")
26        .and_then(|v| v.to_str().ok())
27        .unwrap_or("");
28    let accept = req
29        .headers()
30        .get("Accept")
31        .and_then(|v| v.to_str().ok())
32        .unwrap_or("");
33
34    let request_grid = content::decode_request_grid(&body, content_type)
35        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
36
37    let username = req
38        .extensions()
39        .get::<AuthUser>()
40        .map(|u| u.username.clone())
41        .unwrap_or_else(|| "anonymous".to_string());
42
43    // Collect entity IDs
44    let ids: Vec<String> = request_grid
45        .rows
46        .iter()
47        .filter_map(|row| match row.get("id") {
48            Some(Kind::Ref(r)) => Some(r.val.clone()),
49            _ => None,
50        })
51        .collect();
52
53    if ids.is_empty() {
54        return Err(HaystackError::bad_request(
55            "request must have 'id' column with Ref values",
56        ));
57    }
58
59    // Check for existing watchId in grid meta (add to existing watch)
60    let existing_watch_id = request_grid.meta.get("watchId").and_then(|v| match v {
61        Kind::Str(s) => Some(s.clone()),
62        _ => None,
63    });
64
65    let watch_id = if let Some(ref wid) = existing_watch_id {
66        if state.watches.add_ids(wid, &username, ids.clone()) {
67            wid.clone()
68        } else {
69            return Err(HaystackError::not_found(format!("watch not found: {wid}")));
70        }
71    } else {
72        let graph_version = state.graph.version();
73        state
74            .watches
75            .subscribe(&username, ids.clone(), graph_version)
76            .map_err(HaystackError::bad_request)?
77    };
78
79    // Return current state of watched entities
80    let mut rows: Vec<HDict> = Vec::new();
81    let mut col_set: Vec<String> = Vec::new();
82    let mut seen = std::collections::HashSet::new();
83
84    for id in &ids {
85        if let Some(entity) = state.graph.get(id) {
86            for name in entity.tag_names() {
87                if seen.insert(name.to_string()) {
88                    col_set.push(name.to_string());
89                }
90            }
91            rows.push(entity);
92        }
93    }
94
95    col_set.sort();
96    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
97
98    let mut meta = HDict::new();
99    meta.set("watchId", Kind::Str(watch_id));
100
101    let grid = HGrid::from_parts(meta, cols, rows);
102    let (encoded, ct) = content::encode_response_grid(&grid, accept)
103        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
104
105    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
106}
107
108/// POST /api/watchPoll — poll for changes since last poll.
109///
110/// Request grid has `watchId` in the grid meta.
111/// Returns changed entities since last poll.
112pub async fn handle_poll(
113    req: HttpRequest,
114    body: String,
115    state: web::Data<AppState>,
116) -> Result<HttpResponse, HaystackError> {
117    let content_type = req
118        .headers()
119        .get("Content-Type")
120        .and_then(|v| v.to_str().ok())
121        .unwrap_or("");
122    let accept = req
123        .headers()
124        .get("Accept")
125        .and_then(|v| v.to_str().ok())
126        .unwrap_or("");
127
128    let request_grid = content::decode_request_grid(&body, content_type)
129        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
130
131    let username = req
132        .extensions()
133        .get::<AuthUser>()
134        .map(|u| u.username.clone())
135        .unwrap_or_else(|| "anonymous".to_string());
136
137    let watch_id = request_grid
138        .meta
139        .get("watchId")
140        .and_then(|v| match v {
141            Kind::Str(s) => Some(s.clone()),
142            _ => None,
143        })
144        .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
145
146    let changed = state
147        .watches
148        .poll(&watch_id, &username, &state.graph)
149        .ok_or_else(|| HaystackError::not_found(format!("watch not found: {watch_id}")))?;
150
151    if changed.is_empty() {
152        return respond_grid(&HGrid::new(), accept);
153    }
154
155    // Build grid from changed entities
156    let mut col_set: Vec<String> = Vec::new();
157    let mut seen = std::collections::HashSet::new();
158    for entity in &changed {
159        for name in entity.tag_names() {
160            if seen.insert(name.to_string()) {
161                col_set.push(name.to_string());
162            }
163        }
164    }
165    col_set.sort();
166    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
167
168    let grid = HGrid::from_parts(HDict::new(), cols, changed);
169    respond_grid(&grid, accept)
170}
171
172/// POST /api/watchUnsub — unsubscribe from a watch.
173///
174/// Request grid has `watchId` in the grid meta.
175/// Request rows may have `id` column to remove specific IDs from the watch.
176/// If no IDs are present in the request rows, the entire watch is removed.
177pub async fn handle_unsub(
178    req: HttpRequest,
179    body: String,
180    state: web::Data<AppState>,
181) -> Result<HttpResponse, HaystackError> {
182    let content_type = req
183        .headers()
184        .get("Content-Type")
185        .and_then(|v| v.to_str().ok())
186        .unwrap_or("");
187    let accept = req
188        .headers()
189        .get("Accept")
190        .and_then(|v| v.to_str().ok())
191        .unwrap_or("");
192
193    let request_grid = content::decode_request_grid(&body, content_type)
194        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
195
196    let username = req
197        .extensions()
198        .get::<AuthUser>()
199        .map(|u| u.username.clone())
200        .unwrap_or_else(|| "anonymous".to_string());
201
202    let watch_id = request_grid
203        .meta
204        .get("watchId")
205        .and_then(|v| match v {
206            Kind::Str(s) => Some(s.clone()),
207            _ => None,
208        })
209        .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
210
211    // Collect entity IDs from request rows for selective removal
212    let ids: Vec<String> = request_grid
213        .rows
214        .iter()
215        .filter_map(|row| match row.get("id") {
216            Some(Kind::Ref(r)) => Some(r.val.clone()),
217            _ => None,
218        })
219        .collect();
220
221    if ids.is_empty() {
222        // No IDs specified — remove the entire watch
223        if !state.watches.unsubscribe(&watch_id, &username) {
224            return Err(HaystackError::not_found(format!(
225                "watch not found: {watch_id}"
226            )));
227        }
228    } else {
229        // Selective removal — remove only the specified IDs from the watch
230        if !state.watches.remove_ids(&watch_id, &username, &ids) {
231            return Err(HaystackError::not_found(format!(
232                "watch not found: {watch_id}"
233            )));
234        }
235    }
236
237    respond_grid(&HGrid::new(), accept)
238}
239
240fn respond_grid(grid: &HGrid, accept: &str) -> Result<HttpResponse, HaystackError> {
241    let (encoded, ct) = content::encode_response_grid(grid, accept)
242        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
243    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
244}