haystack_server/ops/
read.rs1use actix_web::web::Bytes;
38use actix_web::{HttpRequest, HttpResponse, web};
39use futures_util::stream;
40
41use haystack_core::data::{HCol, HDict, HGrid};
42use haystack_core::kinds::{HRef, Kind};
43use std::sync::Arc;
44
45use crate::content;
46use crate::error::HaystackError;
47use crate::state::AppState;
48
49pub async fn handle(
55 req: HttpRequest,
56 body: String,
57 state: web::Data<AppState>,
58) -> Result<HttpResponse, HaystackError> {
59 let content_type = req
60 .headers()
61 .get("Content-Type")
62 .and_then(|v| v.to_str().ok())
63 .unwrap_or("");
64 let accept = req
65 .headers()
66 .get("Accept")
67 .and_then(|v| v.to_str().ok())
68 .unwrap_or("");
69
70 let request_grid = content::decode_request_grid(&body, content_type)
71 .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
72
73 let result_grid = if request_grid.col("id").is_some() {
74 read_by_id(&request_grid, &state)
76 } else if request_grid.col("filter").is_some() {
77 read_by_filter(&request_grid, &state)
79 } else {
80 Err(HaystackError::bad_request(
81 "request must have 'id' or 'filter' column",
82 ))
83 }?;
84
85 if result_grid.rows.len() > 25_000 {
86 let (header, rows, ct) = content::encode_response_streaming(&result_grid, accept)
87 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
88
89 if rows.is_empty() {
90 return Ok(HttpResponse::Ok().content_type(ct).body(header));
91 }
92
93 let chunks = std::iter::once(Ok::<Bytes, actix_web::Error>(Bytes::from(header)))
94 .chain(rows.into_iter().map(|r| Ok(Bytes::from(r))));
95
96 return Ok(HttpResponse::Ok()
97 .content_type(ct)
98 .streaming(stream::iter(chunks)));
99 }
100
101 let (encoded, ct) = content::encode_response_grid(&result_grid, accept)
102 .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
103
104 Ok(HttpResponse::Ok().content_type(ct).body(encoded))
105}
106
107fn read_by_filter(request_grid: &HGrid, state: &AppState) -> Result<HGrid, HaystackError> {
112 let row = request_grid
113 .row(0)
114 .ok_or_else(|| HaystackError::bad_request("request grid has no rows"))?;
115
116 let filter = match row.get("filter") {
117 Some(Kind::Str(s)) => s.as_str(),
118 _ => return Err(HaystackError::bad_request("filter must be a Str value")),
119 };
120
121 let limit = match row.get("limit") {
122 Some(Kind::Number(n)) => n.val as usize,
123 _ => 0, };
125
126 let effective_limit = if limit == 0 { usize::MAX } else { limit };
127 let is_wildcard = filter == "*";
128
129 let mut results: Vec<Arc<HDict>> = if is_wildcard {
131 state
133 .graph
134 .all_entities()
135 .into_iter()
136 .map(Arc::new)
137 .collect()
138 } else {
139 let local_grid = state
140 .graph
141 .read_filter(filter, limit)
142 .map_err(|e| HaystackError::bad_request(format!("filter error: {e}")))?;
143 local_grid.rows.into_iter().map(Arc::new).collect()
144 };
145
146 if results.len() > effective_limit {
148 results.truncate(effective_limit);
149 }
150
151 if results.len() < effective_limit {
153 let remaining = effective_limit - results.len();
154 if is_wildcard {
155 let federated = state.federation.all_cached_entities();
157 for entity in federated {
158 if results.len() >= effective_limit {
159 break;
160 }
161 results.push(entity);
162 }
163 } else {
164 match state.federation.filter_cached_entities(filter, remaining) {
166 Ok(federated) => results.extend(federated),
167 Err(e) => {
168 return Err(HaystackError::bad_request(format!(
169 "federation filter error: {e}"
170 )));
171 }
172 }
173 }
174 }
175
176 if results.is_empty() {
177 return Ok(HGrid::new());
178 }
179
180 let mut col_set: Vec<&str> = Vec::new();
182 let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
183 for entity in &results {
184 for name in entity.tag_names() {
185 if seen.insert(name) {
186 col_set.push(name);
187 }
188 }
189 }
190 col_set.sort_unstable();
191 let cols: Vec<HCol> = col_set.iter().map(|&n| HCol::new(n)).collect();
192
193 Ok(HGrid::from_parts_arc(HDict::new(), cols, results))
194}
195
196fn read_by_id(request_grid: &HGrid, state: &AppState) -> Result<HGrid, HaystackError> {
202 let mut results: Vec<Arc<HDict>> = Vec::new();
203 let mut unknown_ids: Vec<String> = Vec::new();
204
205 for row in request_grid.rows.iter() {
207 let ref_val = match row.get("id") {
208 Some(Kind::Ref(r)) => &r.val,
209 _ => continue,
210 };
211
212 if let Some(entity) = state.graph.get(ref_val) {
213 results.push(Arc::new(entity));
214 } else {
215 unknown_ids.push(ref_val.clone());
216 }
217 }
218
219 if !unknown_ids.is_empty() && !state.federation.connectors.is_empty() {
221 let id_refs: Vec<&str> = unknown_ids.iter().map(|s| s.as_str()).collect();
222 let (found, still_missing) = state.federation.batch_read_by_id(id_refs);
223 results.extend(found);
224
225 for id in still_missing {
227 let mut missing = HDict::new();
228 missing.set("id", Kind::Ref(HRef::from_val(id.as_str())));
229 results.push(Arc::new(missing));
230 }
231 } else {
232 for id in unknown_ids {
234 let mut missing = HDict::new();
235 missing.set("id", Kind::Ref(HRef::from_val(id.as_str())));
236 results.push(Arc::new(missing));
237 }
238 }
239
240 if results.is_empty() {
241 return Ok(HGrid::new());
242 }
243
244 let mut col_set: Vec<&str> = Vec::new();
246 let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
247 for entity in &results {
248 for name in entity.tag_names() {
249 if seen.insert(name) {
250 col_set.push(name);
251 }
252 }
253 }
254
255 col_set.sort_unstable();
256 let cols: Vec<HCol> = col_set.iter().map(|&n| HCol::new(n)).collect();
257 Ok(HGrid::from_parts_arc(HDict::new(), cols, results))
258}