Skip to main content

haystack_server/ops/
data.rs

1//! The `export` and `import` ops — bulk data import/export.
2//!
3//! # export (`POST /api/export`)
4//!
5//! No request grid. Returns all entities in the graph as a grid.
6//! The `Accept` header determines the response encoding (Zinc, JSON, etc.).
7//! Columns are derived from the union of all entity tags.
8//!
9//! # import (`POST /api/import`)
10//!
11//! ## Request Grid Columns
12//!
13//! Each row is an entity dict. Every row must have an `id` (Ref) tag.
14//! Additional columns are the entity tags to set.
15//!
16//! ## Response Grid Columns
17//!
18//! | Column  | Kind   | Description                      |
19//! |---------|--------|----------------------------------|
20//! | `count` | Number | Number of entities imported      |
21//!
22//! Existing entities are updated; new entities are added. Entities owned
23//! by a federation connector are proxied to the remote server.
24//!
25//! # Errors
26//!
27//! - **400 Bad Request** — request grid decode failure.
28//! - **500 Internal Server Error** — graph add/update failure, federation proxy
29//!   error, or encoding error.
30
31use actix_web::{HttpRequest, HttpResponse, web};
32
33use haystack_core::data::{HCol, HDict, HGrid};
34use haystack_core::kinds::{Kind, Number};
35
36use crate::content;
37use crate::error::HaystackError;
38use crate::state::AppState;
39
40/// POST /api/export
41///
42/// Reads all entities from the graph and returns them as a grid.
43/// The Accept header determines the response encoding format.
44pub async fn handle_export(
45    req: HttpRequest,
46    state: web::Data<AppState>,
47) -> Result<HttpResponse, HaystackError> {
48    let accept = req
49        .headers()
50        .get("Accept")
51        .and_then(|v| v.to_str().ok())
52        .unwrap_or("");
53
54    let grid = state
55        .graph
56        .read(|g| g.to_grid(""))
57        .map_err(|e| HaystackError::internal(format!("export failed: {e}")))?;
58
59    let (encoded, ct) = content::encode_response_grid(&grid, accept)
60        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
61
62    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
63}
64
65/// POST /api/import
66///
67/// Decodes the request body as a grid and adds/updates each row as an entity.
68/// Each row must have an `id` tag with a Ref value.
69/// Existing entities are updated; new entities are added.
70/// Returns a grid with the count of imported entities.
71pub async fn handle_import(
72    req: HttpRequest,
73    body: String,
74    state: web::Data<AppState>,
75) -> Result<HttpResponse, HaystackError> {
76    let content_type = req
77        .headers()
78        .get("Content-Type")
79        .and_then(|v| v.to_str().ok())
80        .unwrap_or("");
81    let accept = req
82        .headers()
83        .get("Accept")
84        .and_then(|v| v.to_str().ok())
85        .unwrap_or("");
86
87    let request_grid = content::decode_request_grid(&body, content_type)
88        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
89
90    let mut count: usize = 0;
91
92    for row in &request_grid.rows {
93        let ref_val = match row.id() {
94            Some(r) => r.val.clone(),
95            None => {
96                // Skip rows without a valid Ref id
97                continue;
98            }
99        };
100
101        // Check federation: if entity is owned by a remote connector, proxy import.
102        if let Some(connector) = state.federation.owner_of(&ref_val) {
103            connector.proxy_import(row).await.map_err(|e| {
104                HaystackError::internal(format!("federation import failed for {ref_val}: {e}"))
105            })?;
106            count += 1;
107            continue;
108        }
109
110        if state.graph.contains(&ref_val) {
111            // Update existing entity
112            state.graph.update(&ref_val, row.clone()).map_err(|e| {
113                HaystackError::internal(format!("update failed for {ref_val}: {e}"))
114            })?;
115        } else {
116            // Add new entity
117            state
118                .graph
119                .add(row.clone())
120                .map_err(|e| HaystackError::internal(format!("add failed for {ref_val}: {e}")))?;
121        }
122
123        count += 1;
124    }
125
126    // Build response grid with count
127    let mut row = HDict::new();
128    row.set("count", Kind::Number(Number::new(count as f64, None)));
129
130    let cols = vec![HCol::new("count")];
131    let result_grid = HGrid::from_parts(HDict::new(), cols, vec![row]);
132
133    let (encoded, ct) = content::encode_response_grid(&result_grid, accept)
134        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
135
136    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
137}
138
139#[cfg(test)]
140mod tests {
141    use actix_web::App;
142    use actix_web::test as actix_test;
143    use actix_web::web;
144
145    use haystack_core::data::{HCol, HDict, HGrid};
146    use haystack_core::graph::{EntityGraph, SharedGraph};
147    use haystack_core::kinds::{HRef, Kind, Number};
148
149    use crate::actions::ActionRegistry;
150    use crate::auth::AuthManager;
151    use crate::his_store::HisStore;
152    use crate::state::AppState;
153    use crate::ws::WatchManager;
154
155    fn test_app_state() -> web::Data<AppState> {
156        web::Data::new(AppState {
157            graph: SharedGraph::new(EntityGraph::new()),
158            namespace: parking_lot::RwLock::new(haystack_core::ontology::DefNamespace::new()),
159            auth: AuthManager::empty(),
160            watches: WatchManager::new(),
161            actions: ActionRegistry::new(),
162            his: HisStore::new(),
163            started_at: std::time::Instant::now(),
164            federation: crate::federation::Federation::new(),
165        })
166    }
167
168    fn make_site(id: &str) -> HDict {
169        let mut d = HDict::new();
170        d.set("id", Kind::Ref(HRef::from_val(id)));
171        d.set("site", Kind::Marker);
172        d.set("dis", Kind::Str(format!("Site {id}")));
173        d.set(
174            "area",
175            Kind::Number(Number::new(4500.0, Some("ft\u{00b2}".into()))),
176        );
177        d
178    }
179
180    fn make_equip(id: &str, site_ref: &str) -> HDict {
181        let mut d = HDict::new();
182        d.set("id", Kind::Ref(HRef::from_val(id)));
183        d.set("equip", Kind::Marker);
184        d.set("dis", Kind::Str(format!("Equip {id}")));
185        d.set("siteRef", Kind::Ref(HRef::from_val(site_ref)));
186        d
187    }
188
189    fn encode_grid_zinc(grid: &HGrid) -> String {
190        let codec = haystack_core::codecs::codec_for("text/zinc").unwrap();
191        codec.encode_grid(grid).unwrap()
192    }
193
194    fn decode_grid_zinc(body: &str) -> HGrid {
195        let codec = haystack_core::codecs::codec_for("text/zinc").unwrap();
196        codec.decode_grid(body).unwrap()
197    }
198
199    #[actix_web::test]
200    async fn export_empty_graph() {
201        let state = test_app_state();
202        let app = actix_test::init_service(
203            App::new()
204                .app_data(state.clone())
205                .route("/api/export", web::post().to(super::handle_export)),
206        )
207        .await;
208
209        let req = actix_test::TestRequest::post()
210            .uri("/api/export")
211            .insert_header(("Accept", "text/zinc"))
212            .to_request();
213
214        let resp = actix_test::call_service(&app, req).await;
215        assert_eq!(resp.status(), 200);
216
217        let body = actix_test::read_body(resp).await;
218        let body_str = std::str::from_utf8(&body).unwrap();
219        let grid = decode_grid_zinc(body_str);
220        assert!(grid.is_empty());
221    }
222
223    #[actix_web::test]
224    async fn import_entities() {
225        let state = test_app_state();
226        let app = actix_test::init_service(
227            App::new()
228                .app_data(state.clone())
229                .route("/api/import", web::post().to(super::handle_import)),
230        )
231        .await;
232
233        // Build a grid with two entities
234        let site = make_site("site-1");
235        let equip = make_equip("equip-1", "site-1");
236        let cols = vec![
237            HCol::new("area"),
238            HCol::new("dis"),
239            HCol::new("equip"),
240            HCol::new("id"),
241            HCol::new("site"),
242            HCol::new("siteRef"),
243        ];
244        let import_grid = HGrid::from_parts(HDict::new(), cols, vec![site, equip]);
245        let body = encode_grid_zinc(&import_grid);
246
247        let req = actix_test::TestRequest::post()
248            .uri("/api/import")
249            .insert_header(("Content-Type", "text/zinc"))
250            .insert_header(("Accept", "text/zinc"))
251            .set_payload(body)
252            .to_request();
253
254        let resp = actix_test::call_service(&app, req).await;
255        assert_eq!(resp.status(), 200);
256
257        let resp_body = actix_test::read_body(resp).await;
258        let resp_str = std::str::from_utf8(&resp_body).unwrap();
259        let result_grid = decode_grid_zinc(resp_str);
260        assert_eq!(result_grid.len(), 1);
261
262        // Verify count
263        let count_row = result_grid.row(0).unwrap();
264        match count_row.get("count") {
265            Some(Kind::Number(n)) => assert_eq!(n.val as usize, 2),
266            other => panic!("expected Number count, got {other:?}"),
267        }
268
269        // Verify graph has entities
270        assert_eq!(state.graph.len(), 2);
271        assert!(state.graph.contains("site-1"));
272        assert!(state.graph.contains("equip-1"));
273    }
274
275    #[actix_web::test]
276    async fn import_updates_existing_entities() {
277        let state = test_app_state();
278
279        // Pre-populate the graph with a site
280        state.graph.add(make_site("site-1")).unwrap();
281
282        let app = actix_test::init_service(
283            App::new()
284                .app_data(state.clone())
285                .route("/api/import", web::post().to(super::handle_import)),
286        )
287        .await;
288
289        // Import an updated version of site-1
290        let mut updated_site = HDict::new();
291        updated_site.set("id", Kind::Ref(HRef::from_val("site-1")));
292        updated_site.set("site", Kind::Marker);
293        updated_site.set("dis", Kind::Str("Updated Site".into()));
294        updated_site.set(
295            "area",
296            Kind::Number(Number::new(9000.0, Some("ft\u{00b2}".into()))),
297        );
298
299        let cols = vec![
300            HCol::new("area"),
301            HCol::new("dis"),
302            HCol::new("id"),
303            HCol::new("site"),
304        ];
305        let import_grid = HGrid::from_parts(HDict::new(), cols, vec![updated_site]);
306        let body = encode_grid_zinc(&import_grid);
307
308        let req = actix_test::TestRequest::post()
309            .uri("/api/import")
310            .insert_header(("Content-Type", "text/zinc"))
311            .insert_header(("Accept", "text/zinc"))
312            .set_payload(body)
313            .to_request();
314
315        let resp = actix_test::call_service(&app, req).await;
316        assert_eq!(resp.status(), 200);
317
318        // Still only 1 entity (updated, not duplicated)
319        assert_eq!(state.graph.len(), 1);
320
321        // Verify the entity was updated
322        let entity = state.graph.get("site-1").unwrap();
323        assert_eq!(entity.get("dis"), Some(&Kind::Str("Updated Site".into())));
324    }
325
326    #[actix_web::test]
327    async fn import_then_export_roundtrip() {
328        let state = test_app_state();
329        let app = actix_test::init_service(
330            App::new()
331                .app_data(state.clone())
332                .route("/api/import", web::post().to(super::handle_import))
333                .route("/api/export", web::post().to(super::handle_export)),
334        )
335        .await;
336
337        // Import two entities
338        let site = make_site("site-1");
339        let equip = make_equip("equip-1", "site-1");
340        let cols = vec![
341            HCol::new("area"),
342            HCol::new("dis"),
343            HCol::new("equip"),
344            HCol::new("id"),
345            HCol::new("site"),
346            HCol::new("siteRef"),
347        ];
348        let import_grid = HGrid::from_parts(HDict::new(), cols, vec![site, equip]);
349        let body = encode_grid_zinc(&import_grid);
350
351        let import_req = actix_test::TestRequest::post()
352            .uri("/api/import")
353            .insert_header(("Content-Type", "text/zinc"))
354            .insert_header(("Accept", "text/zinc"))
355            .set_payload(body)
356            .to_request();
357
358        let import_resp = actix_test::call_service(&app, import_req).await;
359        assert_eq!(import_resp.status(), 200);
360
361        // Now export
362        let export_req = actix_test::TestRequest::post()
363            .uri("/api/export")
364            .insert_header(("Accept", "text/zinc"))
365            .to_request();
366
367        let export_resp = actix_test::call_service(&app, export_req).await;
368        assert_eq!(export_resp.status(), 200);
369
370        let export_body = actix_test::read_body(export_resp).await;
371        let export_str = std::str::from_utf8(&export_body).unwrap();
372        let exported_grid = decode_grid_zinc(export_str);
373
374        // Should have 2 rows
375        assert_eq!(exported_grid.len(), 2);
376
377        // Verify the exported grid has the expected columns
378        assert!(exported_grid.col("id").is_some());
379        assert!(exported_grid.col("dis").is_some());
380
381        // Verify we can find both entities by checking id refs in the rows
382        let mut ids: Vec<String> = exported_grid
383            .rows
384            .iter()
385            .filter_map(|r| r.id().map(|r| r.val.clone()))
386            .collect();
387        ids.sort();
388        assert_eq!(ids, vec!["equip-1", "site-1"]);
389    }
390}