haystack_server/ops/
watch.rs1use axum::Extension;
4use axum::extract::State;
5use axum::http::HeaderMap;
6use axum::response::{IntoResponse, Response};
7
8use haystack_core::data::{HCol, HDict, HGrid};
9use haystack_core::kinds::Kind;
10
11use crate::auth::AuthUser;
12use crate::content;
13use crate::error::HaystackError;
14use crate::state::SharedState;
15
16pub async fn handle_sub(
18 State(state): State<SharedState>,
19 headers: HeaderMap,
20 auth: Option<Extension<AuthUser>>,
21 body: String,
22) -> Result<Response, HaystackError> {
23 let content_type = headers
24 .get("Content-Type")
25 .and_then(|v| v.to_str().ok())
26 .unwrap_or("");
27 let accept = headers
28 .get("Accept")
29 .and_then(|v| v.to_str().ok())
30 .unwrap_or("");
31
32 let request_grid = content::decode_request_grid(&body, content_type)
33 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
34
35 let username = auth
36 .map(|Extension(u)| u.username.clone())
37 .unwrap_or_else(|| "anonymous".into());
38
39 let ids: Vec<String> = request_grid
41 .rows
42 .iter()
43 .filter_map(|row| match row.get("id") {
44 Some(Kind::Ref(r)) => Some(r.val.clone()),
45 _ => None,
46 })
47 .collect();
48
49 if ids.is_empty() {
50 return Err(HaystackError::bad_request(
51 "request must have 'id' column with Ref values",
52 ));
53 }
54
55 let existing_watch_id = request_grid.meta.get("watchId").and_then(|v| match v {
57 Kind::Str(s) => Some(s.clone()),
58 _ => None,
59 });
60
61 let watch_id = if let Some(ref wid) = existing_watch_id {
62 if state.watches.add_ids(wid, &username, ids.clone()) {
63 wid.clone()
64 } else {
65 return Err(HaystackError::not_found(format!("watch not found: {wid}")));
66 }
67 } else {
68 let graph_version = state.graph.version();
69 state
70 .watches
71 .subscribe(&username, ids.clone(), graph_version)
72 .map_err(HaystackError::bad_request)?
73 };
74
75 let mut rows: Vec<HDict> = Vec::new();
77 let mut col_set: Vec<String> = Vec::new();
78 let mut seen = std::collections::HashSet::new();
79
80 for id in &ids {
81 if let Some(entity) = state.graph.get(id) {
82 for name in entity.tag_names() {
83 if seen.insert(name.to_string()) {
84 col_set.push(name.to_string());
85 }
86 }
87 rows.push(entity);
88 }
89 }
90
91 col_set.sort();
92 let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
93
94 let mut meta = HDict::new();
95 meta.set("watchId", Kind::Str(watch_id));
96
97 let grid = HGrid::from_parts(meta, cols, rows);
98 let (encoded, ct) = content::encode_response_grid(&grid, accept)
99 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
100
101 Ok(([(axum::http::header::CONTENT_TYPE, ct)], encoded).into_response())
102}
103
104pub async fn handle_poll(
106 State(state): State<SharedState>,
107 headers: HeaderMap,
108 auth: Option<Extension<AuthUser>>,
109 body: String,
110) -> Result<Response, HaystackError> {
111 let content_type = headers
112 .get("Content-Type")
113 .and_then(|v| v.to_str().ok())
114 .unwrap_or("");
115 let accept = headers
116 .get("Accept")
117 .and_then(|v| v.to_str().ok())
118 .unwrap_or("");
119
120 let request_grid = content::decode_request_grid(&body, content_type)
121 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
122
123 let username = auth
124 .map(|Extension(u)| u.username.clone())
125 .unwrap_or_else(|| "anonymous".into());
126
127 let watch_id = request_grid
128 .meta
129 .get("watchId")
130 .and_then(|v| match v {
131 Kind::Str(s) => Some(s.clone()),
132 _ => None,
133 })
134 .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
135
136 let changed = state
137 .watches
138 .poll(&watch_id, &username, &state.graph)
139 .ok_or_else(|| HaystackError::not_found(format!("watch not found: {watch_id}")))?;
140
141 if changed.is_empty() {
142 return respond_grid(&HGrid::new(), accept);
143 }
144
145 let mut col_set: Vec<String> = Vec::new();
147 let mut seen = std::collections::HashSet::new();
148 for entity in &changed {
149 for name in entity.tag_names() {
150 if seen.insert(name.to_string()) {
151 col_set.push(name.to_string());
152 }
153 }
154 }
155 col_set.sort();
156 let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
157
158 let grid = HGrid::from_parts(HDict::new(), cols, changed);
159 respond_grid(&grid, accept)
160}
161
162pub async fn handle_unsub(
164 State(state): State<SharedState>,
165 headers: HeaderMap,
166 auth: Option<Extension<AuthUser>>,
167 body: String,
168) -> Result<Response, HaystackError> {
169 let content_type = headers
170 .get("Content-Type")
171 .and_then(|v| v.to_str().ok())
172 .unwrap_or("");
173 let accept = headers
174 .get("Accept")
175 .and_then(|v| v.to_str().ok())
176 .unwrap_or("");
177
178 let request_grid = content::decode_request_grid(&body, content_type)
179 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
180
181 let username = auth
182 .map(|Extension(u)| u.username.clone())
183 .unwrap_or_else(|| "anonymous".into());
184
185 let watch_id = request_grid
186 .meta
187 .get("watchId")
188 .and_then(|v| match v {
189 Kind::Str(s) => Some(s.clone()),
190 _ => None,
191 })
192 .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
193
194 let ids: Vec<String> = request_grid
196 .rows
197 .iter()
198 .filter_map(|row| match row.get("id") {
199 Some(Kind::Ref(r)) => Some(r.val.clone()),
200 _ => None,
201 })
202 .collect();
203
204 if ids.is_empty() {
205 if !state.watches.unsubscribe(&watch_id, &username) {
207 return Err(HaystackError::not_found(format!(
208 "watch not found: {watch_id}"
209 )));
210 }
211 } else {
212 if !state.watches.remove_ids(&watch_id, &username, &ids) {
214 return Err(HaystackError::not_found(format!(
215 "watch not found: {watch_id}"
216 )));
217 }
218 }
219
220 respond_grid(&HGrid::new(), accept)
221}
222
223fn respond_grid(grid: &HGrid, accept: &str) -> Result<Response, HaystackError> {
224 let (encoded, ct) = content::encode_response_grid(grid, accept)
225 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
226 Ok(([(axum::http::header::CONTENT_TYPE, ct)], encoded).into_response())
227}