Skip to main content

helios_persistence/backends/mongodb/
bulk_export.rs

1//! Bulk export implementation for the MongoDB backend.
2//!
3//! Implements [`ExportDataProvider`], [`PatientExportProvider`], and
4//! [`GroupExportProvider`] over the `resources` collection. Job-state storage
5//! is not provided by MongoDB; deployments using MongoDB as the primary FHIR
6//! backend pair it with an embedded SQLite sidecar for the
7//! [`BulkExportJobStore`].
8
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use mongodb::{
12    Collection,
13    bson::{self, Bson, DateTime as BsonDateTime, Document, doc},
14    options::FindOptions,
15};
16use serde_json::Value;
17
18use crate::core::bulk_export::{
19    ExportDataProvider, ExportRequest, GroupExportProvider, NdjsonBatch, PatientExportProvider,
20};
21use crate::error::{BackendError, BulkExportError, StorageError, StorageResult};
22use crate::tenant::TenantContext;
23
24use super::MongoBackend;
25
26fn internal_error(msg: impl Into<String>) -> StorageError {
27    StorageError::Backend(BackendError::Internal {
28        backend_name: "mongodb".to_string(),
29        message: msg.into(),
30        source: None,
31    })
32}
33
34fn chrono_to_bson(dt: DateTime<Utc>) -> BsonDateTime {
35    BsonDateTime::from_millis(dt.timestamp_millis())
36}
37
38fn bson_to_chrono(dt: &BsonDateTime) -> DateTime<Utc> {
39    DateTime::<Utc>::from_timestamp_millis(dt.timestamp_millis()).unwrap_or_else(Utc::now)
40}
41
42fn document_to_value(doc: &Document) -> StorageResult<Value> {
43    bson::from_bson::<Value>(Bson::Document(doc.clone()))
44        .map_err(|e| internal_error(format!("deserialize resource: {e}")))
45}
46
47/// Parses a `{rfc3339}|{id}` keyset-pagination cursor.
48fn parse_cursor(cursor: &str) -> Option<(DateTime<Utc>, String)> {
49    let (ts, id) = cursor.split_once('|')?;
50    let dt = DateTime::parse_from_rfc3339(ts).ok()?.with_timezone(&Utc);
51    Some((dt, id.to_string()))
52}
53
54fn make_cursor(last_updated: &BsonDateTime, id: &str) -> String {
55    format!("{}|{}", bson_to_chrono(last_updated).to_rfc3339(), id)
56}
57
58async fn collect_cursor(mut cursor: mongodb::Cursor<Document>) -> StorageResult<Vec<Document>> {
59    let mut docs = Vec::new();
60    while cursor
61        .advance()
62        .await
63        .map_err(|e| internal_error(format!("cursor advance: {e}")))?
64    {
65        docs.push(
66            cursor
67                .deserialize_current()
68                .map_err(|e| internal_error(format!("cursor deserialize: {e}")))?,
69        );
70    }
71    Ok(docs)
72}
73
74fn ndjson_from_docs(docs: Vec<Document>, batch_size: u32) -> StorageResult<NdjsonBatch> {
75    let has_more = docs.len() > batch_size as usize;
76    let slice = if has_more {
77        &docs[..batch_size as usize]
78    } else {
79        &docs[..]
80    };
81    let mut lines = Vec::with_capacity(slice.len());
82    let mut last_cursor = None;
83    for d in slice {
84        let data = d
85            .get_document("data")
86            .map_err(|e| internal_error(format!("missing data payload: {e}")))?;
87        let val = document_to_value(data)?;
88        let line =
89            serde_json::to_string(&val).map_err(|e| internal_error(format!("serialize: {e}")))?;
90        lines.push(line);
91        let last_updated = d
92            .get_datetime("last_updated")
93            .map_err(|e| internal_error(format!("missing last_updated: {e}")))?;
94        let id = d
95            .get_str("id")
96            .map_err(|e| internal_error(format!("missing id: {e}")))?;
97        last_cursor = Some(make_cursor(last_updated, id));
98    }
99    Ok(NdjsonBatch {
100        lines,
101        next_cursor: if has_more { last_cursor } else { None },
102        is_last: !has_more,
103    })
104}
105
106#[async_trait]
107impl ExportDataProvider for MongoBackend {
108    async fn list_export_types(
109        &self,
110        tenant: &TenantContext,
111        request: &ExportRequest,
112    ) -> StorageResult<Vec<String>> {
113        let db = self.get_database().await?;
114        let resources: Collection<Document> = db.collection(MongoBackend::RESOURCES_COLLECTION);
115        let tenant_id = tenant.tenant_id().as_str();
116
117        if !request.resource_types.is_empty() {
118            let mut valid = Vec::new();
119            for rt in &request.resource_types {
120                let exists = resources
121                    .find_one(doc! {
122                        "tenant_id": tenant_id,
123                        "resource_type": rt.as_str(),
124                        "is_deleted": false,
125                    })
126                    .await
127                    .map_err(|e| internal_error(format!("validate type: {e}")))?
128                    .is_some();
129                if exists {
130                    valid.push(rt.clone());
131                }
132            }
133            return Ok(valid);
134        }
135
136        let types = resources
137            .distinct(
138                "resource_type",
139                doc! { "tenant_id": tenant_id, "is_deleted": false },
140            )
141            .await
142            .map_err(|e| internal_error(format!("distinct types: {e}")))?;
143        let mut out: Vec<String> = types
144            .into_iter()
145            .filter_map(|b| match b {
146                Bson::String(s) => Some(s),
147                _ => None,
148            })
149            .collect();
150        out.sort();
151        Ok(out)
152    }
153
154    async fn count_export_resources(
155        &self,
156        tenant: &TenantContext,
157        request: &ExportRequest,
158        resource_type: &str,
159    ) -> StorageResult<u64> {
160        let db = self.get_database().await?;
161        let resources: Collection<Document> = db.collection(MongoBackend::RESOURCES_COLLECTION);
162        let tenant_id = tenant.tenant_id().as_str();
163
164        let mut filter = doc! {
165            "tenant_id": tenant_id,
166            "resource_type": resource_type,
167            "is_deleted": false,
168        };
169        if let Some(since) = request.since {
170            filter.insert("last_updated", doc! { "$gte": chrono_to_bson(since) });
171        }
172
173        resources
174            .count_documents(filter)
175            .await
176            .map_err(|e| internal_error(format!("count: {e}")))
177    }
178
179    async fn fetch_export_batch(
180        &self,
181        tenant: &TenantContext,
182        request: &ExportRequest,
183        resource_type: &str,
184        cursor: Option<&str>,
185        batch_size: u32,
186    ) -> StorageResult<NdjsonBatch> {
187        let db = self.get_database().await?;
188        let resources: Collection<Document> = db.collection(MongoBackend::RESOURCES_COLLECTION);
189        let tenant_id = tenant.tenant_id().as_str();
190
191        let mut filter = doc! {
192            "tenant_id": tenant_id,
193            "resource_type": resource_type,
194            "is_deleted": false,
195        };
196        if let Some(since) = request.since {
197            filter.insert("last_updated", doc! { "$gte": chrono_to_bson(since) });
198        }
199        if let Some((cur_dt, cur_id)) = cursor.and_then(parse_cursor) {
200            filter.insert(
201                "$or",
202                vec![
203                    doc! { "last_updated": { "$gt": chrono_to_bson(cur_dt) } },
204                    doc! {
205                        "last_updated": chrono_to_bson(cur_dt),
206                        "id": { "$gt": cur_id },
207                    },
208                ],
209            );
210        }
211
212        let opts = FindOptions::builder()
213            .sort(doc! { "last_updated": 1, "id": 1 })
214            .limit((batch_size as i64) + 1)
215            .build();
216        let cursor_stream = resources
217            .find(filter)
218            .with_options(opts)
219            .await
220            .map_err(|e| internal_error(format!("find: {e}")))?;
221        let docs = collect_cursor(cursor_stream).await?;
222        ndjson_from_docs(docs, batch_size)
223    }
224}
225
226#[async_trait]
227impl PatientExportProvider for MongoBackend {
228    async fn list_patient_ids(
229        &self,
230        tenant: &TenantContext,
231        request: &ExportRequest,
232        cursor: Option<&str>,
233        batch_size: u32,
234    ) -> StorageResult<(Vec<String>, Option<String>)> {
235        let db = self.get_database().await?;
236        let resources: Collection<Document> = db.collection(MongoBackend::RESOURCES_COLLECTION);
237        let tenant_id = tenant.tenant_id().as_str();
238
239        let mut filter = doc! {
240            "tenant_id": tenant_id,
241            "resource_type": "Patient",
242            "is_deleted": false,
243        };
244        if let Some(since) = request.since {
245            filter.insert("last_updated", doc! { "$gte": chrono_to_bson(since) });
246        }
247        if let Some(c) = cursor {
248            filter.insert("id", doc! { "$gt": c });
249        }
250
251        let opts = FindOptions::builder()
252            .sort(doc! { "id": 1 })
253            .limit((batch_size as i64) + 1)
254            .projection(doc! { "id": 1 })
255            .build();
256        let cursor_stream = resources
257            .find(filter)
258            .with_options(opts)
259            .await
260            .map_err(|e| internal_error(format!("list patients: {e}")))?;
261        let docs = collect_cursor(cursor_stream).await?;
262        let mut ids: Vec<String> = docs
263            .iter()
264            .filter_map(|d| d.get_str("id").ok().map(|s| s.to_string()))
265            .collect();
266
267        let has_more = ids.len() > batch_size as usize;
268        if has_more {
269            ids.truncate(batch_size as usize);
270        }
271        let next = if has_more { ids.last().cloned() } else { None };
272        Ok((ids, next))
273    }
274
275    async fn fetch_patient_compartment_batch(
276        &self,
277        tenant: &TenantContext,
278        request: &ExportRequest,
279        resource_type: &str,
280        patient_ids: &[String],
281        cursor: Option<&str>,
282        batch_size: u32,
283    ) -> StorageResult<NdjsonBatch> {
284        if patient_ids.is_empty() {
285            return Ok(NdjsonBatch::empty());
286        }
287        let db = self.get_database().await?;
288        let resources: Collection<Document> = db.collection(MongoBackend::RESOURCES_COLLECTION);
289        let tenant_id = tenant.tenant_id().as_str();
290
291        // Patient itself: just filter by id list.
292        if resource_type == "Patient" {
293            let mut filter = doc! {
294                "tenant_id": tenant_id,
295                "resource_type": "Patient",
296                "is_deleted": false,
297                "id": { "$in": patient_ids.to_vec() },
298            };
299            if let Some(since) = request.since {
300                filter.insert("last_updated", doc! { "$gte": chrono_to_bson(since) });
301            }
302            if let Some((cur_dt, cur_id)) = cursor.and_then(parse_cursor) {
303                filter.insert(
304                    "$or",
305                    vec![
306                        doc! { "last_updated": { "$gt": chrono_to_bson(cur_dt) } },
307                        doc! {
308                            "last_updated": chrono_to_bson(cur_dt),
309                            "id": { "$gt": cur_id },
310                        },
311                    ],
312                );
313            }
314            let opts = FindOptions::builder()
315                .sort(doc! { "last_updated": 1, "id": 1 })
316                .limit((batch_size as i64) + 1)
317                .build();
318            let cursor_stream = resources
319                .find(filter)
320                .with_options(opts)
321                .await
322                .map_err(|e| internal_error(format!("compartment patients: {e}")))?;
323            let docs = collect_cursor(cursor_stream).await?;
324            return ndjson_from_docs(docs, batch_size);
325        }
326
327        // Other types: filter directly on the resource payload using dot
328        // notation on `data.subject.reference` / `data.patient.reference`.
329        // This is robust whether or not the local search_index is populated
330        // (mongodb-elasticsearch offloads search, leaving search_index empty).
331        let refs: Vec<String> = patient_ids.iter().map(|p| format!("Patient/{p}")).collect();
332        let mut filter = doc! {
333            "tenant_id": tenant_id,
334            "resource_type": resource_type,
335            "is_deleted": false,
336            "$or": vec![
337                doc! { "data.subject.reference": { "$in": &refs } },
338                doc! { "data.patient.reference": { "$in": &refs } },
339            ],
340        };
341        if let Some(since) = request.since {
342            filter.insert("last_updated", doc! { "$gte": chrono_to_bson(since) });
343        }
344        if let Some((cur_dt, cur_id)) = cursor.and_then(parse_cursor) {
345            // Combine compartment-match $or with cursor keyset $or via $and.
346            let compartment = filter
347                .remove("$or")
348                .expect("compartment $or was inserted above");
349            filter.insert(
350                "$and",
351                vec![
352                    doc! { "$or": compartment },
353                    doc! { "$or": vec![
354                        doc! { "last_updated": { "$gt": chrono_to_bson(cur_dt) } },
355                        doc! {
356                            "last_updated": chrono_to_bson(cur_dt),
357                            "id": { "$gt": cur_id },
358                        },
359                    ]},
360                ],
361            );
362        }
363        let opts = FindOptions::builder()
364            .sort(doc! { "last_updated": 1, "id": 1 })
365            .limit((batch_size as i64) + 1)
366            .build();
367        let cursor_stream = resources
368            .find(filter)
369            .with_options(opts)
370            .await
371            .map_err(|e| internal_error(format!("compartment fetch: {e}")))?;
372        let docs = collect_cursor(cursor_stream).await?;
373        ndjson_from_docs(docs, batch_size)
374    }
375}
376
377#[async_trait]
378impl GroupExportProvider for MongoBackend {
379    async fn get_group_members(
380        &self,
381        tenant: &TenantContext,
382        group_id: &str,
383    ) -> StorageResult<Vec<String>> {
384        let db = self.get_database().await?;
385        let resources: Collection<Document> = db.collection(MongoBackend::RESOURCES_COLLECTION);
386        let tenant_id = tenant.tenant_id().as_str();
387
388        let doc_opt = resources
389            .find_one(doc! {
390                "tenant_id": tenant_id,
391                "resource_type": "Group",
392                "id": group_id,
393                "is_deleted": false,
394            })
395            .await
396            .map_err(|e| internal_error(format!("get group: {e}")))?;
397        let d = doc_opt.ok_or_else(|| {
398            StorageError::BulkExport(BulkExportError::GroupNotFound {
399                group_id: group_id.to_string(),
400            })
401        })?;
402        let data = d
403            .get_document("data")
404            .map_err(|e| internal_error(format!("missing data payload: {e}")))?;
405        let group: Value = document_to_value(data)?;
406        let mut refs = Vec::new();
407        if let Some(members) = group.get("member").and_then(|m| m.as_array()) {
408            for member in members {
409                if let Some(reference) = member
410                    .get("entity")
411                    .and_then(|e| e.get("reference"))
412                    .and_then(|r| r.as_str())
413                {
414                    refs.push(reference.to_string());
415                }
416            }
417        }
418        Ok(refs)
419    }
420
421    async fn resolve_group_patient_ids(
422        &self,
423        tenant: &TenantContext,
424        group_id: &str,
425    ) -> StorageResult<Vec<String>> {
426        use std::collections::HashSet;
427        let mut visited: HashSet<String> = HashSet::new();
428        let mut seen: HashSet<String> = HashSet::new();
429        let mut patient_ids: Vec<String> = Vec::new();
430        let mut worklist: Vec<String> = vec![group_id.to_string()];
431        while let Some(gid) = worklist.pop() {
432            if !visited.insert(gid.clone()) {
433                continue;
434            }
435            let members = self.get_group_members(tenant, &gid).await?;
436            for r in members {
437                if let Some(pid) = r.strip_prefix("Patient/") {
438                    if seen.insert(pid.to_string()) {
439                        patient_ids.push(pid.to_string());
440                    }
441                } else if let Some(nested) = r.strip_prefix("Group/") {
442                    worklist.push(nested.to_string());
443                }
444            }
445        }
446        Ok(patient_ids)
447    }
448
449    async fn get_group_members_with_periods(
450        &self,
451        tenant: &TenantContext,
452        group_id: &str,
453    ) -> StorageResult<Vec<(String, Option<DateTime<Utc>>)>> {
454        let db = self.get_database().await?;
455        let resources: Collection<Document> = db.collection(MongoBackend::RESOURCES_COLLECTION);
456        let tenant_id = tenant.tenant_id().as_str();
457        let doc_opt = resources
458            .find_one(doc! {
459                "tenant_id": tenant_id,
460                "resource_type": "Group",
461                "id": group_id,
462                "is_deleted": false,
463            })
464            .await
465            .map_err(|e| internal_error(format!("get group: {e}")))?;
466        let d = doc_opt.ok_or_else(|| {
467            StorageError::BulkExport(BulkExportError::GroupNotFound {
468                group_id: group_id.to_string(),
469            })
470        })?;
471        let data = d
472            .get_document("data")
473            .map_err(|e| internal_error(format!("missing data payload: {e}")))?;
474        let group: Value = document_to_value(data)?;
475        let mut out = Vec::new();
476        if let Some(arr) = group.get("member").and_then(|m| m.as_array()) {
477            for member in arr {
478                let Some(reference) = member
479                    .get("entity")
480                    .and_then(|e| e.get("reference"))
481                    .and_then(|r| r.as_str())
482                else {
483                    continue;
484                };
485                let period_start = member
486                    .get("period")
487                    .and_then(|p| p.get("start"))
488                    .and_then(|s| s.as_str())
489                    .and_then(|s| {
490                        DateTime::parse_from_rfc3339(s)
491                            .ok()
492                            .map(|dt| dt.with_timezone(&Utc))
493                    });
494                out.push((reference.to_string(), period_start));
495            }
496        }
497        Ok(out)
498    }
499}