// helios_persistence/backends/postgres/bulk_export.rs

1//! Bulk export implementation for PostgreSQL backend.
2
3use async_trait::async_trait;
4use chrono::Utc;
5use serde_json::Value;
6
7use crate::core::bulk_export::{
8    BulkExportStorage, ExportDataProvider, ExportJobId, ExportLevel, ExportManifest,
9    ExportOutputFile, ExportProgress, ExportRequest, ExportStatus, GroupExportProvider,
10    NdjsonBatch, PatientExportProvider, TypeExportProgress,
11};
12use crate::error::{BackendError, BulkExportError, StorageError, StorageResult};
13use crate::tenant::TenantContext;
14
15use super::PostgresBackend;
16
17fn internal_error(message: String) -> StorageError {
18    StorageError::Backend(BackendError::Internal {
19        backend_name: "postgres".to_string(),
20        message,
21        source: None,
22    })
23}
24
25#[async_trait]
26impl BulkExportStorage for PostgresBackend {
27    async fn start_export(
28        &self,
29        tenant: &TenantContext,
30        request: ExportRequest,
31    ) -> StorageResult<ExportJobId> {
32        let client = self.get_client().await?;
33        let tenant_id = tenant.tenant_id().as_str();
34
35        // Check for too many concurrent exports (limit to 5 active exports per tenant)
36        let row = client
37            .query_one(
38                "SELECT COUNT(*) FROM bulk_export_jobs
39                 WHERE tenant_id = $1 AND status IN ('accepted', 'in-progress')",
40                &[&tenant_id],
41            )
42            .await
43            .map_err(|e| internal_error(format!("Failed to count active exports: {}", e)))?;
44
45        let active_count: i64 = row.get(0);
46        if active_count >= 5 {
47            return Err(StorageError::BulkExport(
48                BulkExportError::TooManyConcurrentExports { max_concurrent: 5 },
49            ));
50        }
51
52        let job_id = ExportJobId::new();
53        let now = Utc::now();
54
55        let level_str = match &request.level {
56            ExportLevel::System => "system".to_string(),
57            ExportLevel::Patient => "patient".to_string(),
58            ExportLevel::Group { .. } => "group".to_string(),
59        };
60
61        let group_id = request.group_id().map(|s| s.to_string());
62
63        let request_json = serde_json::to_string(&request)
64            .map_err(|e| internal_error(format!("Failed to serialize request: {}", e)))?;
65
66        client
67            .execute(
68                "INSERT INTO bulk_export_jobs
69                 (id, tenant_id, status, level, group_id, request_json, transaction_time, created_at)
70                 VALUES ($1, $2, 'accepted', $3, $4, $5, $6, $7)",
71                &[
72                    &job_id.as_str(),
73                    &tenant_id,
74                    &level_str.as_str(),
75                    &group_id,
76                    &request_json.as_str(),
77                    &now,
78                    &now,
79                ],
80            )
81            .await
82            .map_err(|e| internal_error(format!("Failed to create export job: {}", e)))?;
83
84        Ok(job_id)
85    }
86
87    async fn get_export_status(
88        &self,
89        tenant: &TenantContext,
90        job_id: &ExportJobId,
91    ) -> StorageResult<ExportProgress> {
92        let client = self.get_client().await?;
93        let tenant_id = tenant.tenant_id().as_str();
94
95        let rows = client
96            .query(
97                "SELECT status, level, group_id, transaction_time, started_at, completed_at, error_message, current_type
98                 FROM bulk_export_jobs
99                 WHERE id = $1 AND tenant_id = $2",
100                &[&job_id.as_str(), &tenant_id],
101            )
102            .await
103            .map_err(|e| internal_error(format!("Failed to get export status: {}", e)))?;
104
105        if rows.is_empty() {
106            return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
107                job_id: job_id.to_string(),
108            }));
109        }
110
111        let row = &rows[0];
112        let status_str: String = row.get(0);
113        let level_str: String = row.get(1);
114        let group_id: Option<String> = row.get(2);
115        let transaction_time: chrono::DateTime<Utc> = row.get(3);
116        let started_at: Option<chrono::DateTime<Utc>> = row.get(4);
117        let completed_at: Option<chrono::DateTime<Utc>> = row.get(5);
118        let error_message: Option<String> = row.get(6);
119        let current_type: Option<String> = row.get(7);
120
121        let status: ExportStatus = status_str
122            .parse()
123            .map_err(|_| internal_error(format!("Invalid status in database: {}", status_str)))?;
124
125        let level = match level_str.as_str() {
126            "system" => ExportLevel::System,
127            "patient" => ExportLevel::Patient,
128            "group" => ExportLevel::Group {
129                group_id: group_id.unwrap_or_default(),
130            },
131            _ => {
132                return Err(internal_error(format!(
133                    "Invalid level in database: {}",
134                    level_str
135                )));
136            }
137        };
138
139        // Get per-type progress
140        let progress_rows = client
141            .query(
142                "SELECT resource_type, total_count, exported_count, error_count, cursor_state
143                 FROM bulk_export_progress
144                 WHERE job_id = $1",
145                &[&job_id.as_str()],
146            )
147            .await
148            .map_err(|e| internal_error(format!("Failed to query progress: {}", e)))?;
149
150        let type_progress: Vec<TypeExportProgress> = progress_rows
151            .iter()
152            .map(|r| TypeExportProgress {
153                resource_type: r.get(0),
154                total_count: r.get::<_, Option<i64>>(1).map(|v| v as u64),
155                exported_count: r.get::<_, i64>(2) as u64,
156                error_count: r.get::<_, i64>(3) as u64,
157                cursor_state: r.get(4),
158            })
159            .collect();
160
161        Ok(ExportProgress {
162            job_id: job_id.clone(),
163            status,
164            level,
165            transaction_time,
166            started_at,
167            completed_at,
168            type_progress,
169            current_type,
170            error_message,
171        })
172    }
173
174    async fn cancel_export(
175        &self,
176        tenant: &TenantContext,
177        job_id: &ExportJobId,
178    ) -> StorageResult<()> {
179        let client = self.get_client().await?;
180        let tenant_id = tenant.tenant_id().as_str();
181
182        let rows = client
183            .query(
184                "SELECT status FROM bulk_export_jobs WHERE id = $1 AND tenant_id = $2",
185                &[&job_id.as_str(), &tenant_id],
186            )
187            .await
188            .map_err(|e| internal_error(format!("Failed to get export status: {}", e)))?;
189
190        if rows.is_empty() {
191            return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
192                job_id: job_id.to_string(),
193            }));
194        }
195
196        let current_status: String = rows[0].get(0);
197        let status: ExportStatus = current_status.parse().map_err(|_| {
198            internal_error(format!("Invalid status in database: {}", current_status))
199        })?;
200
201        if status.is_terminal() {
202            return Err(StorageError::BulkExport(BulkExportError::InvalidJobState {
203                job_id: job_id.to_string(),
204                expected: "accepted or in-progress".to_string(),
205                actual: current_status,
206            }));
207        }
208
209        let now = Utc::now();
210        client
211            .execute(
212                "UPDATE bulk_export_jobs SET status = 'cancelled', completed_at = $1 WHERE id = $2",
213                &[&now, &job_id.as_str()],
214            )
215            .await
216            .map_err(|e| internal_error(format!("Failed to cancel export: {}", e)))?;
217
218        Ok(())
219    }
220
221    async fn delete_export(
222        &self,
223        tenant: &TenantContext,
224        job_id: &ExportJobId,
225    ) -> StorageResult<()> {
226        let client = self.get_client().await?;
227        let tenant_id = tenant.tenant_id().as_str();
228
229        let result = client
230            .execute(
231                "DELETE FROM bulk_export_jobs WHERE id = $1 AND tenant_id = $2",
232                &[&job_id.as_str(), &tenant_id],
233            )
234            .await
235            .map_err(|e| internal_error(format!("Failed to delete export: {}", e)))?;
236
237        if result == 0 {
238            return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
239                job_id: job_id.to_string(),
240            }));
241        }
242
243        Ok(())
244    }
245
246    async fn get_export_manifest(
247        &self,
248        tenant: &TenantContext,
249        job_id: &ExportJobId,
250    ) -> StorageResult<ExportManifest> {
251        let progress = self.get_export_status(tenant, job_id).await?;
252
253        if progress.status != ExportStatus::Complete {
254            return Err(StorageError::BulkExport(BulkExportError::InvalidJobState {
255                job_id: job_id.to_string(),
256                expected: "complete".to_string(),
257                actual: progress.status.to_string(),
258            }));
259        }
260
261        let client = self.get_client().await?;
262
263        let rows = client
264            .query(
265                "SELECT resource_type, file_path, resource_count, file_type
266                 FROM bulk_export_files
267                 WHERE job_id = $1
268                 ORDER BY resource_type",
269                &[&job_id.as_str()],
270            )
271            .await
272            .map_err(|e| internal_error(format!("Failed to query files: {}", e)))?;
273
274        let mut output_files = Vec::new();
275        let mut error_files = Vec::new();
276
277        for row in &rows {
278            let resource_type: String = row.get(0);
279            let file_path: String = row.get(1);
280            let count: Option<i64> = row.get(2);
281            let file_type: String = row.get(3);
282
283            let file = ExportOutputFile {
284                resource_type,
285                url: file_path,
286                count: count.map(|c| c as u64),
287            };
288
289            if file_type == "error" {
290                error_files.push(file);
291            } else {
292                output_files.push(file);
293            }
294        }
295
296        Ok(ExportManifest {
297            transaction_time: progress.transaction_time,
298            request: format!("$export?job={}", job_id),
299            requires_access_token: true,
300            output: output_files,
301            error: error_files,
302            message: None,
303            extension: None,
304        })
305    }
306
307    async fn list_exports(
308        &self,
309        tenant: &TenantContext,
310        include_completed: bool,
311    ) -> StorageResult<Vec<ExportProgress>> {
312        let client = self.get_client().await?;
313        let tenant_id = tenant.tenant_id().as_str();
314
315        let query = if include_completed {
316            "SELECT id FROM bulk_export_jobs WHERE tenant_id = $1 ORDER BY created_at DESC"
317        } else {
318            "SELECT id FROM bulk_export_jobs WHERE tenant_id = $1 AND status IN ('accepted', 'in-progress') ORDER BY created_at DESC"
319        };
320
321        let rows = client
322            .query(query, &[&tenant_id])
323            .await
324            .map_err(|e| internal_error(format!("Failed to query exports: {}", e)))?;
325
326        let mut results = Vec::new();
327        for row in &rows {
328            let id: String = row.get(0);
329            let job_id = ExportJobId::from_string(id);
330            if let Ok(progress) = self.get_export_status(tenant, &job_id).await {
331                results.push(progress);
332            }
333        }
334
335        Ok(results)
336    }
337}
338
339#[async_trait]
340impl ExportDataProvider for PostgresBackend {
341    async fn list_export_types(
342        &self,
343        tenant: &TenantContext,
344        request: &ExportRequest,
345    ) -> StorageResult<Vec<String>> {
346        let client = self.get_client().await?;
347        let tenant_id = tenant.tenant_id().as_str();
348
349        if !request.resource_types.is_empty() {
350            let mut valid_types = Vec::new();
351            for rt in &request.resource_types {
352                let row = client
353                    .query_one(
354                        "SELECT EXISTS(SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE LIMIT 1)",
355                        &[&tenant_id, &rt.as_str()],
356                    )
357                    .await
358                    .map_err(|e| internal_error(format!("Failed to check type: {}", e)))?;
359
360                let exists: bool = row.get(0);
361                if exists {
362                    valid_types.push(rt.clone());
363                }
364            }
365            return Ok(valid_types);
366        }
367
368        let rows = client
369            .query(
370                "SELECT DISTINCT resource_type FROM resources
371                 WHERE tenant_id = $1 AND is_deleted = FALSE
372                 ORDER BY resource_type",
373                &[&tenant_id],
374            )
375            .await
376            .map_err(|e| internal_error(format!("Failed to query types: {}", e)))?;
377
378        let types: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
379        Ok(types)
380    }
381
382    async fn count_export_resources(
383        &self,
384        tenant: &TenantContext,
385        request: &ExportRequest,
386        resource_type: &str,
387    ) -> StorageResult<u64> {
388        let client = self.get_client().await?;
389        let tenant_id = tenant.tenant_id().as_str();
390
391        let (sql, params): (
392            String,
393            Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>,
394        ) = if let Some(since) = request.since {
395            (
396                "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE AND last_updated >= $3".to_string(),
397                vec![
398                    Box::new(tenant_id.to_string()),
399                    Box::new(resource_type.to_string()),
400                    Box::new(since),
401                ],
402            )
403        } else {
404            (
405                "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string(),
406                vec![
407                    Box::new(tenant_id.to_string()),
408                    Box::new(resource_type.to_string()),
409                ],
410            )
411        };
412
413        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
414            .iter()
415            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
416            .collect();
417
418        let row = client
419            .query_one(&sql, &param_refs)
420            .await
421            .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
422
423        let count: i64 = row.get(0);
424        Ok(count as u64)
425    }
426
427    async fn fetch_export_batch(
428        &self,
429        tenant: &TenantContext,
430        request: &ExportRequest,
431        resource_type: &str,
432        cursor: Option<&str>,
433        batch_size: u32,
434    ) -> StorageResult<NdjsonBatch> {
435        let client = self.get_client().await?;
436        let tenant_id = tenant.tenant_id().as_str();
437
438        let mut sql = "SELECT id, data, last_updated FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string();
439        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
440            Box::new(tenant_id.to_string()),
441            Box::new(resource_type.to_string()),
442        ];
443        let mut param_idx = 3;
444
445        if let Some(since) = request.since {
446            sql.push_str(&format!(" AND last_updated >= ${}", param_idx));
447            params.push(Box::new(since));
448            param_idx += 1;
449        }
450
451        if let Some(cursor) = cursor {
452            let parts: Vec<&str> = cursor.splitn(2, '|').collect();
453            if parts.len() == 2 {
454                sql.push_str(&format!(
455                    " AND (last_updated, id) > (${}, ${})",
456                    param_idx,
457                    param_idx + 1
458                ));
459                params.push(Box::new(parts[0].to_string()));
460                params.push(Box::new(parts[1].to_string()));
461            }
462        }
463
464        sql.push_str(&format!(
465            " ORDER BY last_updated, id LIMIT {}",
466            batch_size + 1
467        ));
468
469        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
470            .iter()
471            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
472            .collect();
473
474        let rows = client
475            .query(&sql, &param_refs)
476            .await
477            .map_err(|e| internal_error(format!("Failed to query batch: {}", e)))?;
478
479        let has_more = rows.len() > batch_size as usize;
480        let rows_to_process = if has_more {
481            &rows[..batch_size as usize]
482        } else {
483            &rows[..]
484        };
485
486        let mut lines = Vec::new();
487        let mut last_cursor = None;
488
489        for row in rows_to_process {
490            let id: String = row.get(0);
491            let resource: Value = row.get(1);
492            let last_updated: chrono::DateTime<Utc> = row.get(2);
493
494            let line = serde_json::to_string(&resource)
495                .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
496            lines.push(line);
497            last_cursor = Some(format!("{}|{}", last_updated.to_rfc3339(), id));
498        }
499
500        Ok(NdjsonBatch {
501            lines,
502            next_cursor: if has_more { last_cursor } else { None },
503            is_last: !has_more,
504        })
505    }
506}
507
508#[async_trait]
509impl PatientExportProvider for PostgresBackend {
510    async fn list_patient_ids(
511        &self,
512        tenant: &TenantContext,
513        request: &ExportRequest,
514        cursor: Option<&str>,
515        batch_size: u32,
516    ) -> StorageResult<(Vec<String>, Option<String>)> {
517        let client = self.get_client().await?;
518        let tenant_id = tenant.tenant_id().as_str();
519
520        let mut sql = "SELECT id FROM resources WHERE tenant_id = $1 AND resource_type = 'Patient' AND is_deleted = FALSE".to_string();
521        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
522            vec![Box::new(tenant_id.to_string())];
523        let mut param_idx = 2;
524
525        if let Some(since) = request.since {
526            sql.push_str(&format!(" AND last_updated >= ${}", param_idx));
527            params.push(Box::new(since));
528            param_idx += 1;
529        }
530
531        if let Some(cursor) = cursor {
532            sql.push_str(&format!(" AND id > ${}", param_idx));
533            params.push(Box::new(cursor.to_string()));
534        }
535
536        sql.push_str(&format!(" ORDER BY id LIMIT {}", batch_size + 1));
537
538        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
539            .iter()
540            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
541            .collect();
542
543        let rows = client
544            .query(&sql, &param_refs)
545            .await
546            .map_err(|e| internal_error(format!("Failed to query patient ids: {}", e)))?;
547
548        let mut ids: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
549
550        let has_more = ids.len() > batch_size as usize;
551        if has_more {
552            ids.truncate(batch_size as usize);
553        }
554
555        let next_cursor = if has_more { ids.last().cloned() } else { None };
556
557        Ok((ids, next_cursor))
558    }
559
560    async fn fetch_patient_compartment_batch(
561        &self,
562        tenant: &TenantContext,
563        request: &ExportRequest,
564        resource_type: &str,
565        patient_ids: &[String],
566        cursor: Option<&str>,
567        batch_size: u32,
568    ) -> StorageResult<NdjsonBatch> {
569        if patient_ids.is_empty() {
570            return Ok(NdjsonBatch::empty());
571        }
572
573        let client = self.get_client().await?;
574        let tenant_id = tenant.tenant_id().as_str();
575
576        if resource_type == "Patient" {
577            // For Patient resources, just filter by the IDs using ANY($3::text[])
578            let mut sql = "SELECT id, data, last_updated FROM resources
579                 WHERE tenant_id = $1 AND resource_type = $2 AND id = ANY($3::text[]) AND is_deleted = FALSE".to_string();
580
581            let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
582                Box::new(tenant_id.to_string()),
583                Box::new(resource_type.to_string()),
584                Box::new(patient_ids.to_vec()),
585            ];
586            let param_idx = 4;
587
588            if let Some(cursor) = cursor {
589                let parts: Vec<&str> = cursor.splitn(2, '|').collect();
590                if parts.len() == 2 {
591                    sql.push_str(&format!(
592                        " AND (last_updated, id) > (${}, ${})",
593                        param_idx,
594                        param_idx + 1
595                    ));
596                    params.push(Box::new(parts[0].to_string()));
597                    params.push(Box::new(parts[1].to_string()));
598                }
599            }
600
601            sql.push_str(&format!(
602                " ORDER BY last_updated, id LIMIT {}",
603                batch_size + 1
604            ));
605
606            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
607                .iter()
608                .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
609                .collect();
610
611            let rows = client
612                .query(&sql, &param_refs)
613                .await
614                .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?;
615
616            let has_more = rows.len() > batch_size as usize;
617            let rows_slice = if has_more {
618                &rows[..batch_size as usize]
619            } else {
620                &rows[..]
621            };
622
623            let mut lines = Vec::new();
624            let mut last_cursor = None;
625
626            for row in rows_slice {
627                let id: String = row.get(0);
628                let resource: Value = row.get(1);
629                let last_updated: chrono::DateTime<Utc> = row.get(2);
630
631                let line = serde_json::to_string(&resource)
632                    .map_err(|e| internal_error(format!("Failed to serialize: {}", e)))?;
633                lines.push(line);
634                last_cursor = Some(format!("{}|{}", last_updated.to_rfc3339(), id));
635            }
636
637            return Ok(NdjsonBatch {
638                lines,
639                next_cursor: if has_more { last_cursor } else { None },
640                is_last: !has_more,
641            });
642        }
643
644        // For other resources, use search index to find resources referencing these patients
645        let patient_refs: Vec<String> = patient_ids
646            .iter()
647            .map(|id| format!("Patient/{}", id))
648            .collect();
649
650        let mut sql = "SELECT DISTINCT r.id, r.data, r.last_updated
651             FROM resources r
652             INNER JOIN search_index si ON r.tenant_id = si.tenant_id
653                AND r.resource_type = si.resource_type
654                AND r.id = si.resource_id
655             WHERE r.tenant_id = $1
656                AND r.resource_type = $2
657                AND r.is_deleted = FALSE
658                AND si.param_name IN ('subject', 'patient')
659                AND si.value_reference = ANY($3::text[])"
660            .to_string();
661
662        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
663            Box::new(tenant_id.to_string()),
664            Box::new(resource_type.to_string()),
665            Box::new(patient_refs),
666        ];
667        let mut param_idx = 4;
668
669        if let Some(since) = request.since {
670            sql.push_str(&format!(" AND r.last_updated >= ${}", param_idx));
671            params.push(Box::new(since));
672            param_idx += 1;
673        }
674
675        if let Some(cursor) = cursor {
676            let parts: Vec<&str> = cursor.splitn(2, '|').collect();
677            if parts.len() == 2 {
678                sql.push_str(&format!(
679                    " AND (r.last_updated, r.id) > (${}, ${})",
680                    param_idx,
681                    param_idx + 1
682                ));
683                params.push(Box::new(parts[0].to_string()));
684                params.push(Box::new(parts[1].to_string()));
685            }
686        }
687
688        sql.push_str(&format!(
689            " ORDER BY r.last_updated, r.id LIMIT {}",
690            batch_size + 1
691        ));
692
693        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
694            .iter()
695            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
696            .collect();
697
698        let rows = client
699            .query(&sql, &param_refs)
700            .await
701            .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?;
702
703        let has_more = rows.len() > batch_size as usize;
704        let rows_slice = if has_more {
705            &rows[..batch_size as usize]
706        } else {
707            &rows[..]
708        };
709
710        let mut lines = Vec::new();
711        let mut last_cursor = None;
712
713        for row in rows_slice {
714            let id: String = row.get(0);
715            let resource: Value = row.get(1);
716            let last_updated: chrono::DateTime<Utc> = row.get(2);
717
718            let line = serde_json::to_string(&resource)
719                .map_err(|e| internal_error(format!("Failed to serialize: {}", e)))?;
720            lines.push(line);
721            last_cursor = Some(format!("{}|{}", last_updated.to_rfc3339(), id));
722        }
723
724        Ok(NdjsonBatch {
725            lines,
726            next_cursor: if has_more { last_cursor } else { None },
727            is_last: !has_more,
728        })
729    }
730}
731
732#[async_trait]
733impl GroupExportProvider for PostgresBackend {
734    async fn get_group_members(
735        &self,
736        tenant: &TenantContext,
737        group_id: &str,
738    ) -> StorageResult<Vec<String>> {
739        let client = self.get_client().await?;
740        let tenant_id = tenant.tenant_id().as_str();
741
742        let rows = client
743            .query(
744                "SELECT data FROM resources WHERE tenant_id = $1 AND resource_type = 'Group' AND id = $2 AND is_deleted = FALSE",
745                &[&tenant_id, &group_id],
746            )
747            .await
748            .map_err(|e| internal_error(format!("Failed to fetch group: {}", e)))?;
749
750        if rows.is_empty() {
751            return Ok(Vec::new());
752        }
753
754        let data: Value = rows[0].get(0);
755
756        // Extract member references from the Group resource
757        let mut member_refs = Vec::new();
758        if let Some(members) = data.get("member").and_then(|m| m.as_array()) {
759            for member in members {
760                if let Some(reference) = member
761                    .get("entity")
762                    .and_then(|e| e.get("reference"))
763                    .and_then(|r| r.as_str())
764                {
765                    member_refs.push(reference.to_string());
766                }
767            }
768        }
769
770        Ok(member_refs)
771    }
772
773    async fn resolve_group_patient_ids(
774        &self,
775        tenant: &TenantContext,
776        group_id: &str,
777    ) -> StorageResult<Vec<String>> {
778        let members = self.get_group_members(tenant, group_id).await?;
779
780        let mut patient_ids = Vec::new();
781        for member_ref in &members {
782            // Extract patient ID from "Patient/123" format
783            if let Some(id) = member_ref.strip_prefix("Patient/") {
784                patient_ids.push(id.to_string());
785            }
786        }
787
788        Ok(patient_ids)
789    }
790}