Skip to main content

haystack_server/ops/
watch.rs

1//! Watch ops — subscribe, poll, and unsubscribe for entity changes.
2
3use axum::Extension;
4use axum::extract::State;
5use axum::http::HeaderMap;
6use axum::response::{IntoResponse, Response};
7
8use haystack_core::data::{HCol, HDict, HGrid};
9use haystack_core::kinds::Kind;
10
11use crate::auth::AuthUser;
12use crate::content;
13use crate::error::HaystackError;
14use crate::state::SharedState;
15
16/// POST /api/watchSub — subscribe to entity changes.
17pub async fn handle_sub(
18    State(state): State<SharedState>,
19    headers: HeaderMap,
20    auth: Option<Extension<AuthUser>>,
21    body: String,
22) -> Result<Response, HaystackError> {
23    let content_type = headers
24        .get("Content-Type")
25        .and_then(|v| v.to_str().ok())
26        .unwrap_or("");
27    let accept = headers
28        .get("Accept")
29        .and_then(|v| v.to_str().ok())
30        .unwrap_or("");
31
32    let request_grid = content::decode_request_grid(&body, content_type)
33        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
34
35    let username = auth
36        .map(|Extension(u)| u.username.clone())
37        .unwrap_or_else(|| "anonymous".into());
38
39    // Collect entity IDs
40    let ids: Vec<String> = request_grid
41        .rows
42        .iter()
43        .filter_map(|row| match row.get("id") {
44            Some(Kind::Ref(r)) => Some(r.val.clone()),
45            _ => None,
46        })
47        .collect();
48
49    if ids.is_empty() {
50        return Err(HaystackError::bad_request(
51            "request must have 'id' column with Ref values",
52        ));
53    }
54
55    // Check for existing watchId in grid meta (add to existing watch)
56    let existing_watch_id = request_grid.meta.get("watchId").and_then(|v| match v {
57        Kind::Str(s) => Some(s.clone()),
58        _ => None,
59    });
60
61    let watch_id = if let Some(ref wid) = existing_watch_id {
62        if state.watches.add_ids(wid, &username, ids.clone()) {
63            wid.clone()
64        } else {
65            return Err(HaystackError::not_found(format!("watch not found: {wid}")));
66        }
67    } else {
68        let graph_version = state.graph.version();
69        state
70            .watches
71            .subscribe(&username, ids.clone(), graph_version)
72            .map_err(HaystackError::bad_request)?
73    };
74
75    // Return current state of watched entities
76    let mut rows: Vec<HDict> = Vec::new();
77    let mut col_set: Vec<String> = Vec::new();
78    let mut seen = std::collections::HashSet::new();
79
80    for id in &ids {
81        if let Some(entity) = state.graph.get(id) {
82            for name in entity.tag_names() {
83                if seen.insert(name.to_string()) {
84                    col_set.push(name.to_string());
85                }
86            }
87            rows.push(entity);
88        }
89    }
90
91    col_set.sort();
92    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
93
94    let mut meta = HDict::new();
95    meta.set("watchId", Kind::Str(watch_id));
96
97    let grid = HGrid::from_parts(meta, cols, rows);
98    let (encoded, ct) = content::encode_response_grid(&grid, accept)
99        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
100
101    Ok(([(axum::http::header::CONTENT_TYPE, ct)], encoded).into_response())
102}
103
104/// POST /api/watchPoll — poll for changes since last poll.
105pub async fn handle_poll(
106    State(state): State<SharedState>,
107    headers: HeaderMap,
108    auth: Option<Extension<AuthUser>>,
109    body: String,
110) -> Result<Response, HaystackError> {
111    let content_type = headers
112        .get("Content-Type")
113        .and_then(|v| v.to_str().ok())
114        .unwrap_or("");
115    let accept = headers
116        .get("Accept")
117        .and_then(|v| v.to_str().ok())
118        .unwrap_or("");
119
120    let request_grid = content::decode_request_grid(&body, content_type)
121        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
122
123    let username = auth
124        .map(|Extension(u)| u.username.clone())
125        .unwrap_or_else(|| "anonymous".into());
126
127    let watch_id = request_grid
128        .meta
129        .get("watchId")
130        .and_then(|v| match v {
131            Kind::Str(s) => Some(s.clone()),
132            _ => None,
133        })
134        .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
135
136    let changed = state
137        .watches
138        .poll(&watch_id, &username, &state.graph)
139        .ok_or_else(|| HaystackError::not_found(format!("watch not found: {watch_id}")))?;
140
141    if changed.is_empty() {
142        return respond_grid(&HGrid::new(), accept);
143    }
144
145    // Build grid from changed entities
146    let mut col_set: Vec<String> = Vec::new();
147    let mut seen = std::collections::HashSet::new();
148    for entity in &changed {
149        for name in entity.tag_names() {
150            if seen.insert(name.to_string()) {
151                col_set.push(name.to_string());
152            }
153        }
154    }
155    col_set.sort();
156    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
157
158    let grid = HGrid::from_parts(HDict::new(), cols, changed);
159    respond_grid(&grid, accept)
160}
161
162/// POST /api/watchUnsub — unsubscribe from a watch.
163pub async fn handle_unsub(
164    State(state): State<SharedState>,
165    headers: HeaderMap,
166    auth: Option<Extension<AuthUser>>,
167    body: String,
168) -> Result<Response, HaystackError> {
169    let content_type = headers
170        .get("Content-Type")
171        .and_then(|v| v.to_str().ok())
172        .unwrap_or("");
173    let accept = headers
174        .get("Accept")
175        .and_then(|v| v.to_str().ok())
176        .unwrap_or("");
177
178    let request_grid = content::decode_request_grid(&body, content_type)
179        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
180
181    let username = auth
182        .map(|Extension(u)| u.username.clone())
183        .unwrap_or_else(|| "anonymous".into());
184
185    let watch_id = request_grid
186        .meta
187        .get("watchId")
188        .and_then(|v| match v {
189            Kind::Str(s) => Some(s.clone()),
190            _ => None,
191        })
192        .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
193
194    // Collect entity IDs from request rows for selective removal
195    let ids: Vec<String> = request_grid
196        .rows
197        .iter()
198        .filter_map(|row| match row.get("id") {
199            Some(Kind::Ref(r)) => Some(r.val.clone()),
200            _ => None,
201        })
202        .collect();
203
204    if ids.is_empty() {
205        // No IDs specified — remove the entire watch
206        if !state.watches.unsubscribe(&watch_id, &username) {
207            return Err(HaystackError::not_found(format!(
208                "watch not found: {watch_id}"
209            )));
210        }
211    } else {
212        // Selective removal — remove only the specified IDs from the watch
213        if !state.watches.remove_ids(&watch_id, &username, &ids) {
214            return Err(HaystackError::not_found(format!(
215                "watch not found: {watch_id}"
216            )));
217        }
218    }
219
220    respond_grid(&HGrid::new(), accept)
221}
222
223fn respond_grid(grid: &HGrid, accept: &str) -> Result<Response, HaystackError> {
224    let (encoded, ct) = content::encode_response_grid(grid, accept)
225        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
226    Ok(([(axum::http::header::CONTENT_TYPE, ct)], encoded).into_response())
227}