Skip to main content

haystack_server/ops/
data.rs

1//! The `export` and `import` ops — bulk data import/export.
2
3use actix_web::{HttpRequest, HttpResponse, web};
4
5use haystack_core::data::{HCol, HDict, HGrid};
6use haystack_core::kinds::{Kind, Number};
7
8use crate::content;
9use crate::error::HaystackError;
10use crate::state::AppState;
11
12/// POST /api/export
13///
14/// Reads all entities from the graph and returns them as a grid.
15/// The Accept header determines the response encoding format.
16pub async fn handle_export(
17    req: HttpRequest,
18    state: web::Data<AppState>,
19) -> Result<HttpResponse, HaystackError> {
20    let accept = req
21        .headers()
22        .get("Accept")
23        .and_then(|v| v.to_str().ok())
24        .unwrap_or("");
25
26    let grid = state
27        .graph
28        .read(|g| g.to_grid(""))
29        .map_err(|e| HaystackError::internal(format!("export failed: {e}")))?;
30
31    let (encoded, ct) = content::encode_response_grid(&grid, accept)
32        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
33
34    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
35}
36
37/// POST /api/import
38///
39/// Decodes the request body as a grid and adds/updates each row as an entity.
40/// Each row must have an `id` tag with a Ref value.
41/// Existing entities are updated; new entities are added.
42/// Returns a grid with the count of imported entities.
43pub async fn handle_import(
44    req: HttpRequest,
45    body: String,
46    state: web::Data<AppState>,
47) -> Result<HttpResponse, HaystackError> {
48    let content_type = req
49        .headers()
50        .get("Content-Type")
51        .and_then(|v| v.to_str().ok())
52        .unwrap_or("");
53    let accept = req
54        .headers()
55        .get("Accept")
56        .and_then(|v| v.to_str().ok())
57        .unwrap_or("");
58
59    let request_grid = content::decode_request_grid(&body, content_type)
60        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
61
62    let mut count: usize = 0;
63
64    for row in &request_grid.rows {
65        let ref_val = match row.id() {
66            Some(r) => r.val.clone(),
67            None => {
68                // Skip rows without a valid Ref id
69                continue;
70            }
71        };
72
73        // Check federation: if entity is owned by a remote connector, proxy import.
74        if let Some(connector) = state.federation.owner_of(&ref_val) {
75            connector
76                .proxy_import(row)
77                .await
78                .map_err(|e| {
79                    HaystackError::internal(format!(
80                        "federation import failed for {ref_val}: {e}"
81                    ))
82                })?;
83            count += 1;
84            continue;
85        }
86
87        if state.graph.contains(&ref_val) {
88            // Update existing entity
89            state.graph.update(&ref_val, row.clone()).map_err(|e| {
90                HaystackError::internal(format!("update failed for {ref_val}: {e}"))
91            })?;
92        } else {
93            // Add new entity
94            state
95                .graph
96                .add(row.clone())
97                .map_err(|e| HaystackError::internal(format!("add failed for {ref_val}: {e}")))?;
98        }
99
100        count += 1;
101    }
102
103    // Build response grid with count
104    let mut row = HDict::new();
105    row.set("count", Kind::Number(Number::new(count as f64, None)));
106
107    let cols = vec![HCol::new("count")];
108    let result_grid = HGrid::from_parts(HDict::new(), cols, vec![row]);
109
110    let (encoded, ct) = content::encode_response_grid(&result_grid, accept)
111        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
112
113    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
114}
115
116#[cfg(test)]
117mod tests {
118    use actix_web::App;
119    use actix_web::test as actix_test;
120    use actix_web::web;
121
122    use haystack_core::data::{HCol, HDict, HGrid};
123    use haystack_core::graph::{EntityGraph, SharedGraph};
124    use haystack_core::kinds::{HRef, Kind, Number};
125
126    use crate::actions::ActionRegistry;
127    use crate::auth::AuthManager;
128    use crate::his_store::HisStore;
129    use crate::state::AppState;
130    use crate::ws::WatchManager;
131
132    fn test_app_state() -> web::Data<AppState> {
133        web::Data::new(AppState {
134            graph: SharedGraph::new(EntityGraph::new()),
135            namespace: parking_lot::RwLock::new(haystack_core::ontology::DefNamespace::new()),
136            auth: AuthManager::empty(),
137            watches: WatchManager::new(),
138            actions: ActionRegistry::new(),
139            his: HisStore::new(),
140            started_at: std::time::Instant::now(),
141            federation: crate::federation::Federation::new(),
142        })
143    }
144
145    fn make_site(id: &str) -> HDict {
146        let mut d = HDict::new();
147        d.set("id", Kind::Ref(HRef::from_val(id)));
148        d.set("site", Kind::Marker);
149        d.set("dis", Kind::Str(format!("Site {id}")));
150        d.set(
151            "area",
152            Kind::Number(Number::new(4500.0, Some("ft\u{00b2}".into()))),
153        );
154        d
155    }
156
157    fn make_equip(id: &str, site_ref: &str) -> HDict {
158        let mut d = HDict::new();
159        d.set("id", Kind::Ref(HRef::from_val(id)));
160        d.set("equip", Kind::Marker);
161        d.set("dis", Kind::Str(format!("Equip {id}")));
162        d.set("siteRef", Kind::Ref(HRef::from_val(site_ref)));
163        d
164    }
165
166    fn encode_grid_zinc(grid: &HGrid) -> String {
167        let codec = haystack_core::codecs::codec_for("text/zinc").unwrap();
168        codec.encode_grid(grid).unwrap()
169    }
170
171    fn decode_grid_zinc(body: &str) -> HGrid {
172        let codec = haystack_core::codecs::codec_for("text/zinc").unwrap();
173        codec.decode_grid(body).unwrap()
174    }
175
176    #[actix_web::test]
177    async fn export_empty_graph() {
178        let state = test_app_state();
179        let app = actix_test::init_service(
180            App::new()
181                .app_data(state.clone())
182                .route("/api/export", web::post().to(super::handle_export)),
183        )
184        .await;
185
186        let req = actix_test::TestRequest::post()
187            .uri("/api/export")
188            .insert_header(("Accept", "text/zinc"))
189            .to_request();
190
191        let resp = actix_test::call_service(&app, req).await;
192        assert_eq!(resp.status(), 200);
193
194        let body = actix_test::read_body(resp).await;
195        let body_str = std::str::from_utf8(&body).unwrap();
196        let grid = decode_grid_zinc(body_str);
197        assert!(grid.is_empty());
198    }
199
200    #[actix_web::test]
201    async fn import_entities() {
202        let state = test_app_state();
203        let app = actix_test::init_service(
204            App::new()
205                .app_data(state.clone())
206                .route("/api/import", web::post().to(super::handle_import)),
207        )
208        .await;
209
210        // Build a grid with two entities
211        let site = make_site("site-1");
212        let equip = make_equip("equip-1", "site-1");
213        let cols = vec![
214            HCol::new("area"),
215            HCol::new("dis"),
216            HCol::new("equip"),
217            HCol::new("id"),
218            HCol::new("site"),
219            HCol::new("siteRef"),
220        ];
221        let import_grid = HGrid::from_parts(HDict::new(), cols, vec![site, equip]);
222        let body = encode_grid_zinc(&import_grid);
223
224        let req = actix_test::TestRequest::post()
225            .uri("/api/import")
226            .insert_header(("Content-Type", "text/zinc"))
227            .insert_header(("Accept", "text/zinc"))
228            .set_payload(body)
229            .to_request();
230
231        let resp = actix_test::call_service(&app, req).await;
232        assert_eq!(resp.status(), 200);
233
234        let resp_body = actix_test::read_body(resp).await;
235        let resp_str = std::str::from_utf8(&resp_body).unwrap();
236        let result_grid = decode_grid_zinc(resp_str);
237        assert_eq!(result_grid.len(), 1);
238
239        // Verify count
240        let count_row = result_grid.row(0).unwrap();
241        match count_row.get("count") {
242            Some(Kind::Number(n)) => assert_eq!(n.val as usize, 2),
243            other => panic!("expected Number count, got {other:?}"),
244        }
245
246        // Verify graph has entities
247        assert_eq!(state.graph.len(), 2);
248        assert!(state.graph.contains("site-1"));
249        assert!(state.graph.contains("equip-1"));
250    }
251
252    #[actix_web::test]
253    async fn import_updates_existing_entities() {
254        let state = test_app_state();
255
256        // Pre-populate the graph with a site
257        state.graph.add(make_site("site-1")).unwrap();
258
259        let app = actix_test::init_service(
260            App::new()
261                .app_data(state.clone())
262                .route("/api/import", web::post().to(super::handle_import)),
263        )
264        .await;
265
266        // Import an updated version of site-1
267        let mut updated_site = HDict::new();
268        updated_site.set("id", Kind::Ref(HRef::from_val("site-1")));
269        updated_site.set("site", Kind::Marker);
270        updated_site.set("dis", Kind::Str("Updated Site".into()));
271        updated_site.set(
272            "area",
273            Kind::Number(Number::new(9000.0, Some("ft\u{00b2}".into()))),
274        );
275
276        let cols = vec![
277            HCol::new("area"),
278            HCol::new("dis"),
279            HCol::new("id"),
280            HCol::new("site"),
281        ];
282        let import_grid = HGrid::from_parts(HDict::new(), cols, vec![updated_site]);
283        let body = encode_grid_zinc(&import_grid);
284
285        let req = actix_test::TestRequest::post()
286            .uri("/api/import")
287            .insert_header(("Content-Type", "text/zinc"))
288            .insert_header(("Accept", "text/zinc"))
289            .set_payload(body)
290            .to_request();
291
292        let resp = actix_test::call_service(&app, req).await;
293        assert_eq!(resp.status(), 200);
294
295        // Still only 1 entity (updated, not duplicated)
296        assert_eq!(state.graph.len(), 1);
297
298        // Verify the entity was updated
299        let entity = state.graph.get("site-1").unwrap();
300        assert_eq!(entity.get("dis"), Some(&Kind::Str("Updated Site".into())));
301    }
302
303    #[actix_web::test]
304    async fn import_then_export_roundtrip() {
305        let state = test_app_state();
306        let app = actix_test::init_service(
307            App::new()
308                .app_data(state.clone())
309                .route("/api/import", web::post().to(super::handle_import))
310                .route("/api/export", web::post().to(super::handle_export)),
311        )
312        .await;
313
314        // Import two entities
315        let site = make_site("site-1");
316        let equip = make_equip("equip-1", "site-1");
317        let cols = vec![
318            HCol::new("area"),
319            HCol::new("dis"),
320            HCol::new("equip"),
321            HCol::new("id"),
322            HCol::new("site"),
323            HCol::new("siteRef"),
324        ];
325        let import_grid = HGrid::from_parts(HDict::new(), cols, vec![site, equip]);
326        let body = encode_grid_zinc(&import_grid);
327
328        let import_req = actix_test::TestRequest::post()
329            .uri("/api/import")
330            .insert_header(("Content-Type", "text/zinc"))
331            .insert_header(("Accept", "text/zinc"))
332            .set_payload(body)
333            .to_request();
334
335        let import_resp = actix_test::call_service(&app, import_req).await;
336        assert_eq!(import_resp.status(), 200);
337
338        // Now export
339        let export_req = actix_test::TestRequest::post()
340            .uri("/api/export")
341            .insert_header(("Accept", "text/zinc"))
342            .to_request();
343
344        let export_resp = actix_test::call_service(&app, export_req).await;
345        assert_eq!(export_resp.status(), 200);
346
347        let export_body = actix_test::read_body(export_resp).await;
348        let export_str = std::str::from_utf8(&export_body).unwrap();
349        let exported_grid = decode_grid_zinc(export_str);
350
351        // Should have 2 rows
352        assert_eq!(exported_grid.len(), 2);
353
354        // Verify the exported grid has the expected columns
355        assert!(exported_grid.col("id").is_some());
356        assert!(exported_grid.col("dis").is_some());
357
358        // Verify we can find both entities by checking id refs in the rows
359        let mut ids: Vec<String> = exported_grid
360            .rows
361            .iter()
362            .filter_map(|r| r.id().map(|r| r.val.clone()))
363            .collect();
364        ids.sort();
365        assert_eq!(ids, vec!["equip-1", "site-1"]);
366    }
367}