Skip to main content

haystack_server/ops/
data.rs

1//! The `export` and `import` ops — bulk data import/export.
2
3use actix_web::{HttpRequest, HttpResponse, web};
4
5use haystack_core::data::{HCol, HDict, HGrid};
6use haystack_core::kinds::{Kind, Number};
7
8use crate::content;
9use crate::error::HaystackError;
10use crate::state::AppState;
11
12/// POST /api/export
13///
14/// Reads all entities from the graph and returns them as a grid.
15/// The Accept header determines the response encoding format.
16pub async fn handle_export(
17    req: HttpRequest,
18    state: web::Data<AppState>,
19) -> Result<HttpResponse, HaystackError> {
20    let accept = req
21        .headers()
22        .get("Accept")
23        .and_then(|v| v.to_str().ok())
24        .unwrap_or("");
25
26    let grid = state
27        .graph
28        .read(|g| g.to_grid(""))
29        .map_err(|e| HaystackError::internal(format!("export failed: {e}")))?;
30
31    let (encoded, ct) = content::encode_response_grid(&grid, accept)
32        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
33
34    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
35}
36
37/// POST /api/import
38///
39/// Decodes the request body as a grid and adds/updates each row as an entity.
40/// Each row must have an `id` tag with a Ref value.
41/// Existing entities are updated; new entities are added.
42/// Returns a grid with the count of imported entities.
43pub async fn handle_import(
44    req: HttpRequest,
45    body: String,
46    state: web::Data<AppState>,
47) -> Result<HttpResponse, HaystackError> {
48    let content_type = req
49        .headers()
50        .get("Content-Type")
51        .and_then(|v| v.to_str().ok())
52        .unwrap_or("");
53    let accept = req
54        .headers()
55        .get("Accept")
56        .and_then(|v| v.to_str().ok())
57        .unwrap_or("");
58
59    let request_grid = content::decode_request_grid(&body, content_type)
60        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
61
62    let mut count: usize = 0;
63
64    for row in &request_grid.rows {
65        let ref_val = match row.id() {
66            Some(r) => r.val.clone(),
67            None => {
68                // Skip rows without a valid Ref id
69                continue;
70            }
71        };
72
73        // Check federation: if entity is owned by a remote connector, proxy import.
74        if let Some(connector) = state.federation.owner_of(&ref_val) {
75            connector.proxy_import(row).await.map_err(|e| {
76                HaystackError::internal(format!("federation import failed for {ref_val}: {e}"))
77            })?;
78            count += 1;
79            continue;
80        }
81
82        if state.graph.contains(&ref_val) {
83            // Update existing entity
84            state.graph.update(&ref_val, row.clone()).map_err(|e| {
85                HaystackError::internal(format!("update failed for {ref_val}: {e}"))
86            })?;
87        } else {
88            // Add new entity
89            state
90                .graph
91                .add(row.clone())
92                .map_err(|e| HaystackError::internal(format!("add failed for {ref_val}: {e}")))?;
93        }
94
95        count += 1;
96    }
97
98    // Build response grid with count
99    let mut row = HDict::new();
100    row.set("count", Kind::Number(Number::new(count as f64, None)));
101
102    let cols = vec![HCol::new("count")];
103    let result_grid = HGrid::from_parts(HDict::new(), cols, vec![row]);
104
105    let (encoded, ct) = content::encode_response_grid(&result_grid, accept)
106        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
107
108    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
109}
110
#[cfg(test)]
mod tests {
    use actix_web::App;
    use actix_web::test as actix_test;
    use actix_web::web;

    use haystack_core::data::{HCol, HDict, HGrid};
    use haystack_core::graph::{EntityGraph, SharedGraph};
    use haystack_core::kinds::{HRef, Kind, Number};

    use crate::actions::ActionRegistry;
    use crate::auth::AuthManager;
    use crate::his_store::HisStore;
    use crate::state::AppState;
    use crate::ws::WatchManager;

    /// Fresh application state built from each component's empty/new constructor.
    fn test_app_state() -> web::Data<AppState> {
        let app_state = AppState {
            graph: SharedGraph::new(EntityGraph::new()),
            namespace: parking_lot::RwLock::new(haystack_core::ontology::DefNamespace::new()),
            auth: AuthManager::empty(),
            watches: WatchManager::new(),
            actions: ActionRegistry::new(),
            his: HisStore::new(),
            started_at: std::time::Instant::now(),
            federation: crate::federation::Federation::new(),
        };
        web::Data::new(app_state)
    }

    /// Site entity with `id`, `site`, `dis` and `area` tags.
    fn make_site(id: &str) -> HDict {
        let area = Number::new(4500.0, Some("ft\u{00b2}".into()));

        let mut site = HDict::new();
        site.set("id", Kind::Ref(HRef::from_val(id)));
        site.set("site", Kind::Marker);
        site.set("dis", Kind::Str(format!("Site {id}")));
        site.set("area", Kind::Number(area));
        site
    }

    /// Equip entity with `id`, `equip`, `dis` and `siteRef` tags.
    fn make_equip(id: &str, site_ref: &str) -> HDict {
        let mut equip = HDict::new();
        equip.set("id", Kind::Ref(HRef::from_val(id)));
        equip.set("equip", Kind::Marker);
        equip.set("dis", Kind::Str(format!("Equip {id}")));
        equip.set("siteRef", Kind::Ref(HRef::from_val(site_ref)));
        equip
    }

    /// Encodes `grid` with the `text/zinc` codec.
    fn encode_grid_zinc(grid: &HGrid) -> String {
        haystack_core::codecs::codec_for("text/zinc")
            .unwrap()
            .encode_grid(grid)
            .unwrap()
    }

