1use async_trait::async_trait;
4use chrono::Utc;
5use serde_json::Value;
6
7use crate::core::bulk_export::{
8 BulkExportStorage, ExportDataProvider, ExportJobId, ExportLevel, ExportManifest,
9 ExportOutputFile, ExportProgress, ExportRequest, ExportStatus, GroupExportProvider,
10 NdjsonBatch, PatientExportProvider, TypeExportProgress,
11};
12use crate::error::{BackendError, BulkExportError, StorageError, StorageResult};
13use crate::tenant::TenantContext;
14
15use super::PostgresBackend;
16
17fn internal_error(message: String) -> StorageError {
18 StorageError::Backend(BackendError::Internal {
19 backend_name: "postgres".to_string(),
20 message,
21 source: None,
22 })
23}
24
25#[async_trait]
26impl BulkExportStorage for PostgresBackend {
27 async fn start_export(
28 &self,
29 tenant: &TenantContext,
30 request: ExportRequest,
31 ) -> StorageResult<ExportJobId> {
32 let client = self.get_client().await?;
33 let tenant_id = tenant.tenant_id().as_str();
34
35 let row = client
37 .query_one(
38 "SELECT COUNT(*) FROM bulk_export_jobs
39 WHERE tenant_id = $1 AND status IN ('accepted', 'in-progress')",
40 &[&tenant_id],
41 )
42 .await
43 .map_err(|e| internal_error(format!("Failed to count active exports: {}", e)))?;
44
45 let active_count: i64 = row.get(0);
46 if active_count >= 5 {
47 return Err(StorageError::BulkExport(
48 BulkExportError::TooManyConcurrentExports { max_concurrent: 5 },
49 ));
50 }
51
52 let job_id = ExportJobId::new();
53 let now = Utc::now();
54
55 let level_str = match &request.level {
56 ExportLevel::System => "system".to_string(),
57 ExportLevel::Patient => "patient".to_string(),
58 ExportLevel::Group { .. } => "group".to_string(),
59 };
60
61 let group_id = request.group_id().map(|s| s.to_string());
62
63 let request_json = serde_json::to_string(&request)
64 .map_err(|e| internal_error(format!("Failed to serialize request: {}", e)))?;
65
66 client
67 .execute(
68 "INSERT INTO bulk_export_jobs
69 (id, tenant_id, status, level, group_id, request_json, transaction_time, created_at)
70 VALUES ($1, $2, 'accepted', $3, $4, $5, $6, $7)",
71 &[
72 &job_id.as_str(),
73 &tenant_id,
74 &level_str.as_str(),
75 &group_id,
76 &request_json.as_str(),
77 &now,
78 &now,
79 ],
80 )
81 .await
82 .map_err(|e| internal_error(format!("Failed to create export job: {}", e)))?;
83
84 Ok(job_id)
85 }
86
87 async fn get_export_status(
88 &self,
89 tenant: &TenantContext,
90 job_id: &ExportJobId,
91 ) -> StorageResult<ExportProgress> {
92 let client = self.get_client().await?;
93 let tenant_id = tenant.tenant_id().as_str();
94
95 let rows = client
96 .query(
97 "SELECT status, level, group_id, transaction_time, started_at, completed_at, error_message, current_type
98 FROM bulk_export_jobs
99 WHERE id = $1 AND tenant_id = $2",
100 &[&job_id.as_str(), &tenant_id],
101 )
102 .await
103 .map_err(|e| internal_error(format!("Failed to get export status: {}", e)))?;
104
105 if rows.is_empty() {
106 return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
107 job_id: job_id.to_string(),
108 }));
109 }
110
111 let row = &rows[0];
112 let status_str: String = row.get(0);
113 let level_str: String = row.get(1);
114 let group_id: Option<String> = row.get(2);
115 let transaction_time: chrono::DateTime<Utc> = row.get(3);
116 let started_at: Option<chrono::DateTime<Utc>> = row.get(4);
117 let completed_at: Option<chrono::DateTime<Utc>> = row.get(5);
118 let error_message: Option<String> = row.get(6);
119 let current_type: Option<String> = row.get(7);
120
121 let status: ExportStatus = status_str
122 .parse()
123 .map_err(|_| internal_error(format!("Invalid status in database: {}", status_str)))?;
124
125 let level = match level_str.as_str() {
126 "system" => ExportLevel::System,
127 "patient" => ExportLevel::Patient,
128 "group" => ExportLevel::Group {
129 group_id: group_id.unwrap_or_default(),
130 },
131 _ => {
132 return Err(internal_error(format!(
133 "Invalid level in database: {}",
134 level_str
135 )));
136 }
137 };
138
139 let progress_rows = client
141 .query(
142 "SELECT resource_type, total_count, exported_count, error_count, cursor_state
143 FROM bulk_export_progress
144 WHERE job_id = $1",
145 &[&job_id.as_str()],
146 )
147 .await
148 .map_err(|e| internal_error(format!("Failed to query progress: {}", e)))?;
149
150 let type_progress: Vec<TypeExportProgress> = progress_rows
151 .iter()
152 .map(|r| TypeExportProgress {
153 resource_type: r.get(0),
154 total_count: r.get::<_, Option<i64>>(1).map(|v| v as u64),
155 exported_count: r.get::<_, i64>(2) as u64,
156 error_count: r.get::<_, i64>(3) as u64,
157 cursor_state: r.get(4),
158 })
159 .collect();
160
161 Ok(ExportProgress {
162 job_id: job_id.clone(),
163 status,
164 level,
165 transaction_time,
166 started_at,
167 completed_at,
168 type_progress,
169 current_type,
170 error_message,
171 })
172 }
173
174 async fn cancel_export(
175 &self,
176 tenant: &TenantContext,
177 job_id: &ExportJobId,
178 ) -> StorageResult<()> {
179 let client = self.get_client().await?;
180 let tenant_id = tenant.tenant_id().as_str();
181
182 let rows = client
183 .query(
184 "SELECT status FROM bulk_export_jobs WHERE id = $1 AND tenant_id = $2",
185 &[&job_id.as_str(), &tenant_id],
186 )
187 .await
188 .map_err(|e| internal_error(format!("Failed to get export status: {}", e)))?;
189
190 if rows.is_empty() {
191 return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
192 job_id: job_id.to_string(),
193 }));
194 }
195
196 let current_status: String = rows[0].get(0);
197 let status: ExportStatus = current_status.parse().map_err(|_| {
198 internal_error(format!("Invalid status in database: {}", current_status))
199 })?;
200
201 if status.is_terminal() {
202 return Err(StorageError::BulkExport(BulkExportError::InvalidJobState {
203 job_id: job_id.to_string(),
204 expected: "accepted or in-progress".to_string(),
205 actual: current_status,
206 }));
207 }
208
209 let now = Utc::now();
210 client
211 .execute(
212 "UPDATE bulk_export_jobs SET status = 'cancelled', completed_at = $1 WHERE id = $2",
213 &[&now, &job_id.as_str()],
214 )
215 .await
216 .map_err(|e| internal_error(format!("Failed to cancel export: {}", e)))?;
217
218 Ok(())
219 }
220
221 async fn delete_export(
222 &self,
223 tenant: &TenantContext,
224 job_id: &ExportJobId,
225 ) -> StorageResult<()> {
226 let client = self.get_client().await?;
227 let tenant_id = tenant.tenant_id().as_str();
228
229 let result = client
230 .execute(
231 "DELETE FROM bulk_export_jobs WHERE id = $1 AND tenant_id = $2",
232 &[&job_id.as_str(), &tenant_id],
233 )
234 .await
235 .map_err(|e| internal_error(format!("Failed to delete export: {}", e)))?;
236
237 if result == 0 {
238 return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
239 job_id: job_id.to_string(),
240 }));
241 }
242
243 Ok(())
244 }
245
246 async fn get_export_manifest(
247 &self,
248 tenant: &TenantContext,
249 job_id: &ExportJobId,
250 ) -> StorageResult<ExportManifest> {
251 let progress = self.get_export_status(tenant, job_id).await?;
252
253 if progress.status != ExportStatus::Complete {
254 return Err(StorageError::BulkExport(BulkExportError::InvalidJobState {
255 job_id: job_id.to_string(),
256 expected: "complete".to_string(),
257 actual: progress.status.to_string(),
258 }));
259 }
260
261 let client = self.get_client().await?;
262
263 let rows = client
264 .query(
265 "SELECT resource_type, file_path, resource_count, file_type
266 FROM bulk_export_files
267 WHERE job_id = $1
268 ORDER BY resource_type",
269 &[&job_id.as_str()],
270 )
271 .await
272 .map_err(|e| internal_error(format!("Failed to query files: {}", e)))?;
273
274 let mut output_files = Vec::new();
275 let mut error_files = Vec::new();
276
277 for row in &rows {
278 let resource_type: String = row.get(0);
279 let file_path: String = row.get(1);
280 let count: Option<i64> = row.get(2);
281 let file_type: String = row.get(3);
282
283 let file = ExportOutputFile {
284 resource_type,
285 url: file_path,
286 count: count.map(|c| c as u64),
287 };
288
289 if file_type == "error" {
290 error_files.push(file);
291 } else {
292 output_files.push(file);
293 }
294 }
295
296 Ok(ExportManifest {
297 transaction_time: progress.transaction_time,
298 request: format!("$export?job={}", job_id),
299 requires_access_token: true,
300 output: output_files,
301 error: error_files,
302 message: None,
303 extension: None,
304 })
305 }
306
307 async fn list_exports(
308 &self,
309 tenant: &TenantContext,
310 include_completed: bool,
311 ) -> StorageResult<Vec<ExportProgress>> {
312 let client = self.get_client().await?;
313 let tenant_id = tenant.tenant_id().as_str();
314
315 let query = if include_completed {
316 "SELECT id FROM bulk_export_jobs WHERE tenant_id = $1 ORDER BY created_at DESC"
317 } else {
318 "SELECT id FROM bulk_export_jobs WHERE tenant_id = $1 AND status IN ('accepted', 'in-progress') ORDER BY created_at DESC"
319 };
320
321 let rows = client
322 .query(query, &[&tenant_id])
323 .await
324 .map_err(|e| internal_error(format!("Failed to query exports: {}", e)))?;
325
326 let mut results = Vec::new();
327 for row in &rows {
328 let id: String = row.get(0);
329 let job_id = ExportJobId::from_string(id);
330 if let Ok(progress) = self.get_export_status(tenant, &job_id).await {
331 results.push(progress);
332 }
333 }
334
335 Ok(results)
336 }
337}
338
339#[async_trait]
340impl ExportDataProvider for PostgresBackend {
341 async fn list_export_types(
342 &self,
343 tenant: &TenantContext,
344 request: &ExportRequest,
345 ) -> StorageResult<Vec<String>> {
346 let client = self.get_client().await?;
347 let tenant_id = tenant.tenant_id().as_str();
348
349 if !request.resource_types.is_empty() {
350 let mut valid_types = Vec::new();
351 for rt in &request.resource_types {
352 let row = client
353 .query_one(
354 "SELECT EXISTS(SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE LIMIT 1)",
355 &[&tenant_id, &rt.as_str()],
356 )
357 .await
358 .map_err(|e| internal_error(format!("Failed to check type: {}", e)))?;
359
360 let exists: bool = row.get(0);
361 if exists {
362 valid_types.push(rt.clone());
363 }
364 }
365 return Ok(valid_types);
366 }
367
368 let rows = client
369 .query(
370 "SELECT DISTINCT resource_type FROM resources
371 WHERE tenant_id = $1 AND is_deleted = FALSE
372 ORDER BY resource_type",
373 &[&tenant_id],
374 )
375 .await
376 .map_err(|e| internal_error(format!("Failed to query types: {}", e)))?;
377
378 let types: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
379 Ok(types)
380 }
381
382 async fn count_export_resources(
383 &self,
384 tenant: &TenantContext,
385 request: &ExportRequest,
386 resource_type: &str,
387 ) -> StorageResult<u64> {
388 let client = self.get_client().await?;
389 let tenant_id = tenant.tenant_id().as_str();
390
391 let (sql, params): (
392 String,
393 Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>,
394 ) = if let Some(since) = request.since {
395 (
396 "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE AND last_updated >= $3".to_string(),
397 vec![
398 Box::new(tenant_id.to_string()),
399 Box::new(resource_type.to_string()),
400 Box::new(since),
401 ],
402 )
403 } else {
404 (
405 "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string(),
406 vec![
407 Box::new(tenant_id.to_string()),
408 Box::new(resource_type.to_string()),
409 ],
410 )
411 };
412
413 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
414 .iter()
415 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
416 .collect();
417
418 let row = client
419 .query_one(&sql, ¶m_refs)
420 .await
421 .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
422
423 let count: i64 = row.get(0);
424 Ok(count as u64)
425 }
426
427 async fn fetch_export_batch(
428 &self,
429 tenant: &TenantContext,
430 request: &ExportRequest,
431 resource_type: &str,
432 cursor: Option<&str>,
433 batch_size: u32,
434 ) -> StorageResult<NdjsonBatch> {
435 let client = self.get_client().await?;
436 let tenant_id = tenant.tenant_id().as_str();
437
438 let mut sql = "SELECT id, data, last_updated FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string();
439 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
440 Box::new(tenant_id.to_string()),
441 Box::new(resource_type.to_string()),
442 ];
443 let mut param_idx = 3;
444
445 if let Some(since) = request.since {
446 sql.push_str(&format!(" AND last_updated >= ${}", param_idx));
447 params.push(Box::new(since));
448 param_idx += 1;
449 }
450
451 if let Some(cursor) = cursor {
452 let parts: Vec<&str> = cursor.splitn(2, '|').collect();
453 if parts.len() == 2 {
454 sql.push_str(&format!(
455 " AND (last_updated, id) > (${}, ${})",
456 param_idx,
457 param_idx + 1
458 ));
459 params.push(Box::new(parts[0].to_string()));
460 params.push(Box::new(parts[1].to_string()));
461 }
462 }
463
464 sql.push_str(&format!(
465 " ORDER BY last_updated, id LIMIT {}",
466 batch_size + 1
467 ));
468
469 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
470 .iter()
471 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
472 .collect();
473
474 let rows = client
475 .query(&sql, ¶m_refs)
476 .await
477 .map_err(|e| internal_error(format!("Failed to query batch: {}", e)))?;
478
479 let has_more = rows.len() > batch_size as usize;
480 let rows_to_process = if has_more {
481 &rows[..batch_size as usize]
482 } else {
483 &rows[..]
484 };
485
486 let mut lines = Vec::new();
487 let mut last_cursor = None;
488
489 for row in rows_to_process {
490 let id: String = row.get(0);
491 let resource: Value = row.get(1);
492 let last_updated: chrono::DateTime<Utc> = row.get(2);
493
494 let line = serde_json::to_string(&resource)
495 .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
496 lines.push(line);
497 last_cursor = Some(format!("{}|{}", last_updated.to_rfc3339(), id));
498 }
499
500 Ok(NdjsonBatch {
501 lines,
502 next_cursor: if has_more { last_cursor } else { None },
503 is_last: !has_more,
504 })
505 }
506}
507
508#[async_trait]
509impl PatientExportProvider for PostgresBackend {
510 async fn list_patient_ids(
511 &self,
512 tenant: &TenantContext,
513 request: &ExportRequest,
514 cursor: Option<&str>,
515 batch_size: u32,
516 ) -> StorageResult<(Vec<String>, Option<String>)> {
517 let client = self.get_client().await?;
518 let tenant_id = tenant.tenant_id().as_str();
519
520 let mut sql = "SELECT id FROM resources WHERE tenant_id = $1 AND resource_type = 'Patient' AND is_deleted = FALSE".to_string();
521 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
522 vec![Box::new(tenant_id.to_string())];
523 let mut param_idx = 2;
524
525 if let Some(since) = request.since {
526 sql.push_str(&format!(" AND last_updated >= ${}", param_idx));
527 params.push(Box::new(since));
528 param_idx += 1;
529 }
530
531 if let Some(cursor) = cursor {
532 sql.push_str(&format!(" AND id > ${}", param_idx));
533 params.push(Box::new(cursor.to_string()));
534 }
535
536 sql.push_str(&format!(" ORDER BY id LIMIT {}", batch_size + 1));
537
538 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
539 .iter()
540 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
541 .collect();
542
543 let rows = client
544 .query(&sql, ¶m_refs)
545 .await
546 .map_err(|e| internal_error(format!("Failed to query patient ids: {}", e)))?;
547
548 let mut ids: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
549
550 let has_more = ids.len() > batch_size as usize;
551 if has_more {
552 ids.truncate(batch_size as usize);
553 }
554
555 let next_cursor = if has_more { ids.last().cloned() } else { None };
556
557 Ok((ids, next_cursor))
558 }
559
560 async fn fetch_patient_compartment_batch(
561 &self,
562 tenant: &TenantContext,
563 request: &ExportRequest,
564 resource_type: &str,
565 patient_ids: &[String],
566 cursor: Option<&str>,
567 batch_size: u32,
568 ) -> StorageResult<NdjsonBatch> {
569 if patient_ids.is_empty() {
570 return Ok(NdjsonBatch::empty());
571 }
572
573 let client = self.get_client().await?;
574 let tenant_id = tenant.tenant_id().as_str();
575
576 if resource_type == "Patient" {
577 let mut sql = "SELECT id, data, last_updated FROM resources
579 WHERE tenant_id = $1 AND resource_type = $2 AND id = ANY($3::text[]) AND is_deleted = FALSE".to_string();
580
581 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
582 Box::new(tenant_id.to_string()),
583 Box::new(resource_type.to_string()),
584 Box::new(patient_ids.to_vec()),
585 ];
586 let param_idx = 4;
587
588 if let Some(cursor) = cursor {
589 let parts: Vec<&str> = cursor.splitn(2, '|').collect();
590 if parts.len() == 2 {
591 sql.push_str(&format!(
592 " AND (last_updated, id) > (${}, ${})",
593 param_idx,
594 param_idx + 1
595 ));
596 params.push(Box::new(parts[0].to_string()));
597 params.push(Box::new(parts[1].to_string()));
598 }
599 }
600
601 sql.push_str(&format!(
602 " ORDER BY last_updated, id LIMIT {}",
603 batch_size + 1
604 ));
605
606 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
607 .iter()
608 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
609 .collect();
610
611 let rows = client
612 .query(&sql, ¶m_refs)
613 .await
614 .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?;
615
616 let has_more = rows.len() > batch_size as usize;
617 let rows_slice = if has_more {
618 &rows[..batch_size as usize]
619 } else {
620 &rows[..]
621 };
622
623 let mut lines = Vec::new();
624 let mut last_cursor = None;
625
626 for row in rows_slice {
627 let id: String = row.get(0);
628 let resource: Value = row.get(1);
629 let last_updated: chrono::DateTime<Utc> = row.get(2);
630
631 let line = serde_json::to_string(&resource)
632 .map_err(|e| internal_error(format!("Failed to serialize: {}", e)))?;
633 lines.push(line);
634 last_cursor = Some(format!("{}|{}", last_updated.to_rfc3339(), id));
635 }
636
637 return Ok(NdjsonBatch {
638 lines,
639 next_cursor: if has_more { last_cursor } else { None },
640 is_last: !has_more,
641 });
642 }
643
644 let patient_refs: Vec<String> = patient_ids
646 .iter()
647 .map(|id| format!("Patient/{}", id))
648 .collect();
649
650 let mut sql = "SELECT DISTINCT r.id, r.data, r.last_updated
651 FROM resources r
652 INNER JOIN search_index si ON r.tenant_id = si.tenant_id
653 AND r.resource_type = si.resource_type
654 AND r.id = si.resource_id
655 WHERE r.tenant_id = $1
656 AND r.resource_type = $2
657 AND r.is_deleted = FALSE
658 AND si.param_name IN ('subject', 'patient')
659 AND si.value_reference = ANY($3::text[])"
660 .to_string();
661
662 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
663 Box::new(tenant_id.to_string()),
664 Box::new(resource_type.to_string()),
665 Box::new(patient_refs),
666 ];
667 let mut param_idx = 4;
668
669 if let Some(since) = request.since {
670 sql.push_str(&format!(" AND r.last_updated >= ${}", param_idx));
671 params.push(Box::new(since));
672 param_idx += 1;
673 }
674
675 if let Some(cursor) = cursor {
676 let parts: Vec<&str> = cursor.splitn(2, '|').collect();
677 if parts.len() == 2 {
678 sql.push_str(&format!(
679 " AND (r.last_updated, r.id) > (${}, ${})",
680 param_idx,
681 param_idx + 1
682 ));
683 params.push(Box::new(parts[0].to_string()));
684 params.push(Box::new(parts[1].to_string()));
685 }
686 }
687
688 sql.push_str(&format!(
689 " ORDER BY r.last_updated, r.id LIMIT {}",
690 batch_size + 1
691 ));
692
693 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
694 .iter()
695 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
696 .collect();
697
698 let rows = client
699 .query(&sql, ¶m_refs)
700 .await
701 .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?;
702
703 let has_more = rows.len() > batch_size as usize;
704 let rows_slice = if has_more {
705 &rows[..batch_size as usize]
706 } else {
707 &rows[..]
708 };
709
710 let mut lines = Vec::new();
711 let mut last_cursor = None;
712
713 for row in rows_slice {
714 let id: String = row.get(0);
715 let resource: Value = row.get(1);
716 let last_updated: chrono::DateTime<Utc> = row.get(2);
717
718 let line = serde_json::to_string(&resource)
719 .map_err(|e| internal_error(format!("Failed to serialize: {}", e)))?;
720 lines.push(line);
721 last_cursor = Some(format!("{}|{}", last_updated.to_rfc3339(), id));
722 }
723
724 Ok(NdjsonBatch {
725 lines,
726 next_cursor: if has_more { last_cursor } else { None },
727 is_last: !has_more,
728 })
729 }
730}
731
732#[async_trait]
733impl GroupExportProvider for PostgresBackend {
734 async fn get_group_members(
735 &self,
736 tenant: &TenantContext,
737 group_id: &str,
738 ) -> StorageResult<Vec<String>> {
739 let client = self.get_client().await?;
740 let tenant_id = tenant.tenant_id().as_str();
741
742 let rows = client
743 .query(
744 "SELECT data FROM resources WHERE tenant_id = $1 AND resource_type = 'Group' AND id = $2 AND is_deleted = FALSE",
745 &[&tenant_id, &group_id],
746 )
747 .await
748 .map_err(|e| internal_error(format!("Failed to fetch group: {}", e)))?;
749
750 if rows.is_empty() {
751 return Ok(Vec::new());
752 }
753
754 let data: Value = rows[0].get(0);
755
756 let mut member_refs = Vec::new();
758 if let Some(members) = data.get("member").and_then(|m| m.as_array()) {
759 for member in members {
760 if let Some(reference) = member
761 .get("entity")
762 .and_then(|e| e.get("reference"))
763 .and_then(|r| r.as_str())
764 {
765 member_refs.push(reference.to_string());
766 }
767 }
768 }
769
770 Ok(member_refs)
771 }
772
773 async fn resolve_group_patient_ids(
774 &self,
775 tenant: &TenantContext,
776 group_id: &str,
777 ) -> StorageResult<Vec<String>> {
778 let members = self.get_group_members(tenant, group_id).await?;
779
780 let mut patient_ids = Vec::new();
781 for member_ref in &members {
782 if let Some(id) = member_ref.strip_prefix("Patient/") {
784 patient_ids.push(id.to_string());
785 }
786 }
787
788 Ok(patient_ids)
789 }
790}