1use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use serde_json::Value;
6use std::time::Duration as StdDuration;
7
8use crate::core::bulk_export::{
9 BulkExportStorage, ExpiredExportRef, ExportDataProvider, ExportFileMetadata, ExportJobId,
10 ExportJobMetadata, ExportLevel, ExportProgress, ExportRequest, ExportStatus,
11 GroupExportProvider, NdjsonBatch, PatientExportProvider, RawExportManifest, RawManifestEntry,
12 StartExportInput, TypeExportProgress,
13};
14use crate::core::bulk_export_output::{ExportPartKey, FinalizedPart};
15use crate::core::bulk_export_worker::{
16 ExportClaimStrategy, ExportJobLease, ExportWorkerStorage, LeaseError, WorkerId, WorkerJobView,
17};
18use crate::error::{BackendError, BulkExportError, StorageError, StorageResult};
19use crate::tenant::{TenantContext, TenantId, TenantPermissions};
20
21use super::PostgresBackend;
22
23fn internal_error(message: String) -> StorageError {
24 StorageError::Backend(BackendError::Internal {
25 backend_name: "postgres".to_string(),
26 message,
27 source: None,
28 })
29}
30
31fn parse_part_segment(part: &str) -> Option<(String, u32)> {
33 let idx = part.rfind('-')?;
34 let resource_type = &part[..idx];
35 let part_index: u32 = part[idx + 1..].parse().ok()?;
36 if resource_type.is_empty() {
37 return None;
38 }
39 Some((resource_type.to_string(), part_index))
40}
41
42fn encode_part_path(key: &ExportPartKey) -> String {
44 format!(
45 "{}/{}/{}/{}-{}-{}",
46 key.tenant_id,
47 key.job_id,
48 key.file_type,
49 key.resource_type,
50 key.part_index,
51 key.fencing_token
52 )
53}
54
55#[async_trait]
56impl BulkExportStorage for PostgresBackend {
57 async fn start_export(
58 &self,
59 tenant: &TenantContext,
60 input: StartExportInput,
61 ) -> StorageResult<ExportJobId> {
62 let client = self.get_client().await?;
63 let tenant_id = tenant.tenant_id().as_str();
64
65 let job_id = ExportJobId::new();
66 let now = Utc::now();
67
68 let level_str = match &input.request.level {
69 ExportLevel::System => "system".to_string(),
70 ExportLevel::Patient => "patient".to_string(),
71 ExportLevel::Group { .. } => "group".to_string(),
72 };
73
74 let group_id = input.request.group_id().map(|s| s.to_string());
75
76 let request_json = serde_json::to_string(&input.request)
77 .map_err(|e| internal_error(format!("Failed to serialize request: {}", e)))?;
78 let fhir_version = input.fhir_version.as_mime_param();
79
80 client
81 .execute(
82 "INSERT INTO bulk_export_jobs
83 (id, tenant_id, status, level, group_id, request_json, transaction_time,
84 created_at, owner_subject, request_url, fhir_version, fencing_token)
85 VALUES ($1, $2, 'accepted', $3, $4, $5, $6, $7, $8, $9, $10, 0)",
86 &[
87 &job_id.as_str(),
88 &tenant_id,
89 &level_str.as_str(),
90 &group_id,
91 &request_json.as_str(),
92 &input.transaction_time,
93 &now,
94 &input.owner_subject,
95 &input.request_url.as_str(),
96 &fhir_version,
97 ],
98 )
99 .await
100 .map_err(|e| internal_error(format!("Failed to create export job: {}", e)))?;
101
102 Ok(job_id)
103 }
104
105 async fn get_export_status(
106 &self,
107 tenant: &TenantContext,
108 job_id: &ExportJobId,
109 ) -> StorageResult<ExportProgress> {
110 let client = self.get_client().await?;
111 let tenant_id = tenant.tenant_id().as_str();
112
113 let rows = client
114 .query(
115 "SELECT status, level, group_id, transaction_time, started_at, completed_at, error_message, current_type
116 FROM bulk_export_jobs
117 WHERE id = $1 AND tenant_id = $2",
118 &[&job_id.as_str(), &tenant_id],
119 )
120 .await
121 .map_err(|e| internal_error(format!("Failed to get export status: {}", e)))?;
122
123 if rows.is_empty() {
124 return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
125 job_id: job_id.to_string(),
126 }));
127 }
128
129 let row = &rows[0];
130 let status_str: String = row.get(0);
131 let level_str: String = row.get(1);
132 let group_id: Option<String> = row.get(2);
133 let transaction_time: chrono::DateTime<Utc> = row.get(3);
134 let started_at: Option<chrono::DateTime<Utc>> = row.get(4);
135 let completed_at: Option<chrono::DateTime<Utc>> = row.get(5);
136 let error_message: Option<String> = row.get(6);
137 let current_type: Option<String> = row.get(7);
138
139 let status: ExportStatus = status_str
140 .parse()
141 .map_err(|_| internal_error(format!("Invalid status in database: {}", status_str)))?;
142
143 let level = match level_str.as_str() {
144 "system" => ExportLevel::System,
145 "patient" => ExportLevel::Patient,
146 "group" => ExportLevel::Group {
147 group_id: group_id.unwrap_or_default(),
148 },
149 _ => {
150 return Err(internal_error(format!(
151 "Invalid level in database: {}",
152 level_str
153 )));
154 }
155 };
156
157 let progress_rows = client
159 .query(
160 "SELECT resource_type, total_count, exported_count, error_count, cursor_state
161 FROM bulk_export_progress
162 WHERE job_id = $1",
163 &[&job_id.as_str()],
164 )
165 .await
166 .map_err(|e| internal_error(format!("Failed to query progress: {}", e)))?;
167
168 let type_progress: Vec<TypeExportProgress> = progress_rows
169 .iter()
170 .map(|r| TypeExportProgress {
171 resource_type: r.get(0),
172 total_count: r.get::<_, Option<i32>>(1).map(|v| v as u64),
173 exported_count: r.get::<_, i32>(2) as u64,
174 error_count: r.get::<_, i32>(3) as u64,
175 cursor_state: r.get(4),
176 })
177 .collect();
178
179 Ok(ExportProgress {
180 job_id: job_id.clone(),
181 status,
182 level,
183 transaction_time,
184 started_at,
185 completed_at,
186 type_progress,
187 current_type,
188 error_message,
189 })
190 }
191
192 async fn cancel_export(
193 &self,
194 tenant: &TenantContext,
195 job_id: &ExportJobId,
196 ) -> StorageResult<()> {
197 let client = self.get_client().await?;
198 let tenant_id = tenant.tenant_id().as_str();
199
200 let rows = client
201 .query(
202 "SELECT status FROM bulk_export_jobs WHERE id = $1 AND tenant_id = $2",
203 &[&job_id.as_str(), &tenant_id],
204 )
205 .await
206 .map_err(|e| internal_error(format!("Failed to get export status: {}", e)))?;
207
208 if rows.is_empty() {
209 return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
210 job_id: job_id.to_string(),
211 }));
212 }
213
214 let current_status: String = rows[0].get(0);
215 let status: ExportStatus = current_status.parse().map_err(|_| {
216 internal_error(format!("Invalid status in database: {}", current_status))
217 })?;
218
219 if status.is_terminal() {
220 return Err(StorageError::BulkExport(BulkExportError::InvalidJobState {
221 job_id: job_id.to_string(),
222 expected: "accepted or in-progress".to_string(),
223 actual: current_status,
224 }));
225 }
226
227 let now = Utc::now();
228 client
229 .execute(
230 "UPDATE bulk_export_jobs SET status = 'cancelled', completed_at = $1 WHERE id = $2",
231 &[&now, &job_id.as_str()],
232 )
233 .await
234 .map_err(|e| internal_error(format!("Failed to cancel export: {}", e)))?;
235
236 Ok(())
237 }
238
239 async fn delete_export(
240 &self,
241 tenant: &TenantContext,
242 job_id: &ExportJobId,
243 ) -> StorageResult<()> {
244 let client = self.get_client().await?;
245 let tenant_id = tenant.tenant_id().as_str();
246
247 let result = client
248 .execute(
249 "DELETE FROM bulk_export_jobs WHERE id = $1 AND tenant_id = $2",
250 &[&job_id.as_str(), &tenant_id],
251 )
252 .await
253 .map_err(|e| internal_error(format!("Failed to delete export: {}", e)))?;
254
255 if result == 0 {
256 return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
257 job_id: job_id.to_string(),
258 }));
259 }
260
261 Ok(())
262 }
263
264 async fn get_export_manifest(
265 &self,
266 tenant: &TenantContext,
267 job_id: &ExportJobId,
268 ) -> StorageResult<RawExportManifest> {
269 let client = self.get_client().await?;
270 let tenant_id = tenant.tenant_id().as_str();
271
272 let job_rows = client
273 .query(
274 "SELECT status, transaction_time, request_url, error_message, completed_at
275 FROM bulk_export_jobs WHERE id = $1 AND tenant_id = $2",
276 &[&job_id.as_str(), &tenant_id],
277 )
278 .await
279 .map_err(|e| internal_error(format!("Failed to get export job: {}", e)))?;
280 let job_row = job_rows.first().ok_or_else(|| {
281 StorageError::BulkExport(BulkExportError::JobNotFound {
282 job_id: job_id.to_string(),
283 })
284 })?;
285 let status_str: String = job_row.get(0);
286 let transaction_time: DateTime<Utc> = job_row.get(1);
287 let request_url: String = job_row.get(2);
288 let error_message: Option<String> = job_row.get(3);
289 let completed_at: Option<DateTime<Utc>> = job_row.get(4);
290 let status: ExportStatus = status_str
291 .parse()
292 .map_err(|_| internal_error(format!("Invalid status in database: {}", status_str)))?;
293
294 let rows = client
295 .query(
296 "SELECT resource_type, resource_count, file_type, part_index, fencing_token
297 FROM bulk_export_files
298 WHERE job_id = $1
299 ORDER BY file_type, resource_type, part_index",
300 &[&job_id.as_str()],
301 )
302 .await
303 .map_err(|e| internal_error(format!("Failed to query files: {}", e)))?;
304
305 let mut output = Vec::new();
306 let mut errors = Vec::new();
307 for row in &rows {
308 let resource_type: String = row.get(0);
309 let count: Option<i32> = row.get(1);
310 let file_type: String = row.get(2);
311 let part_index: i32 = row.get(3);
312 let fencing_token: i64 = row.get(4);
313 let key = ExportPartKey {
314 tenant_id: tenant_id.to_string(),
315 job_id: job_id.clone(),
316 resource_type: resource_type.clone(),
317 file_type: file_type.clone(),
318 part_index: part_index as u32,
319 fencing_token: fencing_token as u64,
320 };
321 let entry = RawManifestEntry {
322 resource_type,
323 key,
324 count: count.unwrap_or(0) as u64,
325 };
326 if file_type == "error" {
327 errors.push(entry);
328 } else {
329 output.push(entry);
330 }
331 }
332
333 Ok(RawExportManifest {
334 transaction_time,
335 request_url,
336 status,
337 error_message,
338 completed_at,
339 output,
340 errors,
341 })
342 }
343
344 async fn get_export_job_metadata(
345 &self,
346 tenant: &TenantContext,
347 job_id: &ExportJobId,
348 ) -> StorageResult<ExportJobMetadata> {
349 let client = self.get_client().await?;
350 let tenant_id = tenant.tenant_id().as_str();
351 let rows = client
352 .query(
353 "SELECT status, level, group_id, owner_subject, transaction_time,
354 completed_at, request_url
355 FROM bulk_export_jobs WHERE id = $1 AND tenant_id = $2",
356 &[&job_id.as_str(), &tenant_id],
357 )
358 .await
359 .map_err(|e| internal_error(format!("Failed to get export job metadata: {}", e)))?;
360 let row = rows.first().ok_or_else(|| {
361 StorageError::BulkExport(BulkExportError::JobNotFound {
362 job_id: job_id.to_string(),
363 })
364 })?;
365 let status_str: String = row.get(0);
366 let level_str: String = row.get(1);
367 let group_id: Option<String> = row.get(2);
368 let owner_subject: Option<String> = row.get(3);
369 let transaction_time: DateTime<Utc> = row.get(4);
370 let completed_at: Option<DateTime<Utc>> = row.get(5);
371 let request_url: String = row.get(6);
372 let status: ExportStatus = status_str
373 .parse()
374 .map_err(|_| internal_error(format!("Invalid status: {}", status_str)))?;
375 let level = match level_str.as_str() {
376 "system" => ExportLevel::System,
377 "patient" => ExportLevel::Patient,
378 "group" => ExportLevel::Group {
379 group_id: group_id.unwrap_or_default(),
380 },
381 _ => return Err(internal_error(format!("Invalid level: {}", level_str))),
382 };
383 Ok(ExportJobMetadata {
384 job_id: job_id.clone(),
385 status,
386 level,
387 owner_subject,
388 transaction_time,
389 completed_at,
390 request_url,
391 })
392 }
393
394 async fn get_export_file_metadata(
395 &self,
396 tenant: &TenantContext,
397 job_id: &ExportJobId,
398 part: &str,
399 ) -> StorageResult<ExportFileMetadata> {
400 let (resource_type, part_index) = parse_part_segment(part).ok_or_else(|| {
401 StorageError::BulkExport(BulkExportError::JobNotFound {
402 job_id: format!("{job_id}/{part}"),
403 })
404 })?;
405 let client = self.get_client().await?;
406 let tenant_id = tenant.tenant_id().as_str();
407 let rows = client
408 .query(
409 "SELECT f.file_type, f.resource_count, f.fencing_token, j.owner_subject
410 FROM bulk_export_files f
411 JOIN bulk_export_jobs j ON j.id = f.job_id
412 WHERE f.job_id = $1 AND j.tenant_id = $2
413 AND f.resource_type = $3 AND f.part_index = $4",
414 &[
415 &job_id.as_str(),
416 &tenant_id,
417 &resource_type.as_str(),
418 &(part_index as i32),
419 ],
420 )
421 .await
422 .map_err(|e| internal_error(format!("Failed to get file metadata: {}", e)))?;
423 let row = rows.first().ok_or_else(|| {
424 StorageError::BulkExport(BulkExportError::JobNotFound {
425 job_id: format!("{job_id}/{part}"),
426 })
427 })?;
428 let file_type: String = row.get(0);
429 let resource_count: Option<i32> = row.get(1);
430 let fencing_token: i64 = row.get(2);
431 let owner_subject: Option<String> = row.get(3);
432 let key = ExportPartKey {
433 tenant_id: tenant_id.to_string(),
434 job_id: job_id.clone(),
435 resource_type: resource_type.clone(),
436 file_type: file_type.clone(),
437 part_index,
438 fencing_token: fencing_token as u64,
439 };
440 Ok(ExportFileMetadata {
441 key,
442 resource_type,
443 file_type,
444 line_count: resource_count.unwrap_or(0) as u64,
445 job_owner_subject: owner_subject,
446 })
447 }
448
449 async fn count_active_exports(&self, tenant: &TenantContext) -> StorageResult<u64> {
450 let client = self.get_client().await?;
451 let tenant_id = tenant.tenant_id().as_str();
452 let row = client
453 .query_one(
454 "SELECT COUNT(*) FROM bulk_export_jobs
455 WHERE tenant_id = $1 AND status IN ('accepted', 'in-progress')",
456 &[&tenant_id],
457 )
458 .await
459 .map_err(|e| internal_error(format!("Failed to count active exports: {}", e)))?;
460 let count: i64 = row.get(0);
461 Ok(count as u64)
462 }
463
464 async fn list_expired_exports(
465 &self,
466 now: DateTime<Utc>,
467 output_ttl: StdDuration,
468 limit: u32,
469 ) -> StorageResult<Vec<ExpiredExportRef>> {
470 let client = self.get_client().await?;
471 let cutoff = now
472 - chrono::Duration::from_std(output_ttl)
473 .unwrap_or_else(|_| chrono::Duration::seconds(0));
474 let rows = client
475 .query(
476 "SELECT tenant_id, id FROM bulk_export_jobs
477 WHERE status IN ('complete', 'error', 'cancelled')
478 AND completed_at IS NOT NULL AND completed_at < $1
479 ORDER BY completed_at LIMIT $2",
480 &[&cutoff, &(limit as i64)],
481 )
482 .await
483 .map_err(|e| internal_error(format!("Failed to query expired exports: {}", e)))?;
484 Ok(rows
485 .iter()
486 .map(|row| {
487 let tenant_id: String = row.get(0);
488 let id: String = row.get(1);
489 ExpiredExportRef {
490 tenant: TenantContext::new(
491 TenantId::new(tenant_id),
492 TenantPermissions::full_access(),
493 ),
494 job_id: ExportJobId::from_string(id),
495 }
496 })
497 .collect())
498 }
499
500 async fn list_exports(
501 &self,
502 tenant: &TenantContext,
503 include_completed: bool,
504 ) -> StorageResult<Vec<ExportProgress>> {
505 let client = self.get_client().await?;
506 let tenant_id = tenant.tenant_id().as_str();
507
508 let query = if include_completed {
509 "SELECT id FROM bulk_export_jobs WHERE tenant_id = $1 ORDER BY created_at DESC"
510 } else {
511 "SELECT id FROM bulk_export_jobs WHERE tenant_id = $1 AND status IN ('accepted', 'in-progress') ORDER BY created_at DESC"
512 };
513
514 let rows = client
515 .query(query, &[&tenant_id])
516 .await
517 .map_err(|e| internal_error(format!("Failed to query exports: {}", e)))?;
518
519 let mut results = Vec::new();
520 for row in &rows {
521 let id: String = row.get(0);
522 let job_id = ExportJobId::from_string(id);
523 if let Ok(progress) = self.get_export_status(tenant, &job_id).await {
524 results.push(progress);
525 }
526 }
527
528 Ok(results)
529 }
530}
531
532#[async_trait]
533impl ExportClaimStrategy for PostgresBackend {
534 async fn claim_next(
535 &self,
536 worker_id: &WorkerId,
537 lease_duration: StdDuration,
538 ) -> StorageResult<Option<ExportJobLease>> {
539 let mut client = self.get_client().await?;
540 let now = Utc::now();
541 let lease_expiry = now
542 + chrono::Duration::from_std(lease_duration)
543 .unwrap_or_else(|_| chrono::Duration::seconds(60));
544
545 let txn = client
546 .transaction()
547 .await
548 .map_err(|e| internal_error(format!("Failed to begin claim txn: {}", e)))?;
549
550 let rows = txn
551 .query(
552 "SELECT id, tenant_id, fencing_token FROM bulk_export_jobs
553 WHERE status = 'accepted'
554 OR (status = 'in-progress' AND (lease_expiry IS NULL OR lease_expiry < $1))
555 ORDER BY created_at
556 LIMIT 1
557 FOR UPDATE SKIP LOCKED",
558 &[&now],
559 )
560 .await
561 .map_err(|e| internal_error(format!("Failed to select claimable job: {}", e)))?;
562
563 let Some(row) = rows.first() else {
564 txn.commit()
565 .await
566 .map_err(|e| internal_error(format!("Failed to commit claim txn: {}", e)))?;
567 return Ok(None);
568 };
569 let job_id: String = row.get(0);
570 let tenant_id: String = row.get(1);
571 let fencing_token: i64 = row.get(2);
572 let new_token = fencing_token + 1;
573
574 txn.execute(
575 "UPDATE bulk_export_jobs
576 SET status = 'in-progress', worker_id = $1, lease_expiry = $2,
577 heartbeat_at = $3, fencing_token = $4,
578 started_at = COALESCE(started_at, $3)
579 WHERE id = $5",
580 &[
581 &worker_id.as_str(),
582 &lease_expiry,
583 &now,
584 &new_token,
585 &job_id.as_str(),
586 ],
587 )
588 .await
589 .map_err(|e| internal_error(format!("Failed to claim export job: {}", e)))?;
590
591 txn.commit()
592 .await
593 .map_err(|e| internal_error(format!("Failed to commit claim txn: {}", e)))?;
594
595 Ok(Some(ExportJobLease {
596 job_id: ExportJobId::from_string(job_id),
597 tenant: TenantContext::new(TenantId::new(tenant_id), TenantPermissions::full_access()),
598 worker_id: worker_id.clone(),
599 lease_expiry,
600 fencing_token: new_token as u64,
601 }))
602 }
603
604 async fn heartbeat(&self, lease: &ExportJobLease) -> Result<DateTime<Utc>, LeaseError> {
605 let client = self.get_client().await.map_err(LeaseError::Storage)?;
606 let now = Utc::now();
607 let new_expiry = now + chrono::Duration::seconds(60);
608 let affected = client
609 .execute(
610 "UPDATE bulk_export_jobs
611 SET lease_expiry = $1, heartbeat_at = $2
612 WHERE id = $3 AND worker_id = $4 AND fencing_token = $5",
613 &[
614 &new_expiry,
615 &now,
616 &lease.job_id.as_str(),
617 &lease.worker_id.as_str(),
618 &(lease.fencing_token as i64),
619 ],
620 )
621 .await
622 .map_err(|e| LeaseError::Storage(internal_error(format!("heartbeat failed: {e}"))))?;
623 if affected == 0 {
624 Err(LeaseError::LeaseLost {
625 job_id: lease.job_id.clone(),
626 })
627 } else {
628 Ok(new_expiry)
629 }
630 }
631
632 async fn release(&self, lease: ExportJobLease) -> StorageResult<()> {
633 let client = self.get_client().await?;
634 client
635 .execute(
636 "UPDATE bulk_export_jobs
637 SET status = 'accepted', worker_id = NULL, lease_expiry = NULL
638 WHERE id = $1 AND worker_id = $2 AND fencing_token = $3
639 AND status = 'in-progress'",
640 &[
641 &lease.job_id.as_str(),
642 &lease.worker_id.as_str(),
643 &(lease.fencing_token as i64),
644 ],
645 )
646 .await
647 .map_err(|e| internal_error(format!("Failed to release lease: {}", e)))?;
648 Ok(())
649 }
650}
651
652#[async_trait]
653impl ExportWorkerStorage for PostgresBackend {
654 async fn get_export_job_for_worker(
655 &self,
656 tenant: &TenantContext,
657 job_id: &ExportJobId,
658 worker_id: &WorkerId,
659 fencing_token: u64,
660 ) -> Result<WorkerJobView, LeaseError> {
661 let client = self.get_client().await.map_err(LeaseError::Storage)?;
662 let tenant_id = tenant.tenant_id().as_str();
663 let rows = client
664 .query(
665 "SELECT request_json, level, group_id, transaction_time, fhir_version
666 FROM bulk_export_jobs
667 WHERE id = $1 AND tenant_id = $2 AND worker_id = $3 AND fencing_token = $4",
668 &[
669 &job_id.as_str(),
670 &tenant_id,
671 &worker_id.as_str(),
672 &(fencing_token as i64),
673 ],
674 )
675 .await
676 .map_err(|e| LeaseError::Storage(internal_error(format!("load worker job: {e}"))))?;
677 let row = rows.first().ok_or_else(|| LeaseError::LeaseLost {
678 job_id: job_id.clone(),
679 })?;
680 let request_json: String = row.get(0);
681 let level_str: String = row.get(1);
682 let group_id: Option<String> = row.get(2);
683 let transaction_time: DateTime<Utc> = row.get(3);
684 let fhir_version_str: String = row.get(4);
685
686 let request: ExportRequest = serde_json::from_str(&request_json)
687 .map_err(|e| LeaseError::Storage(internal_error(format!("parse request_json: {e}"))))?;
688 let level = match level_str.as_str() {
689 "system" => ExportLevel::System,
690 "patient" => ExportLevel::Patient,
691 "group" => ExportLevel::Group {
692 group_id: group_id.unwrap_or_default(),
693 },
694 _ => {
695 return Err(LeaseError::Storage(internal_error(format!(
696 "Invalid level: {level_str}"
697 ))));
698 }
699 };
700 let fhir_version = helios_fhir::FhirVersion::from_mime_param(&fhir_version_str)
701 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
702
703 let progress_rows = client
704 .query(
705 "SELECT resource_type, total_count, exported_count, error_count, cursor_state
706 FROM bulk_export_progress WHERE job_id = $1",
707 &[&job_id.as_str()],
708 )
709 .await
710 .map_err(|e| LeaseError::Storage(internal_error(format!("query progress: {e}"))))?;
711 let type_progress: Vec<TypeExportProgress> = progress_rows
712 .iter()
713 .map(|r| TypeExportProgress {
714 resource_type: r.get(0),
715 total_count: r.get::<_, Option<i32>>(1).map(|v| v as u64),
716 exported_count: r.get::<_, i32>(2) as u64,
717 error_count: r.get::<_, i32>(3) as u64,
718 cursor_state: r.get(4),
719 })
720 .collect();
721
722 Ok(WorkerJobView {
723 request,
724 level,
725 transaction_time,
726 fhir_version,
727 type_progress,
728 })
729 }
730
731 async fn mark_export_in_progress(
732 &self,
733 tenant: &TenantContext,
734 job_id: &ExportJobId,
735 worker_id: &WorkerId,
736 fencing_token: u64,
737 ) -> Result<(), LeaseError> {
738 let client = self.get_client().await.map_err(LeaseError::Storage)?;
739 let now = Utc::now();
740 let affected = client
741 .execute(
742 "UPDATE bulk_export_jobs
743 SET status = 'in-progress', started_at = COALESCE(started_at, $1)
744 WHERE id = $2 AND tenant_id = $3 AND worker_id = $4 AND fencing_token = $5",
745 &[
746 &now,
747 &job_id.as_str(),
748 &tenant.tenant_id().as_str(),
749 &worker_id.as_str(),
750 &(fencing_token as i64),
751 ],
752 )
753 .await
754 .map_err(|e| LeaseError::Storage(internal_error(format!("mark_in_progress: {e}"))))?;
755 if affected == 0 {
756 Err(LeaseError::LeaseLost {
757 job_id: job_id.clone(),
758 })
759 } else {
760 Ok(())
761 }
762 }
763
764 async fn update_export_type_progress(
765 &self,
766 tenant: &TenantContext,
767 job_id: &ExportJobId,
768 worker_id: &WorkerId,
769 fencing_token: u64,
770 progress: &TypeExportProgress,
771 ) -> Result<(), LeaseError> {
772 let client = self.get_client().await.map_err(LeaseError::Storage)?;
773 let affected = client
774 .execute(
775 "INSERT INTO bulk_export_progress
776 (job_id, resource_type, total_count, exported_count, error_count, cursor_state)
777 SELECT $1, $2, $3, $4, $5, $6
778 WHERE EXISTS (
779 SELECT 1 FROM bulk_export_jobs
780 WHERE id = $1 AND tenant_id = $7 AND worker_id = $8 AND fencing_token = $9
781 )
782 ON CONFLICT (job_id, resource_type) DO UPDATE SET
783 total_count = EXCLUDED.total_count,
784 exported_count = EXCLUDED.exported_count,
785 error_count = EXCLUDED.error_count,
786 cursor_state = EXCLUDED.cursor_state",
787 &[
788 &job_id.as_str(),
789 &progress.resource_type.as_str(),
790 &progress.total_count.map(|v| v as i32),
791 &(progress.exported_count as i32),
792 &(progress.error_count as i32),
793 &progress.cursor_state,
794 &tenant.tenant_id().as_str(),
795 &worker_id.as_str(),
796 &(fencing_token as i64),
797 ],
798 )
799 .await
800 .map_err(|e| {
801 LeaseError::Storage(internal_error(format!("update_type_progress: {e}")))
802 })?;
803 if affected == 0 {
804 Err(LeaseError::LeaseLost {
805 job_id: job_id.clone(),
806 })
807 } else {
808 Ok(())
809 }
810 }
811
812 async fn record_export_file(
813 &self,
814 tenant: &TenantContext,
815 job_id: &ExportJobId,
816 worker_id: &WorkerId,
817 fencing_token: u64,
818 part: &FinalizedPart,
819 file_type: &str,
820 ) -> Result<(), LeaseError> {
821 let client = self.get_client().await.map_err(LeaseError::Storage)?;
822 let file_path = encode_part_path(&part.key);
823 let affected = client
824 .execute(
825 "INSERT INTO bulk_export_files
826 (job_id, resource_type, file_type, file_path, resource_count, byte_count,
827 part_index, fencing_token)
828 SELECT $1, $2, $3, $4, $5, $6, $7, $8
829 WHERE EXISTS (
830 SELECT 1 FROM bulk_export_jobs
831 WHERE id = $1 AND tenant_id = $9 AND worker_id = $10 AND fencing_token = $11
832 )
833 ON CONFLICT (job_id, file_type, resource_type, part_index) DO UPDATE SET
834 file_path = EXCLUDED.file_path,
835 resource_count = EXCLUDED.resource_count,
836 byte_count = EXCLUDED.byte_count,
837 fencing_token = EXCLUDED.fencing_token",
838 &[
839 &job_id.as_str(),
840 &part.resource_type.as_str(),
841 &file_type,
842 &file_path.as_str(),
843 &(part.line_count as i32),
844 &(part.size_bytes as i64),
845 &(part.key.part_index as i32),
846 &(part.key.fencing_token as i64),
847 &tenant.tenant_id().as_str(),
848 &worker_id.as_str(),
849 &(fencing_token as i64),
850 ],
851 )
852 .await
853 .map_err(|e| LeaseError::Storage(internal_error(format!("record_export_file: {e}"))))?;
854 if affected == 0 {
855 Err(LeaseError::LeaseLost {
856 job_id: job_id.clone(),
857 })
858 } else {
859 Ok(())
860 }
861 }
862
863 async fn finish_export_job(
864 &self,
865 tenant: &TenantContext,
866 job_id: &ExportJobId,
867 worker_id: &WorkerId,
868 fencing_token: u64,
869 ) -> Result<(), LeaseError> {
870 let client = self.get_client().await.map_err(LeaseError::Storage)?;
871 let now = Utc::now();
872 let affected = client
873 .execute(
874 "UPDATE bulk_export_jobs
875 SET status = 'complete', completed_at = $1
876 WHERE id = $2 AND tenant_id = $3 AND worker_id = $4 AND fencing_token = $5",
877 &[
878 &now,
879 &job_id.as_str(),
880 &tenant.tenant_id().as_str(),
881 &worker_id.as_str(),
882 &(fencing_token as i64),
883 ],
884 )
885 .await
886 .map_err(|e| LeaseError::Storage(internal_error(format!("finish_job: {e}"))))?;
887 if affected == 0 {
888 Err(LeaseError::LeaseLost {
889 job_id: job_id.clone(),
890 })
891 } else {
892 Ok(())
893 }
894 }
895
896 async fn fail_export_job(
897 &self,
898 tenant: &TenantContext,
899 job_id: &ExportJobId,
900 worker_id: &WorkerId,
901 fencing_token: u64,
902 error_message: &str,
903 ) -> Result<(), LeaseError> {
904 let client = self.get_client().await.map_err(LeaseError::Storage)?;
905 let now = Utc::now();
906 let affected = client
907 .execute(
908 "UPDATE bulk_export_jobs
909 SET status = 'error', error_message = $1, completed_at = $2
910 WHERE id = $3 AND tenant_id = $4 AND worker_id = $5 AND fencing_token = $6",
911 &[
912 &error_message,
913 &now,
914 &job_id.as_str(),
915 &tenant.tenant_id().as_str(),
916 &worker_id.as_str(),
917 &(fencing_token as i64),
918 ],
919 )
920 .await
921 .map_err(|e| LeaseError::Storage(internal_error(format!("fail_job: {e}"))))?;
922 if affected == 0 {
923 Err(LeaseError::LeaseLost {
924 job_id: job_id.clone(),
925 })
926 } else {
927 Ok(())
928 }
929 }
930}
931
932#[async_trait]
933impl ExportDataProvider for PostgresBackend {
934 async fn list_export_types(
935 &self,
936 tenant: &TenantContext,
937 request: &ExportRequest,
938 ) -> StorageResult<Vec<String>> {
939 let client = self.get_client().await?;
940 let tenant_id = tenant.tenant_id().as_str();
941
942 if !request.resource_types.is_empty() {
943 let mut valid_types = Vec::new();
944 for rt in &request.resource_types {
945 let row = client
946 .query_one(
947 "SELECT EXISTS(SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE LIMIT 1)",
948 &[&tenant_id, &rt.as_str()],
949 )
950 .await
951 .map_err(|e| internal_error(format!("Failed to check type: {}", e)))?;
952
953 let exists: bool = row.get(0);
954 if exists {
955 valid_types.push(rt.clone());
956 }
957 }
958 return Ok(valid_types);
959 }
960
961 let rows = client
962 .query(
963 "SELECT DISTINCT resource_type FROM resources
964 WHERE tenant_id = $1 AND is_deleted = FALSE
965 ORDER BY resource_type",
966 &[&tenant_id],
967 )
968 .await
969 .map_err(|e| internal_error(format!("Failed to query types: {}", e)))?;
970
971 let types: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
972 Ok(types)
973 }
974
975 async fn count_export_resources(
976 &self,
977 tenant: &TenantContext,
978 request: &ExportRequest,
979 resource_type: &str,
980 ) -> StorageResult<u64> {
981 let client = self.get_client().await?;
982 let tenant_id = tenant.tenant_id().as_str();
983
984 let (sql, params): (
985 String,
986 Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>,
987 ) = if let Some(since) = request.since {
988 (
989 "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE AND last_updated >= $3".to_string(),
990 vec![
991 Box::new(tenant_id.to_string()),
992 Box::new(resource_type.to_string()),
993 Box::new(since),
994 ],
995 )
996 } else {
997 (
998 "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string(),
999 vec![
1000 Box::new(tenant_id.to_string()),
1001 Box::new(resource_type.to_string()),
1002 ],
1003 )
1004 };
1005
1006 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
1007 .iter()
1008 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1009 .collect();
1010
1011 let row = client
1012 .query_one(&sql, ¶m_refs)
1013 .await
1014 .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
1015
1016 let count: i64 = row.get(0);
1017 Ok(count as u64)
1018 }
1019
1020 async fn fetch_export_batch(
1021 &self,
1022 tenant: &TenantContext,
1023 request: &ExportRequest,
1024 resource_type: &str,
1025 cursor: Option<&str>,
1026 batch_size: u32,
1027 ) -> StorageResult<NdjsonBatch> {
1028 let client = self.get_client().await?;
1029 let tenant_id = tenant.tenant_id().as_str();
1030
1031 let mut sql = "SELECT id, data, last_updated FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string();
1032 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
1033 Box::new(tenant_id.to_string()),
1034 Box::new(resource_type.to_string()),
1035 ];
1036 let mut param_idx = 3;
1037
1038 if let Some(since) = request.since {
1039 sql.push_str(&format!(" AND last_updated >= ${}", param_idx));
1040 params.push(Box::new(since));
1041 param_idx += 1;
1042 }
1043
1044 if let Some(cursor) = cursor {
1045 let parts: Vec<&str> = cursor.splitn(2, '|').collect();
1046 if parts.len() == 2 {
1047 if let Ok(dt) = DateTime::parse_from_rfc3339(parts[0]) {
1048 sql.push_str(&format!(
1049 " AND (last_updated, id) > (${}, ${})",
1050 param_idx,
1051 param_idx + 1
1052 ));
1053 params.push(Box::new(dt.with_timezone(&Utc)));
1054 params.push(Box::new(parts[1].to_string()));
1055 }
1056 }
1057 }
1058
1059 sql.push_str(&format!(
1060 " ORDER BY last_updated, id LIMIT {}",
1061 batch_size + 1
1062 ));
1063
1064 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
1065 .iter()
1066 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1067 .collect();
1068
1069 let rows = client
1070 .query(&sql, ¶m_refs)
1071 .await
1072 .map_err(|e| internal_error(format!("Failed to query batch: {}", e)))?;
1073
1074 let has_more = rows.len() > batch_size as usize;
1075 let rows_to_process = if has_more {
1076 &rows[..batch_size as usize]
1077 } else {
1078 &rows[..]
1079 };
1080
1081 let mut lines = Vec::new();
1082 let mut last_cursor = None;
1083
1084 for row in rows_to_process {
1085 let id: String = row.get(0);
1086 let resource: Value = row.get(1);
1087 let last_updated: chrono::DateTime<Utc> = row.get(2);
1088
1089 let line = serde_json::to_string(&resource)
1090 .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
1091 lines.push(line);
1092 last_cursor = Some(format!("{}|{}", last_updated.to_rfc3339(), id));
1093 }
1094
1095 Ok(NdjsonBatch {
1096 lines,
1097 next_cursor: if has_more { last_cursor } else { None },
1098 is_last: !has_more,
1099 })
1100 }
1101}
1102
1103#[async_trait]
1104impl PatientExportProvider for PostgresBackend {
1105 async fn list_patient_ids(
1106 &self,
1107 tenant: &TenantContext,
1108 request: &ExportRequest,
1109 cursor: Option<&str>,
1110 batch_size: u32,
1111 ) -> StorageResult<(Vec<String>, Option<String>)> {
1112 let client = self.get_client().await?;
1113 let tenant_id = tenant.tenant_id().as_str();
1114
1115 let mut sql = "SELECT id FROM resources WHERE tenant_id = $1 AND resource_type = 'Patient' AND is_deleted = FALSE".to_string();
1116 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
1117 vec![Box::new(tenant_id.to_string())];
1118 let mut param_idx = 2;
1119
1120 if let Some(since) = request.since {
1121 sql.push_str(&format!(" AND last_updated >= ${}", param_idx));
1122 params.push(Box::new(since));
1123 param_idx += 1;
1124 }
1125
1126 if let Some(cursor) = cursor {
1127 sql.push_str(&format!(" AND id > ${}", param_idx));
1128 params.push(Box::new(cursor.to_string()));
1129 }
1130
1131 sql.push_str(&format!(" ORDER BY id LIMIT {}", batch_size + 1));
1132
1133 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
1134 .iter()
1135 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1136 .collect();
1137
1138 let rows = client
1139 .query(&sql, ¶m_refs)
1140 .await
1141 .map_err(|e| internal_error(format!("Failed to query patient ids: {}", e)))?;
1142
1143 let mut ids: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
1144
1145 let has_more = ids.len() > batch_size as usize;
1146 if has_more {
1147 ids.truncate(batch_size as usize);
1148 }
1149
1150 let next_cursor = if has_more { ids.last().cloned() } else { None };
1151
1152 Ok((ids, next_cursor))
1153 }
1154
1155 async fn fetch_patient_compartment_batch(
1156 &self,
1157 tenant: &TenantContext,
1158 request: &ExportRequest,
1159 resource_type: &str,
1160 patient_ids: &[String],
1161 cursor: Option<&str>,
1162 batch_size: u32,
1163 ) -> StorageResult<NdjsonBatch> {
1164 if patient_ids.is_empty() {
1165 return Ok(NdjsonBatch::empty());
1166 }
1167
1168 let client = self.get_client().await?;
1169 let tenant_id = tenant.tenant_id().as_str();
1170
1171 if resource_type == "Patient" {
1172 let mut sql = "SELECT id, data, last_updated FROM resources
1174 WHERE tenant_id = $1 AND resource_type = $2 AND id = ANY($3::text[]) AND is_deleted = FALSE".to_string();
1175
1176 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
1177 Box::new(tenant_id.to_string()),
1178 Box::new(resource_type.to_string()),
1179 Box::new(patient_ids.to_vec()),
1180 ];
1181 let param_idx = 4;
1182
1183 if let Some(cursor) = cursor {
1184 let parts: Vec<&str> = cursor.splitn(2, '|').collect();
1185 if parts.len() == 2 {
1186 if let Ok(dt) = DateTime::parse_from_rfc3339(parts[0]) {
1187 sql.push_str(&format!(
1188 " AND (last_updated, id) > (${}, ${})",
1189 param_idx,
1190 param_idx + 1
1191 ));
1192 params.push(Box::new(dt.with_timezone(&Utc)));
1193 params.push(Box::new(parts[1].to_string()));
1194 }
1195 }
1196 }
1197
1198 sql.push_str(&format!(
1199 " ORDER BY last_updated, id LIMIT {}",
1200 batch_size + 1
1201 ));
1202
1203 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
1204 .iter()
1205 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1206 .collect();
1207
1208 let rows = client
1209 .query(&sql, ¶m_refs)
1210 .await
1211 .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?;
1212
1213 let has_more = rows.len() > batch_size as usize;
1214 let rows_slice = if has_more {
1215 &rows[..batch_size as usize]
1216 } else {
1217 &rows[..]
1218 };
1219
1220 let mut lines = Vec::new();
1221 let mut last_cursor = None;
1222
1223 for row in rows_slice {
1224 let id: String = row.get(0);
1225 let resource: Value = row.get(1);
1226 let last_updated: chrono::DateTime<Utc> = row.get(2);
1227
1228 let line = serde_json::to_string(&resource)
1229 .map_err(|e| internal_error(format!("Failed to serialize: {}", e)))?;
1230 lines.push(line);
1231 last_cursor = Some(format!("{}|{}", last_updated.to_rfc3339(), id));
1232 }
1233
1234 return Ok(NdjsonBatch {
1235 lines,
1236 next_cursor: if has_more { last_cursor } else { None },
1237 is_last: !has_more,
1238 });
1239 }
1240
1241 let patient_refs: Vec<String> = patient_ids
1248 .iter()
1249 .map(|id| format!("Patient/{}", id))
1250 .collect();
1251
1252 let mut sql = "SELECT id, data, last_updated FROM resources
1253 WHERE tenant_id = $1
1254 AND resource_type = $2
1255 AND is_deleted = FALSE
1256 AND ((data #>> '{subject,reference}') = ANY($3::text[])
1257 OR (data #>> '{patient,reference}') = ANY($3::text[]))"
1258 .to_string();
1259
1260 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
1261 Box::new(tenant_id.to_string()),
1262 Box::new(resource_type.to_string()),
1263 Box::new(patient_refs),
1264 ];
1265 let mut param_idx = 4;
1266
1267 if let Some(since) = request.since {
1268 sql.push_str(&format!(" AND last_updated >= ${}", param_idx));
1269 params.push(Box::new(since));
1270 param_idx += 1;
1271 }
1272
1273 if let Some(cursor) = cursor {
1274 let parts: Vec<&str> = cursor.splitn(2, '|').collect();
1275 if parts.len() == 2 {
1276 if let Ok(dt) = DateTime::parse_from_rfc3339(parts[0]) {
1277 sql.push_str(&format!(
1278 " AND (last_updated, id) > (${}, ${})",
1279 param_idx,
1280 param_idx + 1
1281 ));
1282 params.push(Box::new(dt.with_timezone(&Utc)));
1283 params.push(Box::new(parts[1].to_string()));
1284 }
1285 }
1286 }
1287
1288 sql.push_str(&format!(
1289 " ORDER BY last_updated, id LIMIT {}",
1290 batch_size + 1
1291 ));
1292
1293 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
1294 .iter()
1295 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1296 .collect();
1297
1298 let rows = client
1299 .query(&sql, ¶m_refs)
1300 .await
1301 .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?;
1302
1303 let has_more = rows.len() > batch_size as usize;
1304 let rows_slice = if has_more {
1305 &rows[..batch_size as usize]
1306 } else {
1307 &rows[..]
1308 };
1309
1310 let mut lines = Vec::new();
1311 let mut last_cursor = None;
1312
1313 for row in rows_slice {
1314 let id: String = row.get(0);
1315 let resource: Value = row.get(1);
1316 let last_updated: chrono::DateTime<Utc> = row.get(2);
1317
1318 let line = serde_json::to_string(&resource)
1319 .map_err(|e| internal_error(format!("Failed to serialize: {}", e)))?;
1320 lines.push(line);
1321 last_cursor = Some(format!("{}|{}", last_updated.to_rfc3339(), id));
1322 }
1323
1324 Ok(NdjsonBatch {
1325 lines,
1326 next_cursor: if has_more { last_cursor } else { None },
1327 is_last: !has_more,
1328 })
1329 }
1330}
1331
1332#[async_trait]
1333impl GroupExportProvider for PostgresBackend {
1334 async fn get_group_members(
1335 &self,
1336 tenant: &TenantContext,
1337 group_id: &str,
1338 ) -> StorageResult<Vec<String>> {
1339 let client = self.get_client().await?;
1340 let tenant_id = tenant.tenant_id().as_str();
1341
1342 let rows = client
1343 .query(
1344 "SELECT data FROM resources WHERE tenant_id = $1 AND resource_type = 'Group' AND id = $2 AND is_deleted = FALSE",
1345 &[&tenant_id, &group_id],
1346 )
1347 .await
1348 .map_err(|e| internal_error(format!("Failed to fetch group: {}", e)))?;
1349
1350 if rows.is_empty() {
1351 return Ok(Vec::new());
1352 }
1353
1354 let data: Value = rows[0].get(0);
1355
1356 let mut member_refs = Vec::new();
1358 if let Some(members) = data.get("member").and_then(|m| m.as_array()) {
1359 for member in members {
1360 if let Some(reference) = member
1361 .get("entity")
1362 .and_then(|e| e.get("reference"))
1363 .and_then(|r| r.as_str())
1364 {
1365 member_refs.push(reference.to_string());
1366 }
1367 }
1368 }
1369
1370 Ok(member_refs)
1371 }
1372
1373 async fn resolve_group_patient_ids(
1374 &self,
1375 tenant: &TenantContext,
1376 group_id: &str,
1377 ) -> StorageResult<Vec<String>> {
1378 use std::collections::HashSet;
1381 let mut visited_groups: HashSet<String> = HashSet::new();
1382 let mut seen_patients: HashSet<String> = HashSet::new();
1383 let mut patient_ids: Vec<String> = Vec::new();
1384 let mut worklist: Vec<String> = vec![group_id.to_string()];
1385
1386 while let Some(gid) = worklist.pop() {
1387 if !visited_groups.insert(gid.clone()) {
1388 continue; }
1390 let members = self.get_group_members(tenant, &gid).await?;
1391 for member_ref in &members {
1392 if let Some(id) = member_ref.strip_prefix("Patient/") {
1393 if seen_patients.insert(id.to_string()) {
1394 patient_ids.push(id.to_string());
1395 }
1396 } else if let Some(nested) = member_ref.strip_prefix("Group/") {
1397 worklist.push(nested.to_string());
1398 }
1399 }
1400 }
1401
1402 Ok(patient_ids)
1403 }
1404
1405 async fn get_group_members_with_periods(
1406 &self,
1407 tenant: &TenantContext,
1408 group_id: &str,
1409 ) -> StorageResult<Vec<(String, Option<DateTime<Utc>>)>> {
1410 let client = self.get_client().await?;
1411 let tenant_id = tenant.tenant_id().as_str();
1412 let rows = client
1413 .query(
1414 "SELECT data FROM resources
1415 WHERE tenant_id = $1 AND resource_type = 'Group'
1416 AND id = $2 AND is_deleted = false",
1417 &[&tenant_id, &group_id],
1418 )
1419 .await
1420 .map_err(|e| internal_error(format!("Failed to get group: {}", e)))?;
1421 let row = rows.first().ok_or_else(|| {
1422 StorageError::BulkExport(BulkExportError::GroupNotFound {
1423 group_id: group_id.to_string(),
1424 })
1425 })?;
1426 let data: Vec<u8> = row.get(0);
1427 let group: Value = serde_json::from_slice(&data)
1428 .map_err(|e| internal_error(format!("Failed to parse group: {}", e)))?;
1429 let mut out = Vec::new();
1430 if let Some(arr) = group.get("member").and_then(|m| m.as_array()) {
1431 for member in arr {
1432 let Some(reference) = member
1433 .get("entity")
1434 .and_then(|e| e.get("reference"))
1435 .and_then(|r| r.as_str())
1436 else {
1437 continue;
1438 };
1439 let period_start = member
1440 .get("period")
1441 .and_then(|p| p.get("start"))
1442 .and_then(|s| s.as_str())
1443 .and_then(|s| {
1444 DateTime::parse_from_rfc3339(s)
1445 .ok()
1446 .map(|dt| dt.with_timezone(&Utc))
1447 });
1448 out.push((reference.to_string(), period_start));
1449 }
1450 }
1451 Ok(out)
1452 }
1453}