Skip to main content

haystack_server/ops/
read.rs

1//! The `read` op — read entities by filter or by id.
2//!
3//! # Overview
4//!
5//! `POST /api/read` returns entity records matching a filter expression or a
6//! list of explicit IDs. Results include local entities and, when federation is
7//! enabled, matching entities from remote connectors.
8//!
9//! # Request Grid Columns
10//!
11//! Two mutually-exclusive request forms:
12//!
13//! **Filter read** — first row contains:
14//!
15//! | Column   | Kind   | Description                                      |
16//! |----------|--------|--------------------------------------------------|
17//! | `filter` | Str    | Haystack filter expression (e.g. `"site"`, `"point and siteRef==@s1"`) |
18//! | `limit`  | Number | *(optional)* Max entities to return (0 = no limit) |
19//!
20//! **ID read** — each row contains:
21//!
22//! | Column | Kind | Description      |
23//! |--------|------|------------------|
24//! | `id`   | Ref  | Entity reference |
25//!
26//! # Response Grid Columns
27//!
28//! The response grid has one column per unique tag across all matched entities.
29//! For ID reads, rows for unknown IDs appear as stubs with only the `id` column.
30//!
31//! # Errors
32//!
33//! - **400 Bad Request** — missing `filter`/`id` column, invalid filter syntax, or
34//!   request decode failure.
35//! - **500 Internal Server Error** — graph or encoding error.
36
37use actix_web::{HttpRequest, HttpResponse, web};
38
39use haystack_core::data::{HCol, HDict, HGrid};
40use haystack_core::kinds::{HRef, Kind};
41
42use crate::content;
43use crate::error::HaystackError;
44use crate::state::AppState;
45
46/// POST /api/read
47///
48/// Request grid may have:
49/// - A `filter` column with a filter expression string, and optional `limit` column
50/// - An `id` column with Ref values for reading specific entities
51pub async fn handle(
52    req: HttpRequest,
53    body: String,
54    state: web::Data<AppState>,
55) -> Result<HttpResponse, HaystackError> {
56    let content_type = req
57        .headers()
58        .get("Content-Type")
59        .and_then(|v| v.to_str().ok())
60        .unwrap_or("");
61    let accept = req
62        .headers()
63        .get("Accept")
64        .and_then(|v| v.to_str().ok())
65        .unwrap_or("");
66
67    let request_grid = content::decode_request_grid(&body, content_type)
68        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
69
70    let result_grid = if request_grid.col("id").is_some() {
71        // Read by ID
72        read_by_id(&request_grid, &state)
73    } else if request_grid.col("filter").is_some() {
74        // Read by filter
75        read_by_filter(&request_grid, &state)
76    } else {
77        Err(HaystackError::bad_request(
78            "request must have 'id' or 'filter' column",
79        ))
80    }?;
81
82    let (encoded, ct) = content::encode_response_grid(&result_grid, accept)
83        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
84
85    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
86}
87
88/// Read entities by filter expression.
89///
90/// After reading from the local graph, also includes matching entities from
91/// any federated remote connectors.
92fn read_by_filter(request_grid: &HGrid, state: &AppState) -> Result<HGrid, HaystackError> {
93    let row = request_grid
94        .row(0)
95        .ok_or_else(|| HaystackError::bad_request("request grid has no rows"))?;
96
97    let filter = match row.get("filter") {
98        Some(Kind::Str(s)) => s.as_str(),
99        _ => return Err(HaystackError::bad_request("filter must be a Str value")),
100    };
101
102    let limit = match row.get("limit") {
103        Some(Kind::Number(n)) => n.val as usize,
104        _ => 0, // 0 means no limit
105    };
106
107    let effective_limit = if limit == 0 { usize::MAX } else { limit };
108    let is_wildcard = filter == "*";
109
110    // Read from local graph.
111    let mut results: Vec<HDict> = if is_wildcard {
112        // Wildcard: return all entities from the graph.
113        state.graph.all_entities()
114    } else {
115        let local_grid = state
116            .graph
117            .read_filter(filter, limit)
118            .map_err(|e| HaystackError::bad_request(format!("filter error: {e}")))?;
119        local_grid.rows
120    };
121
122    // Apply limit to local results.
123    if results.len() > effective_limit {
124        results.truncate(effective_limit);
125    }
126
127    // Merge federated entities if we have not yet hit the limit.
128    if results.len() < effective_limit {
129        let remaining = effective_limit - results.len();
130        if is_wildcard {
131            // Wildcard: include all federated entities up to the limit.
132            let federated = state.federation.all_cached_entities();
133            for entity in federated {
134                if results.len() >= effective_limit {
135                    break;
136                }
137                results.push(entity);
138            }
139        } else {
140            // Use bitmap-accelerated filter on federated caches.
141            match state.federation.filter_cached_entities(filter, remaining) {
142                Ok(federated) => results.extend(federated),
143                Err(e) => {
144                    return Err(HaystackError::bad_request(format!(
145                        "federation filter error: {e}"
146                    )));
147                }
148            }
149        }
150    }
151
152    if results.is_empty() {
153        return Ok(HGrid::new());
154    }
155
156    // Build column set from all result entities.
157    let mut col_set: Vec<String> = Vec::new();
158    let mut seen = std::collections::HashSet::new();
159    for entity in &results {
160        for name in entity.tag_names() {
161            if seen.insert(name.to_string()) {
162                col_set.push(name.to_string());
163            }
164        }
165    }
166    col_set.sort();
167    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
168
169    Ok(HGrid::from_parts(HDict::new(), cols, results))
170}
171
172/// Read entities by ID references.
173///
174/// First pass: resolve IDs from local graph.
175/// Second pass: batch-fetch remaining IDs from federation connectors
176/// (grouped by connector for O(1) indexed lookup per ID).
177fn read_by_id(request_grid: &HGrid, state: &AppState) -> Result<HGrid, HaystackError> {
178    let mut results: Vec<HDict> = Vec::new();
179    let mut col_set: Vec<String> = Vec::new();
180    let mut seen = std::collections::HashSet::new();
181    let mut unknown_ids: Vec<String> = Vec::new();
182
183    // First pass: resolve from local graph, collect unknowns.
184    for row in request_grid.rows.iter() {
185        let ref_val = match row.get("id") {
186            Some(Kind::Ref(r)) => &r.val,
187            _ => continue,
188        };
189
190        if let Some(entity) = state.graph.get(ref_val) {
191            for name in entity.tag_names() {
192                if seen.insert(name.to_string()) {
193                    col_set.push(name.to_string());
194                }
195            }
196            results.push(entity);
197        } else {
198            unknown_ids.push(ref_val.clone());
199        }
200    }
201
202    // Second pass: batch fetch unknowns from federation.
203    if !unknown_ids.is_empty() && !state.federation.connectors.is_empty() {
204        let id_refs: Vec<&str> = unknown_ids.iter().map(|s| s.as_str()).collect();
205        let (found, still_missing) = state.federation.batch_read_by_id(id_refs);
206
207        for entity in found {
208            for name in entity.tag_names() {
209                if seen.insert(name.to_string()) {
210                    col_set.push(name.to_string());
211                }
212            }
213            results.push(entity);
214        }
215
216        // Add missing stubs for IDs not found anywhere.
217        for id in still_missing {
218            let mut missing = HDict::new();
219            missing.set("id", Kind::Ref(HRef::from_val(id.as_str())));
220            if seen.insert("id".to_string()) {
221                col_set.push("id".to_string());
222            }
223            results.push(missing);
224        }
225    } else {
226        // No federation — add missing stubs directly.
227        for id in unknown_ids {
228            let mut missing = HDict::new();
229            missing.set("id", Kind::Ref(HRef::from_val(id.as_str())));
230            if seen.insert("id".to_string()) {
231                col_set.push("id".to_string());
232            }
233            results.push(missing);
234        }
235    }
236
237    if results.is_empty() {
238        return Ok(HGrid::new());
239    }
240
241    col_set.sort();
242    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
243    Ok(HGrid::from_parts(HDict::new(), cols, results))
244}