// helios_persistence/backends/mongodb/bulk_export.rs
use async_trait::async_trait;
use chrono::{DateTime, NaiveDate, Utc};
use mongodb::{
    Collection,
    bson::{self, Bson, DateTime as BsonDateTime, Document, doc},
    options::FindOptions,
};
use serde_json::Value;

use crate::core::bulk_export::{
    ExportDataProvider, ExportRequest, GroupExportProvider, NdjsonBatch, PatientExportProvider,
};
use crate::error::{BackendError, BulkExportError, StorageError, StorageResult};
use crate::tenant::TenantContext;

use super::MongoBackend;

26fn internal_error(msg: impl Into<String>) -> StorageError {
27 StorageError::Backend(BackendError::Internal {
28 backend_name: "mongodb".to_string(),
29 message: msg.into(),
30 source: None,
31 })
32}
33
34fn chrono_to_bson(dt: DateTime<Utc>) -> BsonDateTime {
35 BsonDateTime::from_millis(dt.timestamp_millis())
36}
37
38fn bson_to_chrono(dt: &BsonDateTime) -> DateTime<Utc> {
39 DateTime::<Utc>::from_timestamp_millis(dt.timestamp_millis()).unwrap_or_else(Utc::now)
40}
41
42fn document_to_value(doc: &Document) -> StorageResult<Value> {
43 bson::from_bson::<Value>(Bson::Document(doc.clone()))
44 .map_err(|e| internal_error(format!("deserialize resource: {e}")))
45}
46
47fn parse_cursor(cursor: &str) -> Option<(DateTime<Utc>, String)> {
49 let (ts, id) = cursor.split_once('|')?;
50 let dt = DateTime::parse_from_rfc3339(ts).ok()?.with_timezone(&Utc);
51 Some((dt, id.to_string()))
52}
53
54fn make_cursor(last_updated: &BsonDateTime, id: &str) -> String {
55 format!("{}|{}", bson_to_chrono(last_updated).to_rfc3339(), id)
56}
57
58async fn collect_cursor(mut cursor: mongodb::Cursor<Document>) -> StorageResult<Vec<Document>> {
59 let mut docs = Vec::new();
60 while cursor
61 .advance()
62 .await
63 .map_err(|e| internal_error(format!("cursor advance: {e}")))?
64 {
65 docs.push(
66 cursor
67 .deserialize_current()
68 .map_err(|e| internal_error(format!("cursor deserialize: {e}")))?,
69 );
70 }
71 Ok(docs)
72}
73
74fn ndjson_from_docs(docs: Vec<Document>, batch_size: u32) -> StorageResult<NdjsonBatch> {
75 let has_more = docs.len() > batch_size as usize;
76 let slice = if has_more {
77 &docs[..batch_size as usize]
78 } else {
79 &docs[..]
80 };
81 let mut lines = Vec::with_capacity(slice.len());
82 let mut last_cursor = None;
83 for d in slice {
84 let data = d
85 .get_document("data")
86 .map_err(|e| internal_error(format!("missing data payload: {e}")))?;
87 let val = document_to_value(data)?;
88 let line =
89 serde_json::to_string(&val).map_err(|e| internal_error(format!("serialize: {e}")))?;
90 lines.push(line);
91 let last_updated = d
92 .get_datetime("last_updated")
93 .map_err(|e| internal_error(format!("missing last_updated: {e}")))?;
94 let id = d
95 .get_str("id")
96 .map_err(|e| internal_error(format!("missing id: {e}")))?;
97 last_cursor = Some(make_cursor(last_updated, id));
98 }
99 Ok(NdjsonBatch {
100 lines,
101 next_cursor: if has_more { last_cursor } else { None },
102 is_last: !has_more,
103 })
104}
105
106#[async_trait]
107impl ExportDataProvider for MongoBackend {
108 async fn list_export_types(
109 &self,
110 tenant: &TenantContext,
111 request: &ExportRequest,
112 ) -> StorageResult<Vec<String>> {
113 let db = self.get_database().await?;
114 let resources: Collection<Document> = db.collection(MongoBackend::RESOURCES_COLLECTION);
115 let tenant_id = tenant.tenant_id().as_str();
116
117 if !request.resource_types.is_empty() {
118 let mut valid = Vec::new();
119 for rt in &request.resource_types {
120 let exists = resources
121 .find_one(doc! {
122 "tenant_id": tenant_id,
123 "resource_type": rt.as_str(),
124 "is_deleted": false,
125 })
126 .await
127 .map_err(|e| internal_error(format!("validate type: {e}")))?
128 .is_some();
129 if exists {
130 valid.push(rt.clone());
131 }
132 }
133 return Ok(valid);
134 }
135
136 let types = resources
137 .distinct(
138 "resource_type",
139 doc! { "tenant_id": tenant_id, "is_deleted": false },
140 )
141 .await
142 .map_err(|e| internal_error(format!("distinct types: {e}")))?;
143 let mut out: Vec<String> = types
144 .into_iter()
145 .filter_map(|b| match b {
146 Bson::String(s) => Some(s),
147 _ => None,
148 })
149 .collect();
150 out.sort();
151 Ok(out)
152 }
153
154 async fn count_export_resources(
155 &self,
156 tenant: &TenantContext,
157 request: &ExportRequest,
158 resource_type: &str,
159 ) -> StorageResult<u64> {
160 let db = self.get_database().await?;
161 let resources: Collection<Document> = db.collection(MongoBackend::RESOURCES_COLLECTION);
162 let tenant_id = tenant.tenant_id().as_str();
163
164 let mut filter = doc! {
165 "tenant_id": tenant_id,
166 "resource_type": resource_type,
167 "is_deleted": false,
168 };
169 if let Some(since) = request.since {
170 filter.insert("last_updated", doc! { "$gte": chrono_to_bson(since) });
171 }
172
173 resources
174 .count_documents(filter)
175 .await
176 .map_err(|e| internal_error(format!("count: {e}")))
177 }
178
179 async fn fetch_export_batch(
180 &self,
181 tenant: &TenantContext,
182 request: &ExportRequest,
183 resource_type: &str,
184 cursor: Option<&str>,
185 batch_size: u32,
186 ) -> StorageResult<NdjsonBatch> {
187 let db = self.get_database().await?;
188 let resources: Collection<Document> = db.collection(MongoBackend::RESOURCES_COLLECTION);
189 let tenant_id = tenant.tenant_id().as_str();
190
191 let mut filter = doc! {
192 "tenant_id": tenant_id,
193 "resource_type": resource_type,
194 "is_deleted": false,
195 };
196 if let Some(since) = request.since {
197 filter.insert("last_updated", doc! { "$gte": chrono_to_bson(since) });
198 }
199 if let Some((cur_dt, cur_id)) = cursor.and_then(parse_cursor) {
200 filter.insert(
201 "$or",
202 vec![
203 doc! { "last_updated": { "$gt": chrono_to_bson(cur_dt) } },
204 doc! {
205 "last_updated": chrono_to_bson(cur_dt),
206 "id": { "$gt": cur_id },
207 },
208 ],
209 );
210 }
211
212 let opts = FindOptions::builder()
213 .sort(doc! { "last_updated": 1, "id": 1 })
214 .limit((batch_size as i64) + 1)
215 .build();
216 let cursor_stream = resources
217 .find(filter)
218 .with_options(opts)
219 .await
220 .map_err(|e| internal_error(format!("find: {e}")))?;
221 let docs = collect_cursor(cursor_stream).await?;
222 ndjson_from_docs(docs, batch_size)
223 }
224}
225
226#[async_trait]
227impl PatientExportProvider for MongoBackend {
228 async fn list_patient_ids(
229 &self,
230 tenant: &TenantContext,
231 request: &ExportRequest,
232 cursor: Option<&str>,
233 batch_size: u32,
234 ) -> StorageResult<(Vec<String>, Option<String>)> {
235 let db = self.get_database().await?;
236 let resources: Collection<Document> = db.collection(MongoBackend::RESOURCES_COLLECTION);
237 let tenant_id = tenant.tenant_id().as_str();
238
239 let mut filter = doc! {
240 "tenant_id": tenant_id,
241 "resource_type": "Patient",
242 "is_deleted": false,
243 };
244 if let Some(since) = request.since {
245 filter.insert("last_updated", doc! { "$gte": chrono_to_bson(since) });
246 }
247 if let Some(c) = cursor {
248 filter.insert("id", doc! { "$gt": c });
249 }
250
251 let opts = FindOptions::builder()
252 .sort(doc! { "id": 1 })
253 .limit((batch_size as i64) + 1)
254 .projection(doc! { "id": 1 })
255 .build();
256 let cursor_stream = resources
257 .find(filter)
258 .with_options(opts)
259 .await
260 .map_err(|e| internal_error(format!("list patients: {e}")))?;
261 let docs = collect_cursor(cursor_stream).await?;
262 let mut ids: Vec<String> = docs
263 .iter()
264 .filter_map(|d| d.get_str("id").ok().map(|s| s.to_string()))
265 .collect();
266
267 let has_more = ids.len() > batch_size as usize;
268 if has_more {
269 ids.truncate(batch_size as usize);
270 }
271 let next = if has_more { ids.last().cloned() } else { None };
272 Ok((ids, next))
273 }
274
275 async fn fetch_patient_compartment_batch(
276 &self,
277 tenant: &TenantContext,
278 request: &ExportRequest,
279 resource_type: &str,
280 patient_ids: &[String],
281 cursor: Option<&str>,
282 batch_size: u32,
283 ) -> StorageResult<NdjsonBatch> {
284 if patient_ids.is_empty() {
285 return Ok(NdjsonBatch::empty());
286 }
287 let db = self.get_database().await?;
288 let resources: Collection<Document> = db.collection(MongoBackend::RESOURCES_COLLECTION);
289 let tenant_id = tenant.tenant_id().as_str();
290
291 if resource_type == "Patient" {
293 let mut filter = doc! {
294 "tenant_id": tenant_id,
295 "resource_type": "Patient",
296 "is_deleted": false,
297 "id": { "$in": patient_ids.to_vec() },
298 };
299 if let Some(since) = request.since {
300 filter.insert("last_updated", doc! { "$gte": chrono_to_bson(since) });
301 }
302 if let Some((cur_dt, cur_id)) = cursor.and_then(parse_cursor) {
303 filter.insert(
304 "$or",
305 vec![
306 doc! { "last_updated": { "$gt": chrono_to_bson(cur_dt) } },
307 doc! {
308 "last_updated": chrono_to_bson(cur_dt),
309 "id": { "$gt": cur_id },
310 },
311 ],
312 );
313 }
314 let opts = FindOptions::builder()
315 .sort(doc! { "last_updated": 1, "id": 1 })
316 .limit((batch_size as i64) + 1)
317 .build();
318 let cursor_stream = resources
319 .find(filter)
320 .with_options(opts)
321 .await
322 .map_err(|e| internal_error(format!("compartment patients: {e}")))?;
323 let docs = collect_cursor(cursor_stream).await?;
324 return ndjson_from_docs(docs, batch_size);
325 }
326
327 let refs: Vec<String> = patient_ids.iter().map(|p| format!("Patient/{p}")).collect();
332 let mut filter = doc! {
333 "tenant_id": tenant_id,
334 "resource_type": resource_type,
335 "is_deleted": false,
336 "$or": vec![
337 doc! { "data.subject.reference": { "$in": &refs } },
338 doc! { "data.patient.reference": { "$in": &refs } },
339 ],
340 };
341 if let Some(since) = request.since {
342 filter.insert("last_updated", doc! { "$gte": chrono_to_bson(since) });
343 }
344 if let Some((cur_dt, cur_id)) = cursor.and_then(parse_cursor) {
345 let compartment = filter
347 .remove("$or")
348 .expect("compartment $or was inserted above");
349 filter.insert(
350 "$and",
351 vec![
352 doc! { "$or": compartment },
353 doc! { "$or": vec![
354 doc! { "last_updated": { "$gt": chrono_to_bson(cur_dt) } },
355 doc! {
356 "last_updated": chrono_to_bson(cur_dt),
357 "id": { "$gt": cur_id },
358 },
359 ]},
360 ],
361 );
362 }
363 let opts = FindOptions::builder()
364 .sort(doc! { "last_updated": 1, "id": 1 })
365 .limit((batch_size as i64) + 1)
366 .build();
367 let cursor_stream = resources
368 .find(filter)
369 .with_options(opts)
370 .await
371 .map_err(|e| internal_error(format!("compartment fetch: {e}")))?;
372 let docs = collect_cursor(cursor_stream).await?;
373 ndjson_from_docs(docs, batch_size)
374 }
375}
376
377#[async_trait]
378impl GroupExportProvider for MongoBackend {
379 async fn get_group_members(
380 &self,
381 tenant: &TenantContext,
382 group_id: &str,
383 ) -> StorageResult<Vec<String>> {
384 let db = self.get_database().await?;
385 let resources: Collection<Document> = db.collection(MongoBackend::RESOURCES_COLLECTION);
386 let tenant_id = tenant.tenant_id().as_str();
387
388 let doc_opt = resources
389 .find_one(doc! {
390 "tenant_id": tenant_id,
391 "resource_type": "Group",
392 "id": group_id,
393 "is_deleted": false,
394 })
395 .await
396 .map_err(|e| internal_error(format!("get group: {e}")))?;
397 let d = doc_opt.ok_or_else(|| {
398 StorageError::BulkExport(BulkExportError::GroupNotFound {
399 group_id: group_id.to_string(),
400 })
401 })?;
402 let data = d
403 .get_document("data")
404 .map_err(|e| internal_error(format!("missing data payload: {e}")))?;
405 let group: Value = document_to_value(data)?;
406 let mut refs = Vec::new();
407 if let Some(members) = group.get("member").and_then(|m| m.as_array()) {
408 for member in members {
409 if let Some(reference) = member
410 .get("entity")
411 .and_then(|e| e.get("reference"))
412 .and_then(|r| r.as_str())
413 {
414 refs.push(reference.to_string());
415 }
416 }
417 }
418 Ok(refs)
419 }
420
421 async fn resolve_group_patient_ids(
422 &self,
423 tenant: &TenantContext,
424 group_id: &str,
425 ) -> StorageResult<Vec<String>> {
426 use std::collections::HashSet;
427 let mut visited: HashSet<String> = HashSet::new();
428 let mut seen: HashSet<String> = HashSet::new();
429 let mut patient_ids: Vec<String> = Vec::new();
430 let mut worklist: Vec<String> = vec![group_id.to_string()];
431 while let Some(gid) = worklist.pop() {
432 if !visited.insert(gid.clone()) {
433 continue;
434 }
435 let members = self.get_group_members(tenant, &gid).await?;
436 for r in members {
437 if let Some(pid) = r.strip_prefix("Patient/") {
438 if seen.insert(pid.to_string()) {
439 patient_ids.push(pid.to_string());
440 }
441 } else if let Some(nested) = r.strip_prefix("Group/") {
442 worklist.push(nested.to_string());
443 }
444 }
445 }
446 Ok(patient_ids)
447 }
448
449 async fn get_group_members_with_periods(
450 &self,
451 tenant: &TenantContext,
452 group_id: &str,
453 ) -> StorageResult<Vec<(String, Option<DateTime<Utc>>)>> {
454 let db = self.get_database().await?;
455 let resources: Collection<Document> = db.collection(MongoBackend::RESOURCES_COLLECTION);
456 let tenant_id = tenant.tenant_id().as_str();
457 let doc_opt = resources
458 .find_one(doc! {
459 "tenant_id": tenant_id,
460 "resource_type": "Group",
461 "id": group_id,
462 "is_deleted": false,
463 })
464 .await
465 .map_err(|e| internal_error(format!("get group: {e}")))?;
466 let d = doc_opt.ok_or_else(|| {
467 StorageError::BulkExport(BulkExportError::GroupNotFound {
468 group_id: group_id.to_string(),
469 })
470 })?;
471 let data = d
472 .get_document("data")
473 .map_err(|e| internal_error(format!("missing data payload: {e}")))?;
474 let group: Value = document_to_value(data)?;
475 let mut out = Vec::new();
476 if let Some(arr) = group.get("member").and_then(|m| m.as_array()) {
477 for member in arr {
478 let Some(reference) = member
479 .get("entity")
480 .and_then(|e| e.get("reference"))
481 .and_then(|r| r.as_str())
482 else {
483 continue;
484 };
485 let period_start = member
486 .get("period")
487 .and_then(|p| p.get("start"))
488 .and_then(|s| s.as_str())
489 .and_then(|s| {
490 DateTime::parse_from_rfc3339(s)
491 .ok()
492 .map(|dt| dt.with_timezone(&Utc))
493 });
494 out.push((reference.to_string(), period_start));
495 }
496 }
497 Ok(out)
498 }
499}