Skip to main content

haystack_server/ops/
read.rs

1//! The `read` op — read entities by filter or by id.
2
3use actix_web::{HttpRequest, HttpResponse, web};
4
5use haystack_core::data::{HCol, HDict, HGrid};
6use haystack_core::filter::{matches, parse_filter};
7use haystack_core::kinds::{HRef, Kind};
8
9use crate::content;
10use crate::error::HaystackError;
11use crate::state::AppState;
12
13/// POST /api/read
14///
15/// Request grid may have:
16/// - A `filter` column with a filter expression string, and optional `limit` column
17/// - An `id` column with Ref values for reading specific entities
18pub async fn handle(
19    req: HttpRequest,
20    body: String,
21    state: web::Data<AppState>,
22) -> Result<HttpResponse, HaystackError> {
23    let content_type = req
24        .headers()
25        .get("Content-Type")
26        .and_then(|v| v.to_str().ok())
27        .unwrap_or("");
28    let accept = req
29        .headers()
30        .get("Accept")
31        .and_then(|v| v.to_str().ok())
32        .unwrap_or("");
33
34    let request_grid = content::decode_request_grid(&body, content_type)
35        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
36
37    let result_grid = if request_grid.col("id").is_some() {
38        // Read by ID
39        read_by_id(&request_grid, &state)
40    } else if request_grid.col("filter").is_some() {
41        // Read by filter
42        read_by_filter(&request_grid, &state)
43    } else {
44        Err(HaystackError::bad_request(
45            "request must have 'id' or 'filter' column",
46        ))
47    }?;
48
49    let (encoded, ct) = content::encode_response_grid(&result_grid, accept)
50        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
51
52    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
53}
54
55/// Read entities by filter expression.
56///
57/// After reading from the local graph, also includes matching entities from
58/// any federated remote connectors.
59fn read_by_filter(request_grid: &HGrid, state: &AppState) -> Result<HGrid, HaystackError> {
60    let row = request_grid
61        .row(0)
62        .ok_or_else(|| HaystackError::bad_request("request grid has no rows"))?;
63
64    let filter = match row.get("filter") {
65        Some(Kind::Str(s)) => s.as_str(),
66        _ => return Err(HaystackError::bad_request("filter must be a Str value")),
67    };
68
69    let limit = match row.get("limit") {
70        Some(Kind::Number(n)) => n.val as usize,
71        _ => 0, // 0 means no limit
72    };
73
74    let effective_limit = if limit == 0 { usize::MAX } else { limit };
75    let is_wildcard = filter == "*";
76
77    // Read from local graph.
78    let mut results: Vec<HDict> = if is_wildcard {
79        // Wildcard: return all entities from the graph.
80        state.graph.all_entities()
81    } else {
82        let local_grid = state
83            .graph
84            .read_filter(filter, limit)
85            .map_err(|e| HaystackError::bad_request(format!("filter error: {e}")))?;
86        local_grid.rows
87    };
88
89    // Apply limit to local results.
90    if results.len() > effective_limit {
91        results.truncate(effective_limit);
92    }
93
94    // Merge federated entities if we have not yet hit the limit.
95    if results.len() < effective_limit {
96        let federated = state.federation.all_cached_entities();
97        if !federated.is_empty() {
98            if is_wildcard {
99                // Wildcard: include all federated entities up to the limit.
100                for entity in federated {
101                    if results.len() >= effective_limit {
102                        break;
103                    }
104                    results.push(entity);
105                }
106            } else {
107                let ast = parse_filter(filter)
108                    .map_err(|e| HaystackError::bad_request(format!("filter error: {e}")))?;
109
110                for entity in federated {
111                    if results.len() >= effective_limit {
112                        break;
113                    }
114                    if matches(&ast, &entity, None) {
115                        results.push(entity);
116                    }
117                }
118            }
119        }
120    }
121
122    if results.is_empty() {
123        return Ok(HGrid::new());
124    }
125
126    // Build column set from all result entities.
127    let mut col_set: Vec<String> = Vec::new();
128    let mut seen = std::collections::HashSet::new();
129    for entity in &results {
130        for name in entity.tag_names() {
131            if seen.insert(name.to_string()) {
132                col_set.push(name.to_string());
133            }
134        }
135    }
136    col_set.sort();
137    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
138
139    Ok(HGrid::from_parts(HDict::new(), cols, results))
140}
141
142/// Read entities by ID references.
143fn read_by_id(request_grid: &HGrid, state: &AppState) -> Result<HGrid, HaystackError> {
144    let mut results: Vec<HDict> = Vec::new();
145    let mut col_set: Vec<String> = Vec::new();
146    let mut seen = std::collections::HashSet::new();
147
148    for row in request_grid.rows.iter() {
149        let ref_val = match row.get("id") {
150            Some(Kind::Ref(r)) => &r.val,
151            _ => continue,
152        };
153
154        if let Some(entity) = state.graph.get(ref_val) {
155            for name in entity.tag_names() {
156                if seen.insert(name.to_string()) {
157                    col_set.push(name.to_string());
158                }
159            }
160            results.push(entity);
161        } else if let Some(connector) = state.federation.owner_of(ref_val) {
162            // Found in federation cache — return cached entity.
163            let cached = connector.cached_entities();
164            if let Some(entity) = cached
165                .iter()
166                .find(|e| matches!(e.get("id"), Some(Kind::Ref(r)) if r.val == *ref_val))
167            {
168                for name in entity.tag_names() {
169                    if seen.insert(name.to_string()) {
170                        col_set.push(name.to_string());
171                    }
172                }
173                results.push(entity.clone());
174            } else {
175                // Connector owns it but entity not in cache (shouldn't normally happen).
176                let mut missing = HDict::new();
177                missing.set("id", Kind::Ref(HRef::from_val(ref_val.as_str())));
178                if seen.insert("id".to_string()) {
179                    col_set.push("id".to_string());
180                }
181                results.push(missing);
182            }
183        } else {
184            // Not found anywhere — return missing entity row.
185            let mut missing = HDict::new();
186            missing.set("id", Kind::Ref(HRef::from_val(ref_val.as_str())));
187            if seen.insert("id".to_string()) {
188                col_set.push("id".to_string());
189            }
190            results.push(missing);
191        }
192    }
193
194    if results.is_empty() {
195        return Ok(HGrid::new());
196    }
197
198    col_set.sort();
199    let cols: Vec<HCol> = col_set.iter().map(|n| HCol::new(n.as_str())).collect();
200    Ok(HGrid::from_parts(HDict::new(), cols, results))
201}