Skip to main content

haystack_server/ops/
read.rs

1//! The `read` op — read entities by filter or by id.
2//!
3//! # Overview
4//!
5//! `POST /api/read` returns entity records matching a filter expression or a
6//! list of explicit IDs. Results include local entities and, when federation is
7//! enabled, matching entities from remote connectors.
8//!
9//! # Request Grid Columns
10//!
11//! Two mutually-exclusive request forms:
12//!
13//! **Filter read** — first row contains:
14//!
15//! | Column   | Kind   | Description                                      |
16//! |----------|--------|--------------------------------------------------|
17//! | `filter` | Str    | Haystack filter expression (e.g. `"site"`, `"point and siteRef==@s1"`) |
18//! | `limit`  | Number | *(optional)* Max entities to return (0 = no limit) |
19//!
20//! **ID read** — each row contains:
21//!
22//! | Column | Kind | Description      |
23//! |--------|------|------------------|
24//! | `id`   | Ref  | Entity reference |
25//!
26//! # Response Grid Columns
27//!
28//! The response grid has one column per unique tag across all matched entities.
29//! For ID reads, rows for unknown IDs appear as stubs with only the `id` column.
30//!
31//! # Errors
32//!
33//! - **400 Bad Request** — missing `filter`/`id` column, invalid filter syntax, or
34//!   request decode failure.
35//! - **500 Internal Server Error** — graph or encoding error.
36
37use actix_web::web::Bytes;
38use actix_web::{HttpRequest, HttpResponse, web};
39use futures_util::stream;
40
41use haystack_core::data::{HCol, HDict, HGrid};
42use haystack_core::kinds::{HRef, Kind};
43use std::sync::Arc;
44
45use crate::content;
46use crate::error::HaystackError;
47use crate::state::AppState;
48
49/// POST /api/read
50///
51/// Request grid may have:
52/// - A `filter` column with a filter expression string, and optional `limit` column
53/// - An `id` column with Ref values for reading specific entities
54pub async fn handle(
55    req: HttpRequest,
56    body: String,
57    state: web::Data<AppState>,
58) -> Result<HttpResponse, HaystackError> {
59    let content_type = req
60        .headers()
61        .get("Content-Type")
62        .and_then(|v| v.to_str().ok())
63        .unwrap_or("");
64    let accept = req
65        .headers()
66        .get("Accept")
67        .and_then(|v| v.to_str().ok())
68        .unwrap_or("");
69
70    let request_grid = content::decode_request_grid(&body, content_type)
71        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
72
73    let result_grid = if request_grid.col("id").is_some() {
74        // Read by ID
75        read_by_id(&request_grid, &state)
76    } else if request_grid.col("filter").is_some() {
77        // Read by filter
78        read_by_filter(&request_grid, &state)
79    } else {
80        Err(HaystackError::bad_request(
81            "request must have 'id' or 'filter' column",
82        ))
83    }?;
84
85    if result_grid.rows.len() > 25_000 {
86        let (header, rows, ct) = content::encode_response_streaming(&result_grid, accept)
87            .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
88
89        if rows.is_empty() {
90            return Ok(HttpResponse::Ok().content_type(ct).body(header));
91        }
92
93        let chunks = std::iter::once(Ok::<Bytes, actix_web::Error>(Bytes::from(header)))
94            .chain(rows.into_iter().map(|r| Ok(Bytes::from(r))));
95
96        return Ok(HttpResponse::Ok()
97            .content_type(ct)
98            .streaming(stream::iter(chunks)));
99    }
100
101    let (encoded, ct) = content::encode_response_grid(&result_grid, accept)
102        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
103
104    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
105}
106
107/// Read entities by filter expression.
108///
109/// After reading from the local graph, also includes matching entities from
110/// any federated remote connectors.
111fn read_by_filter(request_grid: &HGrid, state: &AppState) -> Result<HGrid, HaystackError> {
112    let row = request_grid
113        .row(0)
114        .ok_or_else(|| HaystackError::bad_request("request grid has no rows"))?;
115
116    let filter = match row.get("filter") {
117        Some(Kind::Str(s)) => s.as_str(),
118        _ => return Err(HaystackError::bad_request("filter must be a Str value")),
119    };
120
121    let limit = match row.get("limit") {
122        Some(Kind::Number(n)) => n.val as usize,
123        _ => 0, // 0 means no limit
124    };
125
126    let effective_limit = if limit == 0 { usize::MAX } else { limit };
127    let is_wildcard = filter == "*";
128
129    // Read from local graph.
130    let mut results: Vec<Arc<HDict>> = if is_wildcard {
131        // Wildcard: return all entities from the graph.
132        state
133            .graph
134            .all_entities()
135            .into_iter()
136            .map(Arc::new)
137            .collect()
138    } else {
139        let local_grid = state
140            .graph
141            .read_filter(filter, limit)
142            .map_err(|e| HaystackError::bad_request(format!("filter error: {e}")))?;
143        local_grid.rows.into_iter().map(Arc::new).collect()
144    };
145
146    // Apply limit to local results.
147    if results.len() > effective_limit {
148        results.truncate(effective_limit);
149    }
150
151    // Merge federated entities if we have not yet hit the limit.
152    if results.len() < effective_limit {
153        let remaining = effective_limit - results.len();
154        if is_wildcard {
155            // Wildcard: include all federated entities up to the limit.
156            let federated = state.federation.all_cached_entities();
157            for entity in federated {
158                if results.len() >= effective_limit {
159                    break;
160                }
161                results.push(entity);
162            }
163        } else {
164            // Use bitmap-accelerated filter on federated caches.
165            match state.federation.filter_cached_entities(filter, remaining) {
166                Ok(federated) => results.extend(federated),
167                Err(e) => {
168                    return Err(HaystackError::bad_request(format!(
169                        "federation filter error: {e}"
170                    )));
171                }
172            }
173        }
174    }
175
176    if results.is_empty() {
177        return Ok(HGrid::new());
178    }
179
180    // Build column set from all result entities using borrowed &str.
181    let mut col_set: Vec<&str> = Vec::new();
182    let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
183    for entity in &results {
184        for name in entity.tag_names() {
185            if seen.insert(name) {
186                col_set.push(name);
187            }
188        }
189    }
190    col_set.sort_unstable();
191    let cols: Vec<HCol> = col_set.iter().map(|&n| HCol::new(n)).collect();
192
193    Ok(HGrid::from_parts_arc(HDict::new(), cols, results))
194}
195
196/// Read entities by ID references.
197///
198/// First pass: resolve IDs from local graph.
199/// Second pass: batch-fetch remaining IDs from federation connectors
200/// (grouped by connector for O(1) indexed lookup per ID).
201fn read_by_id(request_grid: &HGrid, state: &AppState) -> Result<HGrid, HaystackError> {
202    let mut results: Vec<Arc<HDict>> = Vec::new();
203    let mut unknown_ids: Vec<String> = Vec::new();
204
205    // First pass: resolve from local graph, collect unknowns.
206    for row in request_grid.rows.iter() {
207        let ref_val = match row.get("id") {
208            Some(Kind::Ref(r)) => &r.val,
209            _ => continue,
210        };
211
212        if let Some(entity) = state.graph.get(ref_val) {
213            results.push(Arc::new(entity));
214        } else {
215            unknown_ids.push(ref_val.clone());
216        }
217    }
218
219    // Second pass: batch fetch unknowns from federation.
220    if !unknown_ids.is_empty() && !state.federation.connectors.is_empty() {
221        let id_refs: Vec<&str> = unknown_ids.iter().map(|s| s.as_str()).collect();
222        let (found, still_missing) = state.federation.batch_read_by_id(id_refs);
223        results.extend(found);
224
225        // Add missing stubs for IDs not found anywhere.
226        for id in still_missing {
227            let mut missing = HDict::new();
228            missing.set("id", Kind::Ref(HRef::from_val(id.as_str())));
229            results.push(Arc::new(missing));
230        }
231    } else {
232        // No federation — add missing stubs directly.
233        for id in unknown_ids {
234            let mut missing = HDict::new();
235            missing.set("id", Kind::Ref(HRef::from_val(id.as_str())));
236            results.push(Arc::new(missing));
237        }
238    }
239
240    if results.is_empty() {
241        return Ok(HGrid::new());
242    }
243
244    // Build column set using borrowed &str from entities.
245    let mut col_set: Vec<&str> = Vec::new();
246    let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
247    for entity in &results {
248        for name in entity.tag_names() {
249            if seen.insert(name) {
250                col_set.push(name);
251            }
252        }
253    }
254
255    col_set.sort_unstable();
256    let cols: Vec<HCol> = col_set.iter().map(|&n| HCol::new(n)).collect();
257    Ok(HGrid::from_parts_arc(HDict::new(), cols, results))
258}