Skip to main content

helios_persistence/backends/postgres/
bulk_export.rs

1//! Bulk export implementation for PostgreSQL backend.
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use serde_json::Value;
6use std::time::Duration as StdDuration;
7
8use crate::core::bulk_export::{
9    BulkExportStorage, ExpiredExportRef, ExportDataProvider, ExportFileMetadata, ExportJobId,
10    ExportJobMetadata, ExportLevel, ExportProgress, ExportRequest, ExportStatus,
11    GroupExportProvider, NdjsonBatch, PatientExportProvider, RawExportManifest, RawManifestEntry,
12    StartExportInput, TypeExportProgress,
13};
14use crate::core::bulk_export_output::{ExportPartKey, FinalizedPart};
15use crate::core::bulk_export_worker::{
16    ExportClaimStrategy, ExportJobLease, ExportWorkerStorage, LeaseError, WorkerId, WorkerJobView,
17};
18use crate::error::{BackendError, BulkExportError, StorageError, StorageResult};
19use crate::tenant::{TenantContext, TenantId, TenantPermissions};
20
21use super::PostgresBackend;
22
23fn internal_error(message: String) -> StorageError {
24    StorageError::Backend(BackendError::Internal {
25        backend_name: "postgres".to_string(),
26        message,
27        source: None,
28    })
29}
30
/// Splits a `{resource_type}-{part_index}` download segment at its last `-`.
///
/// Returns `None` when the segment contains no `-`, when the resource type
/// before the last `-` is empty, or when the text after it is not a `u32`.
/// Because the split is on the *last* dash, resource types may themselves
/// contain dashes (`"Foo-Bar-3"` -> `("Foo-Bar", 3)`).
fn parse_part_segment(part: &str) -> Option<(String, u32)> {
    let (resource_type, index_text) = part.rsplit_once('-')?;
    let part_index = index_text.parse::<u32>().ok()?;
    (!resource_type.is_empty()).then(|| (resource_type.to_owned(), part_index))
}
41
42/// Encodes an [`ExportPartKey`] into the `file_path` column.
43fn encode_part_path(key: &ExportPartKey) -> String {
44    format!(
45        "{}/{}/{}/{}-{}-{}",
46        key.tenant_id,
47        key.job_id,
48        key.file_type,
49        key.resource_type,
50        key.part_index,
51        key.fencing_token
52    )
53}
54
55#[async_trait]
56impl BulkExportStorage for PostgresBackend {
57    async fn start_export(
58        &self,
59        tenant: &TenantContext,
60        input: StartExportInput,
61    ) -> StorageResult<ExportJobId> {
62        let client = self.get_client().await?;
63        let tenant_id = tenant.tenant_id().as_str();
64
65        let job_id = ExportJobId::new();
66        let now = Utc::now();
67
68        let level_str = match &input.request.level {
69            ExportLevel::System => "system".to_string(),
70            ExportLevel::Patient => "patient".to_string(),
71            ExportLevel::Group { .. } => "group".to_string(),
72        };
73
74        let group_id = input.request.group_id().map(|s| s.to_string());
75
76        let request_json = serde_json::to_string(&input.request)
77            .map_err(|e| internal_error(format!("Failed to serialize request: {}", e)))?;
78        let fhir_version = input.fhir_version.as_mime_param();
79
80        client
81            .execute(
82                "INSERT INTO bulk_export_jobs
83                 (id, tenant_id, status, level, group_id, request_json, transaction_time,
84                  created_at, owner_subject, request_url, fhir_version, fencing_token)
85                 VALUES ($1, $2, 'accepted', $3, $4, $5, $6, $7, $8, $9, $10, 0)",
86                &[
87                    &job_id.as_str(),
88                    &tenant_id,
89                    &level_str.as_str(),
90                    &group_id,
91                    &request_json.as_str(),
92                    &input.transaction_time,
93                    &now,
94                    &input.owner_subject,
95                    &input.request_url.as_str(),
96                    &fhir_version,
97                ],
98            )
99            .await
100            .map_err(|e| internal_error(format!("Failed to create export job: {}", e)))?;
101
102        Ok(job_id)
103    }
104
105    async fn get_export_status(
106        &self,
107        tenant: &TenantContext,
108        job_id: &ExportJobId,
109    ) -> StorageResult<ExportProgress> {
110        let client = self.get_client().await?;
111        let tenant_id = tenant.tenant_id().as_str();
112
113        let rows = client
114            .query(
115                "SELECT status, level, group_id, transaction_time, started_at, completed_at, error_message, current_type
116                 FROM bulk_export_jobs
117                 WHERE id = $1 AND tenant_id = $2",
118                &[&job_id.as_str(), &tenant_id],
119            )
120            .await
121            .map_err(|e| internal_error(format!("Failed to get export status: {}", e)))?;
122
123        if rows.is_empty() {
124            return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
125                job_id: job_id.to_string(),
126            }));
127        }
128
129        let row = &rows[0];
130        let status_str: String = row.get(0);
131        let level_str: String = row.get(1);
132        let group_id: Option<String> = row.get(2);
133        let transaction_time: chrono::DateTime<Utc> = row.get(3);
134        let started_at: Option<chrono::DateTime<Utc>> = row.get(4);
135        let completed_at: Option<chrono::DateTime<Utc>> = row.get(5);
136        let error_message: Option<String> = row.get(6);
137        let current_type: Option<String> = row.get(7);
138
139        let status: ExportStatus = status_str
140            .parse()
141            .map_err(|_| internal_error(format!("Invalid status in database: {}", status_str)))?;
142
143        let level = match level_str.as_str() {
144            "system" => ExportLevel::System,
145            "patient" => ExportLevel::Patient,
146            "group" => ExportLevel::Group {
147                group_id: group_id.unwrap_or_default(),
148            },
149            _ => {
150                return Err(internal_error(format!(
151                    "Invalid level in database: {}",
152                    level_str
153                )));
154            }
155        };
156
157        // Get per-type progress
158        let progress_rows = client
159            .query(
160                "SELECT resource_type, total_count, exported_count, error_count, cursor_state
161                 FROM bulk_export_progress
162                 WHERE job_id = $1",
163                &[&job_id.as_str()],
164            )
165            .await
166            .map_err(|e| internal_error(format!("Failed to query progress: {}", e)))?;
167
168        let type_progress: Vec<TypeExportProgress> = progress_rows
169            .iter()
170            .map(|r| TypeExportProgress {
171                resource_type: r.get(0),
172                total_count: r.get::<_, Option<i32>>(1).map(|v| v as u64),
173                exported_count: r.get::<_, i32>(2) as u64,
174                error_count: r.get::<_, i32>(3) as u64,
175                cursor_state: r.get(4),
176            })
177            .collect();
178
179        Ok(ExportProgress {
180            job_id: job_id.clone(),
181            status,
182            level,
183            transaction_time,
184            started_at,
185            completed_at,
186            type_progress,
187            current_type,
188            error_message,
189        })
190    }
191
192    async fn cancel_export(
193        &self,
194        tenant: &TenantContext,
195        job_id: &ExportJobId,
196    ) -> StorageResult<()> {
197        let client = self.get_client().await?;
198        let tenant_id = tenant.tenant_id().as_str();
199
200        let rows = client
201            .query(
202                "SELECT status FROM bulk_export_jobs WHERE id = $1 AND tenant_id = $2",
203                &[&job_id.as_str(), &tenant_id],
204            )
205            .await
206            .map_err(|e| internal_error(format!("Failed to get export status: {}", e)))?;
207
208        if rows.is_empty() {
209            return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
210                job_id: job_id.to_string(),
211            }));
212        }
213
214        let current_status: String = rows[0].get(0);
215        let status: ExportStatus = current_status.parse().map_err(|_| {
216            internal_error(format!("Invalid status in database: {}", current_status))
217        })?;
218
219        if status.is_terminal() {
220            return Err(StorageError::BulkExport(BulkExportError::InvalidJobState {
221                job_id: job_id.to_string(),
222                expected: "accepted or in-progress".to_string(),
223                actual: current_status,
224            }));
225        }
226
227        let now = Utc::now();
228        client
229            .execute(
230                "UPDATE bulk_export_jobs SET status = 'cancelled', completed_at = $1 WHERE id = $2",
231                &[&now, &job_id.as_str()],
232            )
233            .await
234            .map_err(|e| internal_error(format!("Failed to cancel export: {}", e)))?;
235
236        Ok(())
237    }
238
239    async fn delete_export(
240        &self,
241        tenant: &TenantContext,
242        job_id: &ExportJobId,
243    ) -> StorageResult<()> {
244        let client = self.get_client().await?;
245        let tenant_id = tenant.tenant_id().as_str();
246
247        let result = client
248            .execute(
249                "DELETE FROM bulk_export_jobs WHERE id = $1 AND tenant_id = $2",
250                &[&job_id.as_str(), &tenant_id],
251            )
252            .await
253            .map_err(|e| internal_error(format!("Failed to delete export: {}", e)))?;
254
255        if result == 0 {
256            return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
257                job_id: job_id.to_string(),
258            }));
259        }
260
261        Ok(())
262    }
263
264    async fn get_export_manifest(
265        &self,
266        tenant: &TenantContext,
267        job_id: &ExportJobId,
268    ) -> StorageResult<RawExportManifest> {
269        let client = self.get_client().await?;
270        let tenant_id = tenant.tenant_id().as_str();
271
272        let job_rows = client
273            .query(
274                "SELECT status, transaction_time, request_url, error_message, completed_at
275                 FROM bulk_export_jobs WHERE id = $1 AND tenant_id = $2",
276                &[&job_id.as_str(), &tenant_id],
277            )
278            .await
279            .map_err(|e| internal_error(format!("Failed to get export job: {}", e)))?;
280        let job_row = job_rows.first().ok_or_else(|| {
281            StorageError::BulkExport(BulkExportError::JobNotFound {
282                job_id: job_id.to_string(),
283            })
284        })?;
285        let status_str: String = job_row.get(0);
286        let transaction_time: DateTime<Utc> = job_row.get(1);
287        let request_url: String = job_row.get(2);
288        let error_message: Option<String> = job_row.get(3);
289        let completed_at: Option<DateTime<Utc>> = job_row.get(4);
290        let status: ExportStatus = status_str
291            .parse()
292            .map_err(|_| internal_error(format!("Invalid status in database: {}", status_str)))?;
293
294        let rows = client
295            .query(
296                "SELECT resource_type, resource_count, file_type, part_index, fencing_token
297                 FROM bulk_export_files
298                 WHERE job_id = $1
299                 ORDER BY file_type, resource_type, part_index",
300                &[&job_id.as_str()],
301            )
302            .await
303            .map_err(|e| internal_error(format!("Failed to query files: {}", e)))?;
304
305        let mut output = Vec::new();
306        let mut errors = Vec::new();
307        for row in &rows {
308            let resource_type: String = row.get(0);
309            let count: Option<i32> = row.get(1);
310            let file_type: String = row.get(2);
311            let part_index: i32 = row.get(3);
312            let fencing_token: i64 = row.get(4);
313            let key = ExportPartKey {
314                tenant_id: tenant_id.to_string(),
315                job_id: job_id.clone(),
316                resource_type: resource_type.clone(),
317                file_type: file_type.clone(),
318                part_index: part_index as u32,
319                fencing_token: fencing_token as u64,
320            };
321            let entry = RawManifestEntry {
322                resource_type,
323                key,
324                count: count.unwrap_or(0) as u64,
325            };
326            if file_type == "error" {
327                errors.push(entry);
328            } else {
329                output.push(entry);
330            }
331        }
332
333        Ok(RawExportManifest {
334            transaction_time,
335            request_url,
336            status,
337            error_message,
338            completed_at,
339            output,
340            errors,
341        })
342    }
343
344    async fn get_export_job_metadata(
345        &self,
346        tenant: &TenantContext,
347        job_id: &ExportJobId,
348    ) -> StorageResult<ExportJobMetadata> {
349        let client = self.get_client().await?;
350        let tenant_id = tenant.tenant_id().as_str();
351        let rows = client
352            .query(
353                "SELECT status, level, group_id, owner_subject, transaction_time,
354                        completed_at, request_url
355                 FROM bulk_export_jobs WHERE id = $1 AND tenant_id = $2",
356                &[&job_id.as_str(), &tenant_id],
357            )
358            .await
359            .map_err(|e| internal_error(format!("Failed to get export job metadata: {}", e)))?;
360        let row = rows.first().ok_or_else(|| {
361            StorageError::BulkExport(BulkExportError::JobNotFound {
362                job_id: job_id.to_string(),
363            })
364        })?;
365        let status_str: String = row.get(0);
366        let level_str: String = row.get(1);
367        let group_id: Option<String> = row.get(2);
368        let owner_subject: Option<String> = row.get(3);
369        let transaction_time: DateTime<Utc> = row.get(4);
370        let completed_at: Option<DateTime<Utc>> = row.get(5);
371        let request_url: String = row.get(6);
372        let status: ExportStatus = status_str
373            .parse()
374            .map_err(|_| internal_error(format!("Invalid status: {}", status_str)))?;
375        let level = match level_str.as_str() {
376            "system" => ExportLevel::System,
377            "patient" => ExportLevel::Patient,
378            "group" => ExportLevel::Group {
379                group_id: group_id.unwrap_or_default(),
380            },
381            _ => return Err(internal_error(format!("Invalid level: {}", level_str))),
382        };
383        Ok(ExportJobMetadata {
384            job_id: job_id.clone(),
385            status,
386            level,
387            owner_subject,
388            transaction_time,
389            completed_at,
390            request_url,
391        })
392    }
393
394    async fn get_export_file_metadata(
395        &self,
396        tenant: &TenantContext,
397        job_id: &ExportJobId,
398        part: &str,
399    ) -> StorageResult<ExportFileMetadata> {
400        let (resource_type, part_index) = parse_part_segment(part).ok_or_else(|| {
401            StorageError::BulkExport(BulkExportError::JobNotFound {
402                job_id: format!("{job_id}/{part}"),
403            })
404        })?;
405        let client = self.get_client().await?;
406        let tenant_id = tenant.tenant_id().as_str();
407        let rows = client
408            .query(
409                "SELECT f.file_type, f.resource_count, f.fencing_token, j.owner_subject
410                 FROM bulk_export_files f
411                 JOIN bulk_export_jobs j ON j.id = f.job_id
412                 WHERE f.job_id = $1 AND j.tenant_id = $2
413                   AND f.resource_type = $3 AND f.part_index = $4",
414                &[
415                    &job_id.as_str(),
416                    &tenant_id,
417                    &resource_type.as_str(),
418                    &(part_index as i32),
419                ],
420            )
421            .await
422            .map_err(|e| internal_error(format!("Failed to get file metadata: {}", e)))?;
423        let row = rows.first().ok_or_else(|| {
424            StorageError::BulkExport(BulkExportError::JobNotFound {
425                job_id: format!("{job_id}/{part}"),
426            })
427        })?;
428        let file_type: String = row.get(0);
429        let resource_count: Option<i32> = row.get(1);
430        let fencing_token: i64 = row.get(2);
431        let owner_subject: Option<String> = row.get(3);
432        let key = ExportPartKey {
433            tenant_id: tenant_id.to_string(),
434            job_id: job_id.clone(),
435            resource_type: resource_type.clone(),
436            file_type: file_type.clone(),
437            part_index,
438            fencing_token: fencing_token as u64,
439        };
440        Ok(ExportFileMetadata {
441            key,
442            resource_type,
443            file_type,
444            line_count: resource_count.unwrap_or(0) as u64,
445            job_owner_subject: owner_subject,
446        })
447    }
448
449    async fn count_active_exports(&self, tenant: &TenantContext) -> StorageResult<u64> {
450        let client = self.get_client().await?;
451        let tenant_id = tenant.tenant_id().as_str();
452        let row = client
453            .query_one(
454                "SELECT COUNT(*) FROM bulk_export_jobs
455                 WHERE tenant_id = $1 AND status IN ('accepted', 'in-progress')",
456                &[&tenant_id],
457            )
458            .await
459            .map_err(|e| internal_error(format!("Failed to count active exports: {}", e)))?;
460        let count: i64 = row.get(0);
461        Ok(count as u64)
462    }
463
464    async fn list_expired_exports(
465        &self,
466        now: DateTime<Utc>,
467        output_ttl: StdDuration,
468        limit: u32,
469    ) -> StorageResult<Vec<ExpiredExportRef>> {
470        let client = self.get_client().await?;
471        let cutoff = now
472            - chrono::Duration::from_std(output_ttl)
473                .unwrap_or_else(|_| chrono::Duration::seconds(0));
474        let rows = client
475            .query(
476                "SELECT tenant_id, id FROM bulk_export_jobs
477                 WHERE status IN ('complete', 'error', 'cancelled')
478                   AND completed_at IS NOT NULL AND completed_at < $1
479                 ORDER BY completed_at LIMIT $2",
480                &[&cutoff, &(limit as i64)],
481            )
482            .await
483            .map_err(|e| internal_error(format!("Failed to query expired exports: {}", e)))?;
484        Ok(rows
485            .iter()
486            .map(|row| {
487                let tenant_id: String = row.get(0);
488                let id: String = row.get(1);
489                ExpiredExportRef {
490                    tenant: TenantContext::new(
491                        TenantId::new(tenant_id),
492                        TenantPermissions::full_access(),
493                    ),
494                    job_id: ExportJobId::from_string(id),
495                }
496            })
497            .collect())
498    }
499
500    async fn list_exports(
501        &self,
502        tenant: &TenantContext,
503        include_completed: bool,
504    ) -> StorageResult<Vec<ExportProgress>> {
505        let client = self.get_client().await?;
506        let tenant_id = tenant.tenant_id().as_str();
507
508        let query = if include_completed {
509            "SELECT id FROM bulk_export_jobs WHERE tenant_id = $1 ORDER BY created_at DESC"
510        } else {
511            "SELECT id FROM bulk_export_jobs WHERE tenant_id = $1 AND status IN ('accepted', 'in-progress') ORDER BY created_at DESC"
512        };
513
514        let rows = client
515            .query(query, &[&tenant_id])
516            .await
517            .map_err(|e| internal_error(format!("Failed to query exports: {}", e)))?;
518
519        let mut results = Vec::new();
520        for row in &rows {
521            let id: String = row.get(0);
522            let job_id = ExportJobId::from_string(id);
523            if let Ok(progress) = self.get_export_status(tenant, &job_id).await {
524                results.push(progress);
525            }
526        }
527
528        Ok(results)
529    }
530}
531
532#[async_trait]
533impl ExportClaimStrategy for PostgresBackend {
534    async fn claim_next(
535        &self,
536        worker_id: &WorkerId,
537        lease_duration: StdDuration,
538    ) -> StorageResult<Option<ExportJobLease>> {
539        let mut client = self.get_client().await?;
540        let now = Utc::now();
541        let lease_expiry = now
542            + chrono::Duration::from_std(lease_duration)
543                .unwrap_or_else(|_| chrono::Duration::seconds(60));
544
545        let txn = client
546            .transaction()
547            .await
548            .map_err(|e| internal_error(format!("Failed to begin claim txn: {}", e)))?;
549
550        let rows = txn
551            .query(
552                "SELECT id, tenant_id, fencing_token FROM bulk_export_jobs
553                 WHERE status = 'accepted'
554                    OR (status = 'in-progress' AND (lease_expiry IS NULL OR lease_expiry < $1))
555                 ORDER BY created_at
556                 LIMIT 1
557                 FOR UPDATE SKIP LOCKED",
558                &[&now],
559            )
560            .await
561            .map_err(|e| internal_error(format!("Failed to select claimable job: {}", e)))?;
562
563        let Some(row) = rows.first() else {
564            txn.commit()
565                .await
566                .map_err(|e| internal_error(format!("Failed to commit claim txn: {}", e)))?;
567            return Ok(None);
568        };
569        let job_id: String = row.get(0);
570        let tenant_id: String = row.get(1);
571        let fencing_token: i64 = row.get(2);
572        let new_token = fencing_token + 1;
573
574        txn.execute(
575            "UPDATE bulk_export_jobs
576             SET status = 'in-progress', worker_id = $1, lease_expiry = $2,
577                 heartbeat_at = $3, fencing_token = $4,
578                 started_at = COALESCE(started_at, $3)
579             WHERE id = $5",
580            &[
581                &worker_id.as_str(),
582                &lease_expiry,
583                &now,
584                &new_token,
585                &job_id.as_str(),
586            ],
587        )
588        .await
589        .map_err(|e| internal_error(format!("Failed to claim export job: {}", e)))?;
590
591        txn.commit()
592            .await
593            .map_err(|e| internal_error(format!("Failed to commit claim txn: {}", e)))?;
594
595        Ok(Some(ExportJobLease {
596            job_id: ExportJobId::from_string(job_id),
597            tenant: TenantContext::new(TenantId::new(tenant_id), TenantPermissions::full_access()),
598            worker_id: worker_id.clone(),
599            lease_expiry,
600            fencing_token: new_token as u64,
601        }))
602    }
603
604    async fn heartbeat(&self, lease: &ExportJobLease) -> Result<DateTime<Utc>, LeaseError> {
605        let client = self.get_client().await.map_err(LeaseError::Storage)?;
606        let now = Utc::now();
607        let new_expiry = now + chrono::Duration::seconds(60);
608        let affected = client
609            .execute(
610                "UPDATE bulk_export_jobs
611                 SET lease_expiry = $1, heartbeat_at = $2
612                 WHERE id = $3 AND worker_id = $4 AND fencing_token = $5",
613                &[
614                    &new_expiry,
615                    &now,
616                    &lease.job_id.as_str(),
617                    &lease.worker_id.as_str(),
618                    &(lease.fencing_token as i64),
619                ],
620            )
621            .await
622            .map_err(|e| LeaseError::Storage(internal_error(format!("heartbeat failed: {e}"))))?;
623        if affected == 0 {
624            Err(LeaseError::LeaseLost {
625                job_id: lease.job_id.clone(),
626            })
627        } else {
628            Ok(new_expiry)
629        }
630    }
631
632    async fn release(&self, lease: ExportJobLease) -> StorageResult<()> {
633        let client = self.get_client().await?;
634        client
635            .execute(
636                "UPDATE bulk_export_jobs
637                 SET status = 'accepted', worker_id = NULL, lease_expiry = NULL
638                 WHERE id = $1 AND worker_id = $2 AND fencing_token = $3
639                   AND status = 'in-progress'",
640                &[
641                    &lease.job_id.as_str(),
642                    &lease.worker_id.as_str(),
643                    &(lease.fencing_token as i64),
644                ],
645            )
646            .await
647            .map_err(|e| internal_error(format!("Failed to release lease: {}", e)))?;
648        Ok(())
649    }
650}
651
652#[async_trait]
653impl ExportWorkerStorage for PostgresBackend {
654    async fn get_export_job_for_worker(
655        &self,
656        tenant: &TenantContext,
657        job_id: &ExportJobId,
658        worker_id: &WorkerId,
659        fencing_token: u64,
660    ) -> Result<WorkerJobView, LeaseError> {
661        let client = self.get_client().await.map_err(LeaseError::Storage)?;
662        let tenant_id = tenant.tenant_id().as_str();
663        let rows = client
664            .query(
665                "SELECT request_json, level, group_id, transaction_time, fhir_version
666                 FROM bulk_export_jobs
667                 WHERE id = $1 AND tenant_id = $2 AND worker_id = $3 AND fencing_token = $4",
668                &[
669                    &job_id.as_str(),
670                    &tenant_id,
671                    &worker_id.as_str(),
672                    &(fencing_token as i64),
673                ],
674            )
675            .await
676            .map_err(|e| LeaseError::Storage(internal_error(format!("load worker job: {e}"))))?;
677        let row = rows.first().ok_or_else(|| LeaseError::LeaseLost {
678            job_id: job_id.clone(),
679        })?;
680        let request_json: String = row.get(0);
681        let level_str: String = row.get(1);
682        let group_id: Option<String> = row.get(2);
683        let transaction_time: DateTime<Utc> = row.get(3);
684        let fhir_version_str: String = row.get(4);
685
686        let request: ExportRequest = serde_json::from_str(&request_json)
687            .map_err(|e| LeaseError::Storage(internal_error(format!("parse request_json: {e}"))))?;
688        let level = match level_str.as_str() {
689            "system" => ExportLevel::System,
690            "patient" => ExportLevel::Patient,
691            "group" => ExportLevel::Group {
692                group_id: group_id.unwrap_or_default(),
693            },
694            _ => {
695                return Err(LeaseError::Storage(internal_error(format!(
696                    "Invalid level: {level_str}"
697                ))));
698            }
699        };
700        let fhir_version = helios_fhir::FhirVersion::from_mime_param(&fhir_version_str)
701            .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
702
703        let progress_rows = client
704            .query(
705                "SELECT resource_type, total_count, exported_count, error_count, cursor_state
706                 FROM bulk_export_progress WHERE job_id = $1",
707                &[&job_id.as_str()],
708            )
709            .await
710            .map_err(|e| LeaseError::Storage(internal_error(format!("query progress: {e}"))))?;
711        let type_progress: Vec<TypeExportProgress> = progress_rows
712            .iter()
713            .map(|r| TypeExportProgress {
714                resource_type: r.get(0),
715                total_count: r.get::<_, Option<i32>>(1).map(|v| v as u64),
716                exported_count: r.get::<_, i32>(2) as u64,
717                error_count: r.get::<_, i32>(3) as u64,
718                cursor_state: r.get(4),
719            })
720            .collect();
721
722        Ok(WorkerJobView {
723            request,
724            level,
725            transaction_time,
726            fhir_version,
727            type_progress,
728        })
729    }
730
731    async fn mark_export_in_progress(
732        &self,
733        tenant: &TenantContext,
734        job_id: &ExportJobId,
735        worker_id: &WorkerId,
736        fencing_token: u64,
737    ) -> Result<(), LeaseError> {
738        let client = self.get_client().await.map_err(LeaseError::Storage)?;
739        let now = Utc::now();
740        let affected = client
741            .execute(
742                "UPDATE bulk_export_jobs
743                 SET status = 'in-progress', started_at = COALESCE(started_at, $1)
744                 WHERE id = $2 AND tenant_id = $3 AND worker_id = $4 AND fencing_token = $5",
745                &[
746                    &now,
747                    &job_id.as_str(),
748                    &tenant.tenant_id().as_str(),
749                    &worker_id.as_str(),
750                    &(fencing_token as i64),
751                ],
752            )
753            .await
754            .map_err(|e| LeaseError::Storage(internal_error(format!("mark_in_progress: {e}"))))?;
755        if affected == 0 {
756            Err(LeaseError::LeaseLost {
757                job_id: job_id.clone(),
758            })
759        } else {
760            Ok(())
761        }
762    }
763
764    async fn update_export_type_progress(
765        &self,
766        tenant: &TenantContext,
767        job_id: &ExportJobId,
768        worker_id: &WorkerId,
769        fencing_token: u64,
770        progress: &TypeExportProgress,
771    ) -> Result<(), LeaseError> {
772        let client = self.get_client().await.map_err(LeaseError::Storage)?;
773        let affected = client
774            .execute(
775                "INSERT INTO bulk_export_progress
776                   (job_id, resource_type, total_count, exported_count, error_count, cursor_state)
777                 SELECT $1, $2, $3, $4, $5, $6
778                 WHERE EXISTS (
779                     SELECT 1 FROM bulk_export_jobs
780                     WHERE id = $1 AND tenant_id = $7 AND worker_id = $8 AND fencing_token = $9
781                 )
782                 ON CONFLICT (job_id, resource_type) DO UPDATE SET
783                   total_count = EXCLUDED.total_count,
784                   exported_count = EXCLUDED.exported_count,
785                   error_count = EXCLUDED.error_count,
786                   cursor_state = EXCLUDED.cursor_state",
787                &[
788                    &job_id.as_str(),
789                    &progress.resource_type.as_str(),
790                    &progress.total_count.map(|v| v as i32),
791                    &(progress.exported_count as i32),
792                    &(progress.error_count as i32),
793                    &progress.cursor_state,
794                    &tenant.tenant_id().as_str(),
795                    &worker_id.as_str(),
796                    &(fencing_token as i64),
797                ],
798            )
799            .await
800            .map_err(|e| {
801                LeaseError::Storage(internal_error(format!("update_type_progress: {e}")))
802            })?;
803        if affected == 0 {
804            Err(LeaseError::LeaseLost {
805                job_id: job_id.clone(),
806            })
807        } else {
808            Ok(())
809        }
810    }
811
812    async fn record_export_file(
813        &self,
814        tenant: &TenantContext,
815        job_id: &ExportJobId,
816        worker_id: &WorkerId,
817        fencing_token: u64,
818        part: &FinalizedPart,
819        file_type: &str,
820    ) -> Result<(), LeaseError> {
821        let client = self.get_client().await.map_err(LeaseError::Storage)?;
822        let file_path = encode_part_path(&part.key);
823        let affected = client
824            .execute(
825                "INSERT INTO bulk_export_files
826                   (job_id, resource_type, file_type, file_path, resource_count, byte_count,
827                    part_index, fencing_token)
828                 SELECT $1, $2, $3, $4, $5, $6, $7, $8
829                 WHERE EXISTS (
830                     SELECT 1 FROM bulk_export_jobs
831                     WHERE id = $1 AND tenant_id = $9 AND worker_id = $10 AND fencing_token = $11
832                 )
833                 ON CONFLICT (job_id, file_type, resource_type, part_index) DO UPDATE SET
834                   file_path = EXCLUDED.file_path,
835                   resource_count = EXCLUDED.resource_count,
836                   byte_count = EXCLUDED.byte_count,
837                   fencing_token = EXCLUDED.fencing_token",
838                &[
839                    &job_id.as_str(),
840                    &part.resource_type.as_str(),
841                    &file_type,
842                    &file_path.as_str(),
843                    &(part.line_count as i32),
844                    &(part.size_bytes as i64),
845                    &(part.key.part_index as i32),
846                    &(part.key.fencing_token as i64),
847                    &tenant.tenant_id().as_str(),
848                    &worker_id.as_str(),
849                    &(fencing_token as i64),
850                ],
851            )
852            .await
853            .map_err(|e| LeaseError::Storage(internal_error(format!("record_export_file: {e}"))))?;
854        if affected == 0 {
855            Err(LeaseError::LeaseLost {
856                job_id: job_id.clone(),
857            })
858        } else {
859            Ok(())
860        }
861    }
862
863    async fn finish_export_job(
864        &self,
865        tenant: &TenantContext,
866        job_id: &ExportJobId,
867        worker_id: &WorkerId,
868        fencing_token: u64,
869    ) -> Result<(), LeaseError> {
870        let client = self.get_client().await.map_err(LeaseError::Storage)?;
871        let now = Utc::now();
872        let affected = client
873            .execute(
874                "UPDATE bulk_export_jobs
875                 SET status = 'complete', completed_at = $1
876                 WHERE id = $2 AND tenant_id = $3 AND worker_id = $4 AND fencing_token = $5",
877                &[
878                    &now,
879                    &job_id.as_str(),
880                    &tenant.tenant_id().as_str(),
881                    &worker_id.as_str(),
882                    &(fencing_token as i64),
883                ],
884            )
885            .await
886            .map_err(|e| LeaseError::Storage(internal_error(format!("finish_job: {e}"))))?;
887        if affected == 0 {
888            Err(LeaseError::LeaseLost {
889                job_id: job_id.clone(),
890            })
891        } else {
892            Ok(())
893        }
894    }
895
896    async fn fail_export_job(
897        &self,
898        tenant: &TenantContext,
899        job_id: &ExportJobId,
900        worker_id: &WorkerId,
901        fencing_token: u64,
902        error_message: &str,
903    ) -> Result<(), LeaseError> {
904        let client = self.get_client().await.map_err(LeaseError::Storage)?;
905        let now = Utc::now();
906        let affected = client
907            .execute(
908                "UPDATE bulk_export_jobs
909                 SET status = 'error', error_message = $1, completed_at = $2
910                 WHERE id = $3 AND tenant_id = $4 AND worker_id = $5 AND fencing_token = $6",
911                &[
912                    &error_message,
913                    &now,
914                    &job_id.as_str(),
915                    &tenant.tenant_id().as_str(),
916                    &worker_id.as_str(),
917                    &(fencing_token as i64),
918                ],
919            )
920            .await
921            .map_err(|e| LeaseError::Storage(internal_error(format!("fail_job: {e}"))))?;
922        if affected == 0 {
923            Err(LeaseError::LeaseLost {
924                job_id: job_id.clone(),
925            })
926        } else {
927            Ok(())
928        }
929    }
930}
931
932#[async_trait]
933impl ExportDataProvider for PostgresBackend {
934    async fn list_export_types(
935        &self,
936        tenant: &TenantContext,
937        request: &ExportRequest,
938    ) -> StorageResult<Vec<String>> {
939        let client = self.get_client().await?;
940        let tenant_id = tenant.tenant_id().as_str();
941
942        if !request.resource_types.is_empty() {
943            let mut valid_types = Vec::new();
944            for rt in &request.resource_types {
945                let row = client
946                    .query_one(
947                        "SELECT EXISTS(SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE LIMIT 1)",
948                        &[&tenant_id, &rt.as_str()],
949                    )
950                    .await
951                    .map_err(|e| internal_error(format!("Failed to check type: {}", e)))?;
952
953                let exists: bool = row.get(0);
954                if exists {
955                    valid_types.push(rt.clone());
956                }
957            }
958            return Ok(valid_types);
959        }
960
961        let rows = client
962            .query(
963                "SELECT DISTINCT resource_type FROM resources
964                 WHERE tenant_id = $1 AND is_deleted = FALSE
965                 ORDER BY resource_type",
966                &[&tenant_id],
967            )
968            .await
969            .map_err(|e| internal_error(format!("Failed to query types: {}", e)))?;
970
971        let types: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
972        Ok(types)
973    }
974
975    async fn count_export_resources(
976        &self,
977        tenant: &TenantContext,
978        request: &ExportRequest,
979        resource_type: &str,
980    ) -> StorageResult<u64> {
981        let client = self.get_client().await?;
982        let tenant_id = tenant.tenant_id().as_str();
983
984        let (sql, params): (
985            String,
986            Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>,
987        ) = if let Some(since) = request.since {
988            (
989                "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE AND last_updated >= $3".to_string(),
990                vec![
991                    Box::new(tenant_id.to_string()),
992                    Box::new(resource_type.to_string()),
993                    Box::new(since),
994                ],
995            )
996        } else {
997            (
998                "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string(),
999                vec![
1000                    Box::new(tenant_id.to_string()),
1001                    Box::new(resource_type.to_string()),
1002                ],
1003            )
1004        };
1005
1006        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
1007            .iter()
1008            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1009            .collect();
1010
1011        let row = client
1012            .query_one(&sql, &param_refs)
1013            .await
1014            .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
1015
1016        let count: i64 = row.get(0);
1017        Ok(count as u64)
1018    }
1019
1020    async fn fetch_export_batch(
1021        &self,
1022        tenant: &TenantContext,
1023        request: &ExportRequest,
1024        resource_type: &str,
1025        cursor: Option<&str>,
1026        batch_size: u32,
1027    ) -> StorageResult<NdjsonBatch> {
1028        let client = self.get_client().await?;
1029        let tenant_id = tenant.tenant_id().as_str();
1030
1031        let mut sql = "SELECT id, data, last_updated FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string();
1032        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
1033            Box::new(tenant_id.to_string()),
1034            Box::new(resource_type.to_string()),
1035        ];
1036        let mut param_idx = 3;
1037
1038        if let Some(since) = request.since {
1039            sql.push_str(&format!(" AND last_updated >= ${}", param_idx));
1040            params.push(Box::new(since));
1041            param_idx += 1;
1042        }
1043
1044        if let Some(cursor) = cursor {
1045            let parts: Vec<&str> = cursor.splitn(2, '|').collect();
1046            if parts.len() == 2 {
1047                if let Ok(dt) = DateTime::parse_from_rfc3339(parts[0]) {
1048                    sql.push_str(&format!(
1049                        " AND (last_updated, id) > (${}, ${})",
1050                        param_idx,
1051                        param_idx + 1
1052                    ));
1053                    params.push(Box::new(dt.with_timezone(&Utc)));
1054                    params.push(Box::new(parts[1].to_string()));
1055                }
1056            }
1057        }
1058
1059        sql.push_str(&format!(
1060            " ORDER BY last_updated, id LIMIT {}",
1061            batch_size + 1
1062        ));
1063
1064        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
1065            .iter()
1066            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1067            .collect();
1068
1069        let rows = client
1070            .query(&sql, &param_refs)
1071            .await
1072            .map_err(|e| internal_error(format!("Failed to query batch: {}", e)))?;
1073
1074        let has_more = rows.len() > batch_size as usize;
1075        let rows_to_process = if has_more {
1076            &rows[..batch_size as usize]
1077        } else {
1078            &rows[..]
1079        };
1080
1081        let mut lines = Vec::new();
1082        let mut last_cursor = None;
1083
1084        for row in rows_to_process {
1085            let id: String = row.get(0);
1086            let resource: Value = row.get(1);
1087            let last_updated: chrono::DateTime<Utc> = row.get(2);
1088
1089            let line = serde_json::to_string(&resource)
1090                .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
1091            lines.push(line);
1092            last_cursor = Some(format!("{}|{}", last_updated.to_rfc3339(), id));
1093        }
1094
1095        Ok(NdjsonBatch {
1096            lines,
1097            next_cursor: if has_more { last_cursor } else { None },
1098            is_last: !has_more,
1099        })
1100    }
1101}
1102
1103#[async_trait]
1104impl PatientExportProvider for PostgresBackend {
1105    async fn list_patient_ids(
1106        &self,
1107        tenant: &TenantContext,
1108        request: &ExportRequest,
1109        cursor: Option<&str>,
1110        batch_size: u32,
1111    ) -> StorageResult<(Vec<String>, Option<String>)> {
1112        let client = self.get_client().await?;
1113        let tenant_id = tenant.tenant_id().as_str();
1114
1115        let mut sql = "SELECT id FROM resources WHERE tenant_id = $1 AND resource_type = 'Patient' AND is_deleted = FALSE".to_string();
1116        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
1117            vec![Box::new(tenant_id.to_string())];
1118        let mut param_idx = 2;
1119
1120        if let Some(since) = request.since {
1121            sql.push_str(&format!(" AND last_updated >= ${}", param_idx));
1122            params.push(Box::new(since));
1123            param_idx += 1;
1124        }
1125
1126        if let Some(cursor) = cursor {
1127            sql.push_str(&format!(" AND id > ${}", param_idx));
1128            params.push(Box::new(cursor.to_string()));
1129        }
1130
1131        sql.push_str(&format!(" ORDER BY id LIMIT {}", batch_size + 1));
1132
1133        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
1134            .iter()
1135            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1136            .collect();
1137
1138        let rows = client
1139            .query(&sql, &param_refs)
1140            .await
1141            .map_err(|e| internal_error(format!("Failed to query patient ids: {}", e)))?;
1142
1143        let mut ids: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
1144
1145        let has_more = ids.len() > batch_size as usize;
1146        if has_more {
1147            ids.truncate(batch_size as usize);
1148        }
1149
1150        let next_cursor = if has_more { ids.last().cloned() } else { None };
1151
1152        Ok((ids, next_cursor))
1153    }
1154
1155    async fn fetch_patient_compartment_batch(
1156        &self,
1157        tenant: &TenantContext,
1158        request: &ExportRequest,
1159        resource_type: &str,
1160        patient_ids: &[String],
1161        cursor: Option<&str>,
1162        batch_size: u32,
1163    ) -> StorageResult<NdjsonBatch> {
1164        if patient_ids.is_empty() {
1165            return Ok(NdjsonBatch::empty());
1166        }
1167
1168        let client = self.get_client().await?;
1169        let tenant_id = tenant.tenant_id().as_str();
1170
1171        if resource_type == "Patient" {
1172            // For Patient resources, just filter by the IDs using ANY($3::text[])
1173            let mut sql = "SELECT id, data, last_updated FROM resources
1174                 WHERE tenant_id = $1 AND resource_type = $2 AND id = ANY($3::text[]) AND is_deleted = FALSE".to_string();
1175
1176            let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
1177                Box::new(tenant_id.to_string()),
1178                Box::new(resource_type.to_string()),
1179                Box::new(patient_ids.to_vec()),
1180            ];
1181            let param_idx = 4;
1182
1183            if let Some(cursor) = cursor {
1184                let parts: Vec<&str> = cursor.splitn(2, '|').collect();
1185                if parts.len() == 2 {
1186                    if let Ok(dt) = DateTime::parse_from_rfc3339(parts[0]) {
1187                        sql.push_str(&format!(
1188                            " AND (last_updated, id) > (${}, ${})",
1189                            param_idx,
1190                            param_idx + 1
1191                        ));
1192                        params.push(Box::new(dt.with_timezone(&Utc)));
1193                        params.push(Box::new(parts[1].to_string()));
1194                    }
1195                }
1196            }
1197
1198            sql.push_str(&format!(
1199                " ORDER BY last_updated, id LIMIT {}",
1200                batch_size + 1
1201            ));
1202
1203            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
1204                .iter()
1205                .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1206                .collect();
1207
1208            let rows = client
1209                .query(&sql, &param_refs)
1210                .await
1211                .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?;
1212
1213            let has_more = rows.len() > batch_size as usize;
1214            let rows_slice = if has_more {
1215                &rows[..batch_size as usize]
1216            } else {
1217                &rows[..]
1218            };
1219
1220            let mut lines = Vec::new();
1221            let mut last_cursor = None;
1222
1223            for row in rows_slice {
1224                let id: String = row.get(0);
1225                let resource: Value = row.get(1);
1226                let last_updated: chrono::DateTime<Utc> = row.get(2);
1227
1228                let line = serde_json::to_string(&resource)
1229                    .map_err(|e| internal_error(format!("Failed to serialize: {}", e)))?;
1230                lines.push(line);
1231                last_cursor = Some(format!("{}|{}", last_updated.to_rfc3339(), id));
1232            }
1233
1234            return Ok(NdjsonBatch {
1235                lines,
1236                next_cursor: if has_more { last_cursor } else { None },
1237                is_last: !has_more,
1238            });
1239        }
1240
1241        // For other resource types, find resources whose JSONB payload
1242        // references one of the patients via `subject.reference` or
1243        // `patient.reference`. We read the payload directly rather than the
1244        // search_index, so this is correct even when search is offloaded to a
1245        // secondary backend (postgres-elasticsearch), which leaves the local
1246        // search_index empty.
1247        let patient_refs: Vec<String> = patient_ids
1248            .iter()
1249            .map(|id| format!("Patient/{}", id))
1250            .collect();
1251
1252        let mut sql = "SELECT id, data, last_updated FROM resources
1253             WHERE tenant_id = $1
1254                AND resource_type = $2
1255                AND is_deleted = FALSE
1256                AND ((data #>> '{subject,reference}') = ANY($3::text[])
1257                  OR (data #>> '{patient,reference}') = ANY($3::text[]))"
1258            .to_string();
1259
1260        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
1261            Box::new(tenant_id.to_string()),
1262            Box::new(resource_type.to_string()),
1263            Box::new(patient_refs),
1264        ];
1265        let mut param_idx = 4;
1266
1267        if let Some(since) = request.since {
1268            sql.push_str(&format!(" AND last_updated >= ${}", param_idx));
1269            params.push(Box::new(since));
1270            param_idx += 1;
1271        }
1272
1273        if let Some(cursor) = cursor {
1274            let parts: Vec<&str> = cursor.splitn(2, '|').collect();
1275            if parts.len() == 2 {
1276                if let Ok(dt) = DateTime::parse_from_rfc3339(parts[0]) {
1277                    sql.push_str(&format!(
1278                        " AND (last_updated, id) > (${}, ${})",
1279                        param_idx,
1280                        param_idx + 1
1281                    ));
1282                    params.push(Box::new(dt.with_timezone(&Utc)));
1283                    params.push(Box::new(parts[1].to_string()));
1284                }
1285            }
1286        }
1287
1288        sql.push_str(&format!(
1289            " ORDER BY last_updated, id LIMIT {}",
1290            batch_size + 1
1291        ));
1292
1293        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
1294            .iter()
1295            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1296            .collect();
1297
1298        let rows = client
1299            .query(&sql, &param_refs)
1300            .await
1301            .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?;
1302
1303        let has_more = rows.len() > batch_size as usize;
1304        let rows_slice = if has_more {
1305            &rows[..batch_size as usize]
1306        } else {
1307            &rows[..]
1308        };
1309
1310        let mut lines = Vec::new();
1311        let mut last_cursor = None;
1312
1313        for row in rows_slice {
1314            let id: String = row.get(0);
1315            let resource: Value = row.get(1);
1316            let last_updated: chrono::DateTime<Utc> = row.get(2);
1317
1318            let line = serde_json::to_string(&resource)
1319                .map_err(|e| internal_error(format!("Failed to serialize: {}", e)))?;
1320            lines.push(line);
1321            last_cursor = Some(format!("{}|{}", last_updated.to_rfc3339(), id));
1322        }
1323
1324        Ok(NdjsonBatch {
1325            lines,
1326            next_cursor: if has_more { last_cursor } else { None },
1327            is_last: !has_more,
1328        })
1329    }
1330}
1331
1332#[async_trait]
1333impl GroupExportProvider for PostgresBackend {
1334    async fn get_group_members(
1335        &self,
1336        tenant: &TenantContext,
1337        group_id: &str,
1338    ) -> StorageResult<Vec<String>> {
1339        let client = self.get_client().await?;
1340        let tenant_id = tenant.tenant_id().as_str();
1341
1342        let rows = client
1343            .query(
1344                "SELECT data FROM resources WHERE tenant_id = $1 AND resource_type = 'Group' AND id = $2 AND is_deleted = FALSE",
1345                &[&tenant_id, &group_id],
1346            )
1347            .await
1348            .map_err(|e| internal_error(format!("Failed to fetch group: {}", e)))?;
1349
1350        if rows.is_empty() {
1351            return Ok(Vec::new());
1352        }
1353
1354        let data: Value = rows[0].get(0);
1355
1356        // Extract member references from the Group resource
1357        let mut member_refs = Vec::new();
1358        if let Some(members) = data.get("member").and_then(|m| m.as_array()) {
1359            for member in members {
1360                if let Some(reference) = member
1361                    .get("entity")
1362                    .and_then(|e| e.get("reference"))
1363                    .and_then(|r| r.as_str())
1364                {
1365                    member_refs.push(reference.to_string());
1366                }
1367            }
1368        }
1369
1370        Ok(member_refs)
1371    }
1372
1373    async fn resolve_group_patient_ids(
1374        &self,
1375        tenant: &TenantContext,
1376        group_id: &str,
1377    ) -> StorageResult<Vec<String>> {
1378        // Flatten nested Groups iteratively, guarding against membership
1379        // cycles with a visited set.
1380        use std::collections::HashSet;
1381        let mut visited_groups: HashSet<String> = HashSet::new();
1382        let mut seen_patients: HashSet<String> = HashSet::new();
1383        let mut patient_ids: Vec<String> = Vec::new();
1384        let mut worklist: Vec<String> = vec![group_id.to_string()];
1385
1386        while let Some(gid) = worklist.pop() {
1387            if !visited_groups.insert(gid.clone()) {
1388                continue; // cycle / already processed
1389            }
1390            let members = self.get_group_members(tenant, &gid).await?;
1391            for member_ref in &members {
1392                if let Some(id) = member_ref.strip_prefix("Patient/") {
1393                    if seen_patients.insert(id.to_string()) {
1394                        patient_ids.push(id.to_string());
1395                    }
1396                } else if let Some(nested) = member_ref.strip_prefix("Group/") {
1397                    worklist.push(nested.to_string());
1398                }
1399            }
1400        }
1401
1402        Ok(patient_ids)
1403    }
1404
1405    async fn get_group_members_with_periods(
1406        &self,
1407        tenant: &TenantContext,
1408        group_id: &str,
1409    ) -> StorageResult<Vec<(String, Option<DateTime<Utc>>)>> {
1410        let client = self.get_client().await?;
1411        let tenant_id = tenant.tenant_id().as_str();
1412        let rows = client
1413            .query(
1414                "SELECT data FROM resources
1415                 WHERE tenant_id = $1 AND resource_type = 'Group'
1416                   AND id = $2 AND is_deleted = false",
1417                &[&tenant_id, &group_id],
1418            )
1419            .await
1420            .map_err(|e| internal_error(format!("Failed to get group: {}", e)))?;
1421        let row = rows.first().ok_or_else(|| {
1422            StorageError::BulkExport(BulkExportError::GroupNotFound {
1423                group_id: group_id.to_string(),
1424            })
1425        })?;
1426        let data: Vec<u8> = row.get(0);
1427        let group: Value = serde_json::from_slice(&data)
1428            .map_err(|e| internal_error(format!("Failed to parse group: {}", e)))?;
1429        let mut out = Vec::new();
1430        if let Some(arr) = group.get("member").and_then(|m| m.as_array()) {
1431            for member in arr {
1432                let Some(reference) = member
1433                    .get("entity")
1434                    .and_then(|e| e.get("reference"))
1435                    .and_then(|r| r.as_str())
1436                else {
1437                    continue;
1438                };
1439                let period_start = member
1440                    .get("period")
1441                    .and_then(|p| p.get("start"))
1442                    .and_then(|s| s.as_str())
1443                    .and_then(|s| {
1444                        DateTime::parse_from_rfc3339(s)
1445                            .ok()
1446                            .map(|dt| dt.with_timezone(&Utc))
1447                    });
1448                out.push((reference.to_string(), period_start));
1449            }
1450        }
1451        Ok(out)
1452    }
1453}