    /// Decodes `body` with the `text/zinc` codec.
    fn decode_grid_zinc(body: &str) -> HGrid {
        haystack_core::codecs::codec_for("text/zinc")
            .unwrap()
            .decode_grid(body)
            .unwrap()
    }

    /// Grid holding site `site-1` and equip `equip-1` (which references `site-1`).
    fn site_and_equip_grid() -> HGrid {
        let cols = vec![
            HCol::new("area"),
            HCol::new("dis"),
            HCol::new("equip"),
            HCol::new("id"),
            HCol::new("site"),
            HCol::new("siteRef"),
        ];
        let rows = vec![make_site("site-1"), make_equip("equip-1", "site-1")];
        HGrid::from_parts(HDict::new(), cols, rows)
    }

    /// POST to `/api/import` carrying `grid` as a zinc payload, zinc in and out.
    fn zinc_import_request(grid: &HGrid) -> actix_test::TestRequest {
        actix_test::TestRequest::post()
            .uri("/api/import")
            .insert_header(("Content-Type", "text/zinc"))
            .insert_header(("Accept", "text/zinc"))
            .set_payload(encode_grid_zinc(grid))
    }

    /// POST to `/api/export` asking for a zinc response.
    fn zinc_export_request() -> actix_test::TestRequest {
        actix_test::TestRequest::post()
            .uri("/api/export")
            .insert_header(("Accept", "text/zinc"))
    }

    #[actix_web::test]
    async fn export_empty_graph() {
        let state = test_app_state();
        let app = actix_test::init_service(
            App::new()
                .app_data(state.clone())
                .route("/api/export", web::post().to(super::handle_export)),
        )
        .await;

        let resp = actix_test::call_service(&app, zinc_export_request().to_request()).await;
        assert_eq!(resp.status(), 200);

        let raw = actix_test::read_body(resp).await;
        let exported = decode_grid_zinc(std::str::from_utf8(&raw).unwrap());
        assert!(exported.is_empty());
    }

    #[actix_web::test]
    async fn import_entities() {
        let state = test_app_state();
        let app = actix_test::init_service(
            App::new()
                .app_data(state.clone())
                .route("/api/import", web::post().to(super::handle_import)),
        )
        .await;

        // Import a grid with two entities.
        let request = zinc_import_request(&site_and_equip_grid()).to_request();
        let resp = actix_test::call_service(&app, request).await;
        assert_eq!(resp.status(), 200);

        let raw = actix_test::read_body(resp).await;
        let result_grid = decode_grid_zinc(std::str::from_utf8(&raw).unwrap());
        assert_eq!(result_grid.len(), 1);

        // The single response row reports how many entities were imported.
        let count = result_grid.row(0).unwrap().get("count");
        let Some(Kind::Number(n)) = count else {
            panic!("expected Number count, got {count:?}");
        };
        assert_eq!(n.val as usize, 2);

        // Both entities are now in the graph.
        assert_eq!(state.graph.len(), 2);
        for id in ["site-1", "equip-1"] {
            assert!(state.graph.contains(id));
        }
    }

    #[actix_web::test]
    async fn import_updates_existing_entities() {
        let state = test_app_state();

        // Seed the graph with the site before the service is built.
        state.graph.add(make_site("site-1")).unwrap();

        let app = actix_test::init_service(
            App::new()
                .app_data(state.clone())
                .route("/api/import", web::post().to(super::handle_import)),
        )
        .await;

        // A changed version of site-1: new display name and area.
        let mut revised = HDict::new();
        revised.set("id", Kind::Ref(HRef::from_val("site-1")));
        revised.set("site", Kind::Marker);
        revised.set("dis", Kind::Str("Updated Site".into()));
        revised.set(
            "area",
            Kind::Number(Number::new(9000.0, Some("ft\u{00b2}".into()))),
        );

        let cols = vec![
            HCol::new("area"),
            HCol::new("dis"),
            HCol::new("id"),
            HCol::new("site"),
        ];
        let import_grid = HGrid::from_parts(HDict::new(), cols, vec![revised]);

        let request = zinc_import_request(&import_grid).to_request();
        let resp = actix_test::call_service(&app, request).await;
        assert_eq!(resp.status(), 200);

        // Updated in place, not duplicated.
        assert_eq!(state.graph.len(), 1);

        // The stored entity carries the new display name.
        let stored = state.graph.get("site-1").unwrap();
        assert_eq!(stored.get("dis"), Some(&Kind::Str("Updated Site".into())));
    }

    #[actix_web::test]
    async fn import_then_export_roundtrip() {
        let state = test_app_state();
        let app = actix_test::init_service(
            App::new()
                .app_data(state.clone())
                .route("/api/import", web::post().to(super::handle_import))
                .route("/api/export", web::post().to(super::handle_export)),
        )
        .await;

        // Import two entities.
        let import_req = zinc_import_request(&site_and_equip_grid()).to_request();
        let import_resp = actix_test::call_service(&app, import_req).await;
        assert_eq!(import_resp.status(), 200);

        // Export them again.
        let export_resp =
            actix_test::call_service(&app, zinc_export_request().to_request()).await;
        assert_eq!(export_resp.status(), 200);

        let raw = actix_test::read_body(export_resp).await;
        let exported_grid = decode_grid_zinc(std::str::from_utf8(&raw).unwrap());

        // One row per imported entity.
        assert_eq!(exported_grid.len(), 2);

        // The expected columns are present.
        assert!(exported_grid.col("id").is_some());
        assert!(exported_grid.col("dis").is_some());

        // Both entities can be found by the id refs of the exported rows.
        let mut ids: Vec<String> = exported_grid
            .rows
            .iter()
            .filter_map(|row| row.id())
            .map(|id| id.val.clone())
            .collect();
        ids.sort();
        assert_eq!(ids, vec!["equip-1", "site-1"]);
    }
}