haystack_server/ops/
watch.rs1use actix_web::{HttpMessage, HttpRequest, HttpResponse, web};
4
5use haystack_core::data::{HCol, HDict, HGrid};
6use haystack_core::kinds::Kind;
7
8use crate::auth::AuthUser;
9use crate::content;
10use crate::error::HaystackError;
11use crate::state::AppState;
12
13pub async fn handle_sub(
19 req: HttpRequest,
20 body: String,
21 state: web::Data<AppState>,
22) -> Result<HttpResponse, HaystackError> {
23 let content_type = req
24 .headers()
25 .get("Content-Type")
26 .and_then(|v| v.to_str().ok())
27 .unwrap_or("");
28 let accept = req
29 .headers()
30 .get("Accept")
31 .and_then(|v| v.to_str().ok())
32 .unwrap_or("");
33
34 let request_grid = content::decode_request_grid(&body, content_type)
35 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
36
37 let username = req
38 .extensions()
39 .get::<AuthUser>()
40 .map(|u| u.username.clone())
41 .unwrap_or_else(|| "anonymous".to_string());
42
43 let ids: Vec<String> = request_grid
45 .rows
46 .iter()
47 .filter_map(|row| match row.get("id") {
48 Some(Kind::Ref(r)) => Some(r.val.clone()),
49 _ => None,
50 })
51 .collect();
52
53 if ids.is_empty() {
54 return Err(HaystackError::bad_request(
55 "request must have 'id' column with Ref values",
56 ));
57 }
58
59 let existing_watch_id = request_grid.meta.get("watchId").and_then(|v| match v {
61 Kind::Str(s) => Some(s.clone()),
62 _ => None,
63 });
64
65 let watch_id = if let Some(ref wid) = existing_watch_id {
66 if state.watches.add_ids(wid, &username, ids.clone()) {
67 wid.clone()
68 } else {
69 return Err(HaystackError::not_found(format!("watch not found: {wid}")));
70 }
71 } else {
72 let graph_version = state.graph.version();
73 state
74 .watches
75 .subscribe(&username, ids.clone(), graph_version)
76 .map_err(HaystackError::bad_request)?
77 };
78
79 let mut rows: Vec<HDict> = Vec::new();
81 let mut col_set: Vec<String> = Vec::new();
82 let mut seen = std::collections::HashSet::new();
83
84 for id in &ids {
85 if let Some(entity) = state.graph.get(id) {
86 for name in entity.tag_names() {
87 if seen.insert(name.to_string()) {
88 col_set.push(name.to_string());
89 }
90 }
91 rows.push(entity);
92 }
93 }
94
95 col_set.sort();
96 let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
97
98 let mut meta = HDict::new();
99 meta.set("watchId", Kind::Str(watch_id));
100
101 let grid = HGrid::from_parts(meta, cols, rows);
102 let (encoded, ct) = content::encode_response_grid(&grid, accept)
103 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
104
105 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
106}
107
108pub async fn handle_poll(
113 req: HttpRequest,
114 body: String,
115 state: web::Data<AppState>,
116) -> Result<HttpResponse, HaystackError> {
117 let content_type = req
118 .headers()
119 .get("Content-Type")
120 .and_then(|v| v.to_str().ok())
121 .unwrap_or("");
122 let accept = req
123 .headers()
124 .get("Accept")
125 .and_then(|v| v.to_str().ok())
126 .unwrap_or("");
127
128 let request_grid = content::decode_request_grid(&body, content_type)
129 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
130
131 let username = req
132 .extensions()
133 .get::<AuthUser>()
134 .map(|u| u.username.clone())
135 .unwrap_or_else(|| "anonymous".to_string());
136
137 let watch_id = request_grid
138 .meta
139 .get("watchId")
140 .and_then(|v| match v {
141 Kind::Str(s) => Some(s.clone()),
142 _ => None,
143 })
144 .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
145
146 let changed = state
147 .watches
148 .poll(&watch_id, &username, &state.graph)
149 .ok_or_else(|| HaystackError::not_found(format!("watch not found: {watch_id}")))?;
150
151 if changed.is_empty() {
152 return respond_grid(&HGrid::new(), accept);
153 }
154
155 let mut col_set: Vec<String> = Vec::new();
157 let mut seen = std::collections::HashSet::new();
158 for entity in &changed {
159 for name in entity.tag_names() {
160 if seen.insert(name.to_string()) {
161 col_set.push(name.to_string());
162 }
163 }
164 }
165 col_set.sort();
166 let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
167
168 let grid = HGrid::from_parts(HDict::new(), cols, changed);
169 respond_grid(&grid, accept)
170}
171
172pub async fn handle_unsub(
178 req: HttpRequest,
179 body: String,
180 state: web::Data<AppState>,
181) -> Result<HttpResponse, HaystackError> {
182 let content_type = req
183 .headers()
184 .get("Content-Type")
185 .and_then(|v| v.to_str().ok())
186 .unwrap_or("");
187 let accept = req
188 .headers()
189 .get("Accept")
190 .and_then(|v| v.to_str().ok())
191 .unwrap_or("");
192
193 let request_grid = content::decode_request_grid(&body, content_type)
194 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
195
196 let username = req
197 .extensions()
198 .get::<AuthUser>()
199 .map(|u| u.username.clone())
200 .unwrap_or_else(|| "anonymous".to_string());
201
202 let watch_id = request_grid
203 .meta
204 .get("watchId")
205 .and_then(|v| match v {
206 Kind::Str(s) => Some(s.clone()),
207 _ => None,
208 })
209 .ok_or_else(|| HaystackError::bad_request("request meta must have 'watchId'"))?;
210
211 let ids: Vec<String> = request_grid
213 .rows
214 .iter()
215 .filter_map(|row| match row.get("id") {
216 Some(Kind::Ref(r)) => Some(r.val.clone()),
217 _ => None,
218 })
219 .collect();
220
221 if ids.is_empty() {
222 if !state.watches.unsubscribe(&watch_id, &username) {
224 return Err(HaystackError::not_found(format!(
225 "watch not found: {watch_id}"
226 )));
227 }
228 } else {
229 if !state.watches.remove_ids(&watch_id, &username, &ids) {
231 return Err(HaystackError::not_found(format!(
232 "watch not found: {watch_id}"
233 )));
234 }
235 }
236
237 respond_grid(&HGrid::new(), accept)
238}
239
240fn respond_grid(grid: &HGrid, accept: &str) -> Result<HttpResponse, HaystackError> {
241 let (encoded, ct) = content::encode_response_grid(grid, accept)
242 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
243 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
244}