1use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use rusqlite::params;
6use serde_json::Value;
7use std::time::Duration as StdDuration;
8use tokio::sync::Mutex;
9
10use crate::core::bulk_export::{
11 BulkExportStorage, ExpiredExportRef, ExportDataProvider, ExportFileMetadata, ExportJobId,
12 ExportJobMetadata, ExportLevel, ExportProgress, ExportRequest, ExportStatus,
13 GroupExportProvider, NdjsonBatch, PatientExportProvider, RawExportManifest, RawManifestEntry,
14 StartExportInput, TypeExportProgress,
15};
16use crate::core::bulk_export_output::{ExportPartKey, FinalizedPart};
17use crate::core::bulk_export_worker::{
18 ExportClaimStrategy, ExportJobLease, ExportWorkerStorage, LeaseError, WorkerId, WorkerJobView,
19};
20use crate::error::{BackendError, BulkExportError, StorageError, StorageResult};
21use crate::tenant::{TenantContext, TenantId, TenantPermissions};
22
23use super::SqliteBackend;
24
/// Lock held by `claim_next` for the duration of a claim so that claims issued from
/// this process run one at a time. Because it is a `static`, it is shared by every
/// `SqliteBackend` in the process; it is an in-process tokio mutex and therefore does
/// not coordinate with other processes that open the same database file.
static CLAIM_LOCK: Mutex<()> = Mutex::const_new(());
28
29fn parse_dt(s: &str) -> StorageResult<DateTime<Utc>> {
31 DateTime::parse_from_rfc3339(s)
32 .map(|dt| dt.with_timezone(&Utc))
33 .map_err(|e| internal_error(format!("invalid timestamp '{s}': {e}")))
34}
35
36fn parse_dt_opt(s: Option<String>) -> Option<DateTime<Utc>> {
38 s.and_then(|s| {
39 DateTime::parse_from_rfc3339(&s)
40 .ok()
41 .map(|dt| dt.with_timezone(&Utc))
42 })
43}
44
/// Splits a download part segment of the form `{resource_type}-{part_index}`.
///
/// The split happens at the *last* `-`, so resource types that themselves contain
/// dashes are preserved. Returns `None` when there is no `-`, the resource type is
/// empty, or the trailing index is not a valid `u32`.
fn parse_part_segment(part: &str) -> Option<(String, u32)> {
    let (resource_type, index_text) = part.rsplit_once('-')?;
    if resource_type.is_empty() {
        return None;
    }
    let part_index = index_text.parse::<u32>().ok()?;
    Some((resource_type.to_owned(), part_index))
}
55
56fn internal_error(message: String) -> StorageError {
57 StorageError::Backend(BackendError::Internal {
58 backend_name: "sqlite".to_string(),
59 message,
60 source: None,
61 })
62}
63
64#[async_trait]
65impl BulkExportStorage for SqliteBackend {
66 async fn start_export(
67 &self,
68 tenant: &TenantContext,
69 input: StartExportInput,
70 ) -> StorageResult<ExportJobId> {
71 let conn = self.get_connection()?;
72 let tenant_id = tenant.tenant_id().as_str();
73
74 let job_id = ExportJobId::new();
75 let now = Utc::now().to_rfc3339();
76 let transaction_time = input.transaction_time.to_rfc3339();
77
78 let level_str = match &input.request.level {
79 ExportLevel::System => "system".to_string(),
80 ExportLevel::Patient => "patient".to_string(),
81 ExportLevel::Group { .. } => "group".to_string(),
82 };
83
84 let group_id = input.request.group_id().map(|s| s.to_string());
85
86 let request_json = serde_json::to_string(&input.request)
87 .map_err(|e| internal_error(format!("Failed to serialize request: {}", e)))?;
88
89 conn.execute(
90 "INSERT INTO bulk_export_jobs
91 (id, tenant_id, status, level, group_id, request_json, transaction_time,
92 created_at, owner_subject, request_url, fhir_version, fencing_token)
93 VALUES (?1, ?2, 'accepted', ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, 0)",
94 params![
95 job_id.as_str(),
96 tenant_id,
97 level_str,
98 group_id,
99 request_json,
100 transaction_time,
101 now,
102 input.owner_subject,
103 input.request_url,
104 input.fhir_version.as_mime_param(),
105 ],
106 )
107 .map_err(|e| internal_error(format!("Failed to create export job: {}", e)))?;
108
109 Ok(job_id)
110 }
111
112 async fn get_export_status(
113 &self,
114 tenant: &TenantContext,
115 job_id: &ExportJobId,
116 ) -> StorageResult<ExportProgress> {
117 let conn = self.get_connection()?;
118 let tenant_id = tenant.tenant_id().as_str();
119
120 let (status_str, level_str, group_id, transaction_time, started_at, completed_at, error_message, current_type):
121 (String, String, Option<String>, String, Option<String>, Option<String>, Option<String>, Option<String>) = conn
122 .query_row(
123 "SELECT status, level, group_id, transaction_time, started_at, completed_at, error_message, current_type
124 FROM bulk_export_jobs
125 WHERE id = ?1 AND tenant_id = ?2",
126 params![job_id.as_str(), tenant_id],
127 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?,
128 row.get(4)?, row.get(5)?, row.get(6)?, row.get(7)?)),
129 )
130 .map_err(|e| {
131 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
132 StorageError::BulkExport(BulkExportError::JobNotFound {
133 job_id: job_id.to_string(),
134 })
135 } else {
136 internal_error(format!("Failed to get export status: {}", e))
137 }
138 })?;
139
140 let status: ExportStatus = status_str
141 .parse()
142 .map_err(|_| internal_error(format!("Invalid status in database: {}", status_str)))?;
143
144 let level = match level_str.as_str() {
145 "system" => ExportLevel::System,
146 "patient" => ExportLevel::Patient,
147 "group" => ExportLevel::Group {
148 group_id: group_id.unwrap_or_default(),
149 },
150 _ => {
151 return Err(internal_error(format!(
152 "Invalid level in database: {}",
153 level_str
154 )));
155 }
156 };
157
158 let transaction_time = chrono::DateTime::parse_from_rfc3339(&transaction_time)
159 .map_err(|e| internal_error(format!("Invalid transaction_time: {}", e)))?
160 .with_timezone(&Utc);
161
162 let started_at = started_at.and_then(|s| {
163 chrono::DateTime::parse_from_rfc3339(&s)
164 .ok()
165 .map(|dt| dt.with_timezone(&Utc))
166 });
167
168 let completed_at = completed_at.and_then(|s| {
169 chrono::DateTime::parse_from_rfc3339(&s)
170 .ok()
171 .map(|dt| dt.with_timezone(&Utc))
172 });
173
174 let mut stmt = conn
176 .prepare(
177 "SELECT resource_type, total_count, exported_count, error_count, cursor_state
178 FROM bulk_export_progress
179 WHERE job_id = ?1",
180 )
181 .map_err(|e| internal_error(format!("Failed to prepare progress query: {}", e)))?;
182
183 let type_progress: Vec<TypeExportProgress> = stmt
184 .query_map(params![job_id.as_str()], |row| {
185 Ok(TypeExportProgress {
186 resource_type: row.get(0)?,
187 total_count: row.get::<_, Option<i64>>(1)?.map(|v| v as u64),
188 exported_count: row.get::<_, i64>(2)? as u64,
189 error_count: row.get::<_, i64>(3)? as u64,
190 cursor_state: row.get(4)?,
191 })
192 })
193 .map_err(|e| internal_error(format!("Failed to query progress: {}", e)))?
194 .filter_map(|r| r.ok())
195 .collect();
196
197 Ok(ExportProgress {
198 job_id: job_id.clone(),
199 status,
200 level,
201 transaction_time,
202 started_at,
203 completed_at,
204 type_progress,
205 current_type,
206 error_message,
207 })
208 }
209
210 async fn cancel_export(
211 &self,
212 tenant: &TenantContext,
213 job_id: &ExportJobId,
214 ) -> StorageResult<()> {
215 let conn = self.get_connection()?;
216 let tenant_id = tenant.tenant_id().as_str();
217
218 let current_status: String = conn
220 .query_row(
221 "SELECT status FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
222 params![job_id.as_str(), tenant_id],
223 |row| row.get(0),
224 )
225 .map_err(|e| {
226 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
227 StorageError::BulkExport(BulkExportError::JobNotFound {
228 job_id: job_id.to_string(),
229 })
230 } else {
231 internal_error(format!("Failed to get export status: {}", e))
232 }
233 })?;
234
235 let status: ExportStatus = current_status.parse().map_err(|_| {
236 internal_error(format!("Invalid status in database: {}", current_status))
237 })?;
238
239 if status.is_terminal() {
240 return Err(StorageError::BulkExport(BulkExportError::InvalidJobState {
241 job_id: job_id.to_string(),
242 expected: "accepted or in-progress".to_string(),
243 actual: current_status,
244 }));
245 }
246
247 let now = Utc::now().to_rfc3339();
248 conn.execute(
249 "UPDATE bulk_export_jobs SET status = 'cancelled', completed_at = ?1 WHERE id = ?2",
250 params![now, job_id.as_str()],
251 )
252 .map_err(|e| internal_error(format!("Failed to cancel export: {}", e)))?;
253
254 Ok(())
255 }
256
257 async fn delete_export(
258 &self,
259 tenant: &TenantContext,
260 job_id: &ExportJobId,
261 ) -> StorageResult<()> {
262 let conn = self.get_connection()?;
263 let tenant_id = tenant.tenant_id().as_str();
264
265 let exists: bool = conn
267 .query_row(
268 "SELECT 1 FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
269 params![job_id.as_str(), tenant_id],
270 |_| Ok(true),
271 )
272 .unwrap_or(false);
273
274 if !exists {
275 return Err(StorageError::BulkExport(BulkExportError::JobNotFound {
276 job_id: job_id.to_string(),
277 }));
278 }
279
280 conn.execute(
282 "DELETE FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
283 params![job_id.as_str(), tenant_id],
284 )
285 .map_err(|e| internal_error(format!("Failed to delete export: {}", e)))?;
286
287 Ok(())
288 }
289
290 async fn get_export_manifest(
291 &self,
292 tenant: &TenantContext,
293 job_id: &ExportJobId,
294 ) -> StorageResult<RawExportManifest> {
295 let conn = self.get_connection()?;
296 let tenant_id = tenant.tenant_id().as_str();
297
298 let (status_str, transaction_time, request_url, error_message, completed_at): (
299 String,
300 String,
301 String,
302 Option<String>,
303 Option<String>,
304 ) = conn
305 .query_row(
306 "SELECT status, transaction_time, request_url, error_message, completed_at
307 FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
308 params![job_id.as_str(), tenant_id],
309 |row| {
310 Ok((
311 row.get(0)?,
312 row.get(1)?,
313 row.get(2)?,
314 row.get(3)?,
315 row.get(4)?,
316 ))
317 },
318 )
319 .map_err(|e| {
320 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
321 StorageError::BulkExport(BulkExportError::JobNotFound {
322 job_id: job_id.to_string(),
323 })
324 } else {
325 internal_error(format!("Failed to get export job: {}", e))
326 }
327 })?;
328
329 let status: ExportStatus = status_str
330 .parse()
331 .map_err(|_| internal_error(format!("Invalid status in database: {}", status_str)))?;
332
333 let mut stmt = conn
335 .prepare(
336 "SELECT resource_type, resource_count, file_type, part_index, fencing_token
337 FROM bulk_export_files
338 WHERE job_id = ?1
339 ORDER BY file_type, resource_type, part_index",
340 )
341 .map_err(|e| internal_error(format!("Failed to prepare files query: {}", e)))?;
342
343 let rows: Vec<(String, i64, String, i64, i64)> = stmt
344 .query_map(params![job_id.as_str()], |row| {
345 Ok((
346 row.get(0)?,
347 row.get::<_, Option<i64>>(1)?.unwrap_or(0),
348 row.get(2)?,
349 row.get(3)?,
350 row.get(4)?,
351 ))
352 })
353 .map_err(|e| internal_error(format!("Failed to query files: {}", e)))?
354 .filter_map(|r| r.ok())
355 .collect();
356
357 let mut output = Vec::new();
358 let mut errors = Vec::new();
359 for (resource_type, count, file_type, part_index, fencing_token) in rows {
360 let key = ExportPartKey {
361 tenant_id: tenant_id.to_string(),
362 job_id: job_id.clone(),
363 resource_type: resource_type.clone(),
364 file_type: file_type.clone(),
365 part_index: part_index as u32,
366 fencing_token: fencing_token as u64,
367 };
368 let entry = RawManifestEntry {
369 resource_type,
370 key,
371 count: count as u64,
372 };
373 if file_type == "error" {
374 errors.push(entry);
375 } else {
376 output.push(entry);
377 }
378 }
379
380 Ok(RawExportManifest {
381 transaction_time: parse_dt(&transaction_time)?,
382 request_url,
383 status,
384 error_message,
385 completed_at: parse_dt_opt(completed_at),
386 output,
387 errors,
388 })
389 }
390
391 async fn list_exports(
392 &self,
393 tenant: &TenantContext,
394 include_completed: bool,
395 ) -> StorageResult<Vec<ExportProgress>> {
396 let job_ids: Vec<String> = {
398 let conn = self.get_connection()?;
399 let tenant_id = tenant.tenant_id().as_str();
400
401 let query = if include_completed {
402 "SELECT id FROM bulk_export_jobs WHERE tenant_id = ?1 ORDER BY created_at DESC"
403 } else {
404 "SELECT id FROM bulk_export_jobs WHERE tenant_id = ?1 AND status IN ('accepted', 'in-progress') ORDER BY created_at DESC"
405 };
406
407 let mut stmt = conn
408 .prepare(query)
409 .map_err(|e| internal_error(format!("Failed to prepare list query: {}", e)))?;
410
411 stmt.query_map(params![tenant_id], |row| row.get(0))
412 .map_err(|e| internal_error(format!("Failed to query exports: {}", e)))?
413 .filter_map(|r| r.ok())
414 .collect()
415 };
416
417 let mut results = Vec::new();
418 for id in job_ids {
419 let job_id = ExportJobId::from_string(id);
420 if let Ok(progress) = self.get_export_status(tenant, &job_id).await {
421 results.push(progress);
422 }
423 }
424
425 Ok(results)
426 }
427
428 async fn get_export_job_metadata(
429 &self,
430 tenant: &TenantContext,
431 job_id: &ExportJobId,
432 ) -> StorageResult<ExportJobMetadata> {
433 let conn = self.get_connection()?;
434 let tenant_id = tenant.tenant_id().as_str();
435
436 let (status_str, level_str, group_id, owner_subject, transaction_time, completed_at, request_url): (
437 String,
438 String,
439 Option<String>,
440 Option<String>,
441 String,
442 Option<String>,
443 String,
444 ) = conn
445 .query_row(
446 "SELECT status, level, group_id, owner_subject, transaction_time, completed_at, request_url
447 FROM bulk_export_jobs WHERE id = ?1 AND tenant_id = ?2",
448 params![job_id.as_str(), tenant_id],
449 |row| {
450 Ok((
451 row.get(0)?,
452 row.get(1)?,
453 row.get(2)?,
454 row.get(3)?,
455 row.get(4)?,
456 row.get(5)?,
457 row.get(6)?,
458 ))
459 },
460 )
461 .map_err(|e| {
462 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
463 StorageError::BulkExport(BulkExportError::JobNotFound {
464 job_id: job_id.to_string(),
465 })
466 } else {
467 internal_error(format!("Failed to get export job metadata: {}", e))
468 }
469 })?;
470
471 let status: ExportStatus = status_str
472 .parse()
473 .map_err(|_| internal_error(format!("Invalid status in database: {}", status_str)))?;
474 let level = match level_str.as_str() {
475 "system" => ExportLevel::System,
476 "patient" => ExportLevel::Patient,
477 "group" => ExportLevel::Group {
478 group_id: group_id.unwrap_or_default(),
479 },
480 _ => return Err(internal_error(format!("Invalid level: {}", level_str))),
481 };
482
483 Ok(ExportJobMetadata {
484 job_id: job_id.clone(),
485 status,
486 level,
487 owner_subject,
488 transaction_time: parse_dt(&transaction_time)?,
489 completed_at: parse_dt_opt(completed_at),
490 request_url,
491 })
492 }
493
494 async fn get_export_file_metadata(
495 &self,
496 tenant: &TenantContext,
497 job_id: &ExportJobId,
498 part: &str,
499 ) -> StorageResult<ExportFileMetadata> {
500 let (resource_type, part_index) = parse_part_segment(part).ok_or_else(|| {
501 StorageError::BulkExport(BulkExportError::JobNotFound {
502 job_id: format!("{job_id}/{part}"),
503 })
504 })?;
505
506 let conn = self.get_connection()?;
507 let tenant_id = tenant.tenant_id().as_str();
508
509 let (file_type, resource_count, fencing_token, owner_subject): (
510 String,
511 i64,
512 i64,
513 Option<String>,
514 ) = conn
515 .query_row(
516 "SELECT f.file_type, f.resource_count, f.fencing_token, j.owner_subject
517 FROM bulk_export_files f
518 JOIN bulk_export_jobs j ON j.id = f.job_id
519 WHERE f.job_id = ?1 AND j.tenant_id = ?2
520 AND f.resource_type = ?3 AND f.part_index = ?4",
521 params![job_id.as_str(), tenant_id, resource_type, part_index as i64],
522 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
523 )
524 .map_err(|e| {
525 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
526 StorageError::BulkExport(BulkExportError::JobNotFound {
527 job_id: format!("{job_id}/{part}"),
528 })
529 } else {
530 internal_error(format!("Failed to get export file metadata: {}", e))
531 }
532 })?;
533
534 let key = ExportPartKey {
535 tenant_id: tenant_id.to_string(),
536 job_id: job_id.clone(),
537 resource_type: resource_type.clone(),
538 file_type: file_type.clone(),
539 part_index,
540 fencing_token: fencing_token as u64,
541 };
542
543 Ok(ExportFileMetadata {
544 key,
545 resource_type,
546 file_type,
547 line_count: resource_count as u64,
548 job_owner_subject: owner_subject,
549 })
550 }
551
552 async fn count_active_exports(&self, tenant: &TenantContext) -> StorageResult<u64> {
553 let conn = self.get_connection()?;
554 let tenant_id = tenant.tenant_id().as_str();
555 let count: i64 = conn
556 .query_row(
557 "SELECT COUNT(*) FROM bulk_export_jobs
558 WHERE tenant_id = ?1 AND status IN ('accepted', 'in-progress')",
559 params![tenant_id],
560 |row| row.get(0),
561 )
562 .map_err(|e| internal_error(format!("Failed to count active exports: {}", e)))?;
563 Ok(count as u64)
564 }
565
566 async fn list_expired_exports(
567 &self,
568 now: DateTime<Utc>,
569 output_ttl: StdDuration,
570 limit: u32,
571 ) -> StorageResult<Vec<ExpiredExportRef>> {
572 let conn = self.get_connection()?;
573 let cutoff = (now
574 - chrono::Duration::from_std(output_ttl)
575 .unwrap_or_else(|_| chrono::Duration::seconds(0)))
576 .to_rfc3339();
577
578 let mut stmt = conn
579 .prepare(
580 "SELECT tenant_id, id FROM bulk_export_jobs
581 WHERE status IN ('complete', 'error', 'cancelled')
582 AND completed_at IS NOT NULL AND completed_at < ?1
583 ORDER BY completed_at LIMIT ?2",
584 )
585 .map_err(|e| internal_error(format!("Failed to prepare expired query: {}", e)))?;
586
587 let rows: Vec<(String, String)> = stmt
588 .query_map(params![cutoff, limit], |row| Ok((row.get(0)?, row.get(1)?)))
589 .map_err(|e| internal_error(format!("Failed to query expired exports: {}", e)))?
590 .filter_map(|r| r.ok())
591 .collect();
592
593 Ok(rows
594 .into_iter()
595 .map(|(tenant_id, id)| ExpiredExportRef {
596 tenant: TenantContext::new(
597 TenantId::new(tenant_id),
598 TenantPermissions::full_access(),
599 ),
600 job_id: ExportJobId::from_string(id),
601 })
602 .collect())
603 }
604}
605
606fn encode_part_path(key: &ExportPartKey) -> String {
608 format!(
609 "{}/{}/{}/{}-{}-{}",
610 key.tenant_id,
611 key.job_id,
612 key.file_type,
613 key.resource_type,
614 key.part_index,
615 key.fencing_token
616 )
617}
618
619#[async_trait]
620impl ExportClaimStrategy for SqliteBackend {
621 async fn claim_next(
622 &self,
623 worker_id: &WorkerId,
624 lease_duration: StdDuration,
625 ) -> StorageResult<Option<ExportJobLease>> {
626 let _guard = CLAIM_LOCK.lock().await;
627 let conn = self.get_connection()?;
628 let now = Utc::now();
629 let now_str = now.to_rfc3339();
630 let lease_expiry = now
631 + chrono::Duration::from_std(lease_duration)
632 .unwrap_or_else(|_| chrono::Duration::seconds(60));
633 let lease_expiry_str = lease_expiry.to_rfc3339();
634
635 let row: Option<(String, String, i64)> = conn
637 .query_row(
638 "SELECT id, tenant_id, fencing_token FROM bulk_export_jobs
639 WHERE status = 'accepted'
640 OR (status = 'in-progress' AND (lease_expiry IS NULL OR lease_expiry < ?1))
641 ORDER BY created_at LIMIT 1",
642 params![now_str],
643 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
644 )
645 .ok();
646
647 let Some((job_id, tenant_id, fencing_token)) = row else {
648 return Ok(None);
649 };
650 let new_token = fencing_token + 1;
651
652 conn.execute(
653 "UPDATE bulk_export_jobs
654 SET status = 'in-progress', worker_id = ?1, lease_expiry = ?2,
655 heartbeat_at = ?3, fencing_token = ?4,
656 started_at = COALESCE(started_at, ?3)
657 WHERE id = ?5",
658 params![
659 worker_id.as_str(),
660 lease_expiry_str,
661 now_str,
662 new_token,
663 job_id
664 ],
665 )
666 .map_err(|e| internal_error(format!("Failed to claim export job: {}", e)))?;
667
668 Ok(Some(ExportJobLease {
669 job_id: ExportJobId::from_string(job_id),
670 tenant: TenantContext::new(TenantId::new(tenant_id), TenantPermissions::full_access()),
671 worker_id: worker_id.clone(),
672 lease_expiry,
673 fencing_token: new_token as u64,
674 }))
675 }
676
677 async fn heartbeat(&self, lease: &ExportJobLease) -> Result<DateTime<Utc>, LeaseError> {
678 let conn = self.get_connection().map_err(LeaseError::Storage)?;
679 let now = Utc::now();
680 let new_expiry = now + chrono::Duration::seconds(60);
681 let affected = conn
682 .execute(
683 "UPDATE bulk_export_jobs
684 SET lease_expiry = ?1, heartbeat_at = ?2
685 WHERE id = ?3 AND worker_id = ?4 AND fencing_token = ?5",
686 params![
687 new_expiry.to_rfc3339(),
688 now.to_rfc3339(),
689 lease.job_id.as_str(),
690 lease.worker_id.as_str(),
691 lease.fencing_token as i64
692 ],
693 )
694 .map_err(|e| LeaseError::Storage(internal_error(format!("heartbeat failed: {e}"))))?;
695 if affected == 0 {
696 Err(LeaseError::LeaseLost {
697 job_id: lease.job_id.clone(),
698 })
699 } else {
700 Ok(new_expiry)
701 }
702 }
703
704 async fn release(&self, lease: ExportJobLease) -> StorageResult<()> {
705 let conn = self.get_connection()?;
706 conn.execute(
707 "UPDATE bulk_export_jobs
708 SET status = 'accepted', worker_id = NULL, lease_expiry = NULL
709 WHERE id = ?1 AND worker_id = ?2 AND fencing_token = ?3
710 AND status = 'in-progress'",
711 params![
712 lease.job_id.as_str(),
713 lease.worker_id.as_str(),
714 lease.fencing_token as i64
715 ],
716 )
717 .map_err(|e| internal_error(format!("Failed to release lease: {}", e)))?;
718 Ok(())
719 }
720}
721
722#[async_trait]
723impl ExportWorkerStorage for SqliteBackend {
724 async fn get_export_job_for_worker(
725 &self,
726 tenant: &TenantContext,
727 job_id: &ExportJobId,
728 worker_id: &WorkerId,
729 fencing_token: u64,
730 ) -> Result<WorkerJobView, LeaseError> {
731 let conn = self.get_connection().map_err(LeaseError::Storage)?;
732 let tenant_id = tenant.tenant_id().as_str();
733
734 let (request_json, level_str, group_id, transaction_time, fhir_version): (
735 String,
736 String,
737 Option<String>,
738 String,
739 String,
740 ) = conn
741 .query_row(
742 "SELECT request_json, level, group_id, transaction_time, fhir_version
743 FROM bulk_export_jobs
744 WHERE id = ?1 AND tenant_id = ?2 AND worker_id = ?3 AND fencing_token = ?4",
745 params![
746 job_id.as_str(),
747 tenant_id,
748 worker_id.as_str(),
749 fencing_token as i64
750 ],
751 |row| {
752 Ok((
753 row.get(0)?,
754 row.get(1)?,
755 row.get(2)?,
756 row.get(3)?,
757 row.get(4)?,
758 ))
759 },
760 )
761 .map_err(|e| match e {
762 rusqlite::Error::QueryReturnedNoRows => LeaseError::LeaseLost {
763 job_id: job_id.clone(),
764 },
765 other => LeaseError::Storage(internal_error(format!(
766 "Failed to load worker job: {other}"
767 ))),
768 })?;
769
770 let request: ExportRequest = serde_json::from_str(&request_json).map_err(|e| {
771 LeaseError::Storage(internal_error(format!("Failed to parse request_json: {e}")))
772 })?;
773 let level = match level_str.as_str() {
774 "system" => ExportLevel::System,
775 "patient" => ExportLevel::Patient,
776 "group" => ExportLevel::Group {
777 group_id: group_id.unwrap_or_default(),
778 },
779 _ => {
780 return Err(LeaseError::Storage(internal_error(format!(
781 "Invalid level: {level_str}"
782 ))));
783 }
784 };
785 let fhir_version = helios_fhir::FhirVersion::from_mime_param(&fhir_version)
786 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
787 let transaction_time = parse_dt(&transaction_time).map_err(LeaseError::Storage)?;
788
789 let mut stmt = conn
791 .prepare(
792 "SELECT resource_type, total_count, exported_count, error_count, cursor_state
793 FROM bulk_export_progress WHERE job_id = ?1",
794 )
795 .map_err(|e| LeaseError::Storage(internal_error(format!("prepare progress: {e}"))))?;
796 let type_progress: Vec<TypeExportProgress> = stmt
797 .query_map(params![job_id.as_str()], |row| {
798 Ok(TypeExportProgress {
799 resource_type: row.get(0)?,
800 total_count: row.get::<_, Option<i64>>(1)?.map(|v| v as u64),
801 exported_count: row.get::<_, i64>(2)? as u64,
802 error_count: row.get::<_, i64>(3)? as u64,
803 cursor_state: row.get(4)?,
804 })
805 })
806 .map_err(|e| LeaseError::Storage(internal_error(format!("query progress: {e}"))))?
807 .filter_map(|r| r.ok())
808 .collect();
809
810 Ok(WorkerJobView {
811 request,
812 level,
813 transaction_time,
814 fhir_version,
815 type_progress,
816 })
817 }
818
819 async fn mark_export_in_progress(
820 &self,
821 tenant: &TenantContext,
822 job_id: &ExportJobId,
823 worker_id: &WorkerId,
824 fencing_token: u64,
825 ) -> Result<(), LeaseError> {
826 let conn = self.get_connection().map_err(LeaseError::Storage)?;
827 let now = Utc::now().to_rfc3339();
828 let affected = conn
829 .execute(
830 "UPDATE bulk_export_jobs
831 SET status = 'in-progress', started_at = COALESCE(started_at, ?1)
832 WHERE id = ?2 AND tenant_id = ?3 AND worker_id = ?4 AND fencing_token = ?5",
833 params![
834 now,
835 job_id.as_str(),
836 tenant.tenant_id().as_str(),
837 worker_id.as_str(),
838 fencing_token as i64
839 ],
840 )
841 .map_err(|e| LeaseError::Storage(internal_error(format!("mark_in_progress: {e}"))))?;
842 if affected == 0 {
843 Err(LeaseError::LeaseLost {
844 job_id: job_id.clone(),
845 })
846 } else {
847 Ok(())
848 }
849 }
850
851 async fn update_export_type_progress(
852 &self,
853 tenant: &TenantContext,
854 job_id: &ExportJobId,
855 worker_id: &WorkerId,
856 fencing_token: u64,
857 progress: &TypeExportProgress,
858 ) -> Result<(), LeaseError> {
859 let conn = self.get_connection().map_err(LeaseError::Storage)?;
860 let affected = conn
861 .execute(
862 "INSERT INTO bulk_export_progress
863 (job_id, resource_type, total_count, exported_count, error_count, cursor_state)
864 SELECT ?1, ?2, ?3, ?4, ?5, ?6
865 WHERE EXISTS (
866 SELECT 1 FROM bulk_export_jobs
867 WHERE id = ?1 AND tenant_id = ?7 AND worker_id = ?8 AND fencing_token = ?9
868 )
869 ON CONFLICT(job_id, resource_type) DO UPDATE SET
870 total_count = excluded.total_count,
871 exported_count = excluded.exported_count,
872 error_count = excluded.error_count,
873 cursor_state = excluded.cursor_state",
874 params![
875 job_id.as_str(),
876 progress.resource_type,
877 progress.total_count.map(|v| v as i64),
878 progress.exported_count as i64,
879 progress.error_count as i64,
880 progress.cursor_state,
881 tenant.tenant_id().as_str(),
882 worker_id.as_str(),
883 fencing_token as i64,
884 ],
885 )
886 .map_err(|e| {
887 LeaseError::Storage(internal_error(format!("update_type_progress: {e}")))
888 })?;
889 if affected == 0 {
890 Err(LeaseError::LeaseLost {
891 job_id: job_id.clone(),
892 })
893 } else {
894 Ok(())
895 }
896 }
897
898 async fn record_export_file(
899 &self,
900 tenant: &TenantContext,
901 job_id: &ExportJobId,
902 worker_id: &WorkerId,
903 fencing_token: u64,
904 part: &FinalizedPart,
905 file_type: &str,
906 ) -> Result<(), LeaseError> {
907 let conn = self.get_connection().map_err(LeaseError::Storage)?;
908 let file_path = encode_part_path(&part.key);
909 let affected = conn
910 .execute(
911 "INSERT INTO bulk_export_files
912 (job_id, resource_type, file_type, file_path, resource_count, byte_count,
913 part_index, fencing_token)
914 SELECT ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8
915 WHERE EXISTS (
916 SELECT 1 FROM bulk_export_jobs
917 WHERE id = ?1 AND tenant_id = ?9 AND worker_id = ?10 AND fencing_token = ?11
918 )
919 ON CONFLICT(job_id, file_type, resource_type, part_index) DO UPDATE SET
920 file_path = excluded.file_path,
921 resource_count = excluded.resource_count,
922 byte_count = excluded.byte_count,
923 fencing_token = excluded.fencing_token",
924 params![
925 job_id.as_str(),
926 part.resource_type,
927 file_type,
928 file_path,
929 part.line_count as i64,
930 part.size_bytes as i64,
931 part.key.part_index as i64,
932 part.key.fencing_token as i64,
933 tenant.tenant_id().as_str(),
934 worker_id.as_str(),
935 fencing_token as i64,
936 ],
937 )
938 .map_err(|e| LeaseError::Storage(internal_error(format!("record_export_file: {e}"))))?;
939 if affected == 0 {
940 Err(LeaseError::LeaseLost {
941 job_id: job_id.clone(),
942 })
943 } else {
944 Ok(())
945 }
946 }
947
948 async fn finish_export_job(
949 &self,
950 tenant: &TenantContext,
951 job_id: &ExportJobId,
952 worker_id: &WorkerId,
953 fencing_token: u64,
954 ) -> Result<(), LeaseError> {
955 let conn = self.get_connection().map_err(LeaseError::Storage)?;
956 let now = Utc::now().to_rfc3339();
957 let affected = conn
958 .execute(
959 "UPDATE bulk_export_jobs
960 SET status = 'complete', completed_at = ?1
961 WHERE id = ?2 AND tenant_id = ?3 AND worker_id = ?4 AND fencing_token = ?5",
962 params![
963 now,
964 job_id.as_str(),
965 tenant.tenant_id().as_str(),
966 worker_id.as_str(),
967 fencing_token as i64
968 ],
969 )
970 .map_err(|e| LeaseError::Storage(internal_error(format!("finish_job: {e}"))))?;
971 if affected == 0 {
972 Err(LeaseError::LeaseLost {
973 job_id: job_id.clone(),
974 })
975 } else {
976 Ok(())
977 }
978 }
979
980 async fn fail_export_job(
981 &self,
982 tenant: &TenantContext,
983 job_id: &ExportJobId,
984 worker_id: &WorkerId,
985 fencing_token: u64,
986 error_message: &str,
987 ) -> Result<(), LeaseError> {
988 let conn = self.get_connection().map_err(LeaseError::Storage)?;
989 let now = Utc::now().to_rfc3339();
990 let affected = conn
991 .execute(
992 "UPDATE bulk_export_jobs
993 SET status = 'error', error_message = ?1, completed_at = ?2
994 WHERE id = ?3 AND tenant_id = ?4 AND worker_id = ?5 AND fencing_token = ?6",
995 params![
996 error_message,
997 now,
998 job_id.as_str(),
999 tenant.tenant_id().as_str(),
1000 worker_id.as_str(),
1001 fencing_token as i64
1002 ],
1003 )
1004 .map_err(|e| LeaseError::Storage(internal_error(format!("fail_job: {e}"))))?;
1005 if affected == 0 {
1006 Err(LeaseError::LeaseLost {
1007 job_id: job_id.clone(),
1008 })
1009 } else {
1010 Ok(())
1011 }
1012 }
1013}
1014
1015#[async_trait]
1016impl ExportDataProvider for SqliteBackend {
1017 async fn list_export_types(
1018 &self,
1019 tenant: &TenantContext,
1020 request: &ExportRequest,
1021 ) -> StorageResult<Vec<String>> {
1022 let conn = self.get_connection()?;
1023 let tenant_id = tenant.tenant_id().as_str();
1024
1025 if !request.resource_types.is_empty() {
1027 let mut valid_types = Vec::new();
1029 for rt in &request.resource_types {
1030 let exists: bool = conn
1031 .query_row(
1032 "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0 LIMIT 1",
1033 params![tenant_id, rt],
1034 |_| Ok(true),
1035 )
1036 .unwrap_or(false);
1037 if exists {
1038 valid_types.push(rt.clone());
1039 }
1040 }
1041 return Ok(valid_types);
1042 }
1043
1044 let mut stmt = conn
1046 .prepare(
1047 "SELECT DISTINCT resource_type FROM resources
1048 WHERE tenant_id = ?1 AND is_deleted = 0
1049 ORDER BY resource_type",
1050 )
1051 .map_err(|e| internal_error(format!("Failed to prepare types query: {}", e)))?;
1052
1053 let types: Vec<String> = stmt
1054 .query_map(params![tenant_id], |row| row.get(0))
1055 .map_err(|e| internal_error(format!("Failed to query types: {}", e)))?
1056 .filter_map(|r| r.ok())
1057 .collect();
1058
1059 Ok(types)
1060 }
1061
1062 async fn count_export_resources(
1063 &self,
1064 tenant: &TenantContext,
1065 request: &ExportRequest,
1066 resource_type: &str,
1067 ) -> StorageResult<u64> {
1068 let conn = self.get_connection()?;
1069 let tenant_id = tenant.tenant_id().as_str();
1070
1071 let mut query = "SELECT COUNT(*) FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0".to_string();
1072 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
1073 Box::new(tenant_id.to_string()),
1074 Box::new(resource_type.to_string()),
1075 ];
1076
1077 if let Some(since) = request.since {
1079 query.push_str(" AND last_updated >= ?3");
1080 params_vec.push(Box::new(since.to_rfc3339()));
1081 }
1082
1083 let params_slice: Vec<&dyn rusqlite::ToSql> =
1084 params_vec.iter().map(|p| p.as_ref()).collect();
1085
1086 let count: i64 = conn
1087 .query_row(&query, params_slice.as_slice(), |row| row.get(0))
1088 .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
1089
1090 Ok(count as u64)
1091 }
1092
1093 async fn fetch_export_batch(
1094 &self,
1095 tenant: &TenantContext,
1096 request: &ExportRequest,
1097 resource_type: &str,
1098 cursor: Option<&str>,
1099 batch_size: u32,
1100 ) -> StorageResult<NdjsonBatch> {
1101 let conn = self.get_connection()?;
1102 let tenant_id = tenant.tenant_id().as_str();
1103
1104 let mut query = "SELECT id, data, last_updated FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0".to_string();
1105 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
1106 Box::new(tenant_id.to_string()),
1107 Box::new(resource_type.to_string()),
1108 ];
1109
1110 if let Some(since) = request.since {
1112 query.push_str(" AND last_updated >= ?");
1113 params_vec.push(Box::new(since.to_rfc3339()));
1114 }
1115
1116 if let Some(cursor) = cursor {
1118 let parts: Vec<&str> = cursor.splitn(2, '|').collect();
1120 if parts.len() == 2 {
1121 query.push_str(" AND (last_updated, id) > (?, ?)");
1122 params_vec.push(Box::new(parts[0].to_string()));
1123 params_vec.push(Box::new(parts[1].to_string()));
1124 }
1125 }
1126
1127 query.push_str(" ORDER BY last_updated, id");
1128 query.push_str(&format!(" LIMIT {}", batch_size + 1)); let params_slice: Vec<&dyn rusqlite::ToSql> =
1131 params_vec.iter().map(|p| p.as_ref()).collect();
1132
1133 let mut stmt = conn
1134 .prepare(&query)
1135 .map_err(|e| internal_error(format!("Failed to prepare batch query: {}", e)))?;
1136
1137 let rows: Vec<(String, Vec<u8>, String)> = stmt
1138 .query_map(params_slice.as_slice(), |row| {
1139 Ok((
1140 row.get::<_, String>(0)?,
1141 row.get::<_, Vec<u8>>(1)?,
1142 row.get::<_, String>(2)?,
1143 ))
1144 })
1145 .map_err(|e| internal_error(format!("Failed to query batch: {}", e)))?
1146 .filter_map(|r| r.ok())
1147 .collect();
1148
1149 let has_more = rows.len() > batch_size as usize;
1150 let rows = if has_more {
1151 &rows[..batch_size as usize]
1152 } else {
1153 &rows[..]
1154 };
1155
1156 let mut lines = Vec::new();
1157 let mut last_cursor = None;
1158
1159 for (id, data, last_updated) in rows {
1160 let resource: Value = serde_json::from_slice(data)
1161 .map_err(|e| internal_error(format!("Failed to parse resource: {}", e)))?;
1162 let line = serde_json::to_string(&resource)
1163 .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
1164 lines.push(line);
1165 last_cursor = Some(format!("{}|{}", last_updated, id));
1166 }
1167
1168 Ok(NdjsonBatch {
1169 lines,
1170 next_cursor: if has_more { last_cursor } else { None },
1171 is_last: !has_more,
1172 })
1173 }
1174}
1175
1176#[async_trait]
1177impl PatientExportProvider for SqliteBackend {
1178 async fn list_patient_ids(
1179 &self,
1180 tenant: &TenantContext,
1181 request: &ExportRequest,
1182 cursor: Option<&str>,
1183 batch_size: u32,
1184 ) -> StorageResult<(Vec<String>, Option<String>)> {
1185 let conn = self.get_connection()?;
1186 let tenant_id = tenant.tenant_id().as_str();
1187
1188 let mut query = "SELECT id FROM resources WHERE tenant_id = ?1 AND resource_type = 'Patient' AND is_deleted = 0".to_string();
1189 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(tenant_id.to_string())];
1190
1191 if let Some(since) = request.since {
1192 query.push_str(" AND last_updated >= ?");
1193 params_vec.push(Box::new(since.to_rfc3339()));
1194 }
1195
1196 if let Some(cursor) = cursor {
1197 query.push_str(" AND id > ?");
1198 params_vec.push(Box::new(cursor.to_string()));
1199 }
1200
1201 query.push_str(" ORDER BY id");
1202 query.push_str(&format!(" LIMIT {}", batch_size + 1));
1203
1204 let params_slice: Vec<&dyn rusqlite::ToSql> =
1205 params_vec.iter().map(|p| p.as_ref()).collect();
1206
1207 let mut stmt = conn
1208 .prepare(&query)
1209 .map_err(|e| internal_error(format!("Failed to prepare patient ids query: {}", e)))?;
1210
1211 let ids: Vec<String> = stmt
1212 .query_map(params_slice.as_slice(), |row| row.get(0))
1213 .map_err(|e| internal_error(format!("Failed to query patient ids: {}", e)))?
1214 .filter_map(|r| r.ok())
1215 .collect();
1216
1217 let has_more = ids.len() > batch_size as usize;
1218 let ids = if has_more {
1219 ids[..batch_size as usize].to_vec()
1220 } else {
1221 ids
1222 };
1223
1224 let next_cursor = if has_more { ids.last().cloned() } else { None };
1225
1226 Ok((ids, next_cursor))
1227 }
1228
1229 async fn fetch_patient_compartment_batch(
1230 &self,
1231 tenant: &TenantContext,
1232 request: &ExportRequest,
1233 resource_type: &str,
1234 patient_ids: &[String],
1235 cursor: Option<&str>,
1236 batch_size: u32,
1237 ) -> StorageResult<NdjsonBatch> {
1238 if patient_ids.is_empty() {
1239 return Ok(NdjsonBatch::empty());
1240 }
1241
1242 let conn = self.get_connection()?;
1243 let tenant_id = tenant.tenant_id().as_str();
1244
1245 if resource_type == "Patient" {
1247 let placeholders: Vec<String> = (0..patient_ids.len())
1248 .map(|i| format!("?{}", i + 3))
1249 .collect();
1250 let mut query = format!(
1251 "SELECT id, data, last_updated FROM resources
1252 WHERE tenant_id = ?1 AND resource_type = ?2 AND id IN ({}) AND is_deleted = 0",
1253 placeholders.join(",")
1254 );
1255
1256 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
1257 Box::new(tenant_id.to_string()),
1258 Box::new(resource_type.to_string()),
1259 ];
1260 for id in patient_ids {
1261 params_vec.push(Box::new(id.clone()));
1262 }
1263
1264 if let Some(cursor) = cursor {
1265 let parts: Vec<&str> = cursor.splitn(2, '|').collect();
1266 if parts.len() == 2 {
1267 query.push_str(" AND (last_updated, id) > (?, ?)");
1268 params_vec.push(Box::new(parts[0].to_string()));
1269 params_vec.push(Box::new(parts[1].to_string()));
1270 }
1271 }
1272
1273 query.push_str(" ORDER BY last_updated, id");
1274 query.push_str(&format!(" LIMIT {}", batch_size + 1));
1275
1276 let params_slice: Vec<&dyn rusqlite::ToSql> =
1277 params_vec.iter().map(|p| p.as_ref()).collect();
1278
1279 let mut stmt = conn.prepare(&query).map_err(|e| {
1280 internal_error(format!("Failed to prepare compartment query: {}", e))
1281 })?;
1282
1283 let rows: Vec<(String, Vec<u8>, String)> = stmt
1284 .query_map(params_slice.as_slice(), |row| {
1285 Ok((
1286 row.get::<_, String>(0)?,
1287 row.get::<_, Vec<u8>>(1)?,
1288 row.get::<_, String>(2)?,
1289 ))
1290 })
1291 .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?
1292 .filter_map(|r| r.ok())
1293 .collect();
1294
1295 let has_more = rows.len() > batch_size as usize;
1296 let rows = if has_more {
1297 &rows[..batch_size as usize]
1298 } else {
1299 &rows[..]
1300 };
1301
1302 let mut lines = Vec::new();
1303 let mut last_cursor = None;
1304
1305 for (id, data, last_updated) in rows {
1306 let resource: Value = serde_json::from_slice(data)
1307 .map_err(|e| internal_error(format!("Failed to parse resource: {}", e)))?;
1308 let line = serde_json::to_string(&resource)
1309 .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
1310 lines.push(line);
1311 last_cursor = Some(format!("{}|{}", last_updated, id));
1312 }
1313
1314 return Ok(NdjsonBatch {
1315 lines,
1316 next_cursor: if has_more { last_cursor } else { None },
1317 is_last: !has_more,
1318 });
1319 }
1320
1321 let patient_refs: Vec<String> = patient_ids
1328 .iter()
1329 .map(|id| format!("Patient/{}", id))
1330 .collect();
1331
1332 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
1333 Box::new(tenant_id.to_string()),
1334 Box::new(resource_type.to_string()),
1335 ];
1336 let mut query = "SELECT id, data, last_updated FROM resources \
1337 WHERE tenant_id = ? AND resource_type = ? AND is_deleted = 0"
1338 .to_string();
1339
1340 if let Some(since) = request.since {
1341 query.push_str(" AND last_updated >= ?");
1342 params_vec.push(Box::new(since.to_rfc3339()));
1343 }
1344
1345 let placeholders: Vec<&str> = patient_refs.iter().map(|_| "?").collect();
1346 let in_list = placeholders.join(",");
1347 query.push_str(&format!(
1348 " AND (json_extract(data, '$.subject.reference') IN ({in_list}) \
1349 OR json_extract(data, '$.patient.reference') IN ({in_list}))"
1350 ));
1351 for patient_ref in &patient_refs {
1353 params_vec.push(Box::new(patient_ref.clone()));
1354 }
1355 for patient_ref in &patient_refs {
1356 params_vec.push(Box::new(patient_ref.clone()));
1357 }
1358
1359 if let Some(cursor) = cursor {
1360 let parts: Vec<&str> = cursor.splitn(2, '|').collect();
1361 if parts.len() == 2 {
1362 query.push_str(" AND (last_updated, id) > (?, ?)");
1363 params_vec.push(Box::new(parts[0].to_string()));
1364 params_vec.push(Box::new(parts[1].to_string()));
1365 }
1366 }
1367
1368 query.push_str(" ORDER BY last_updated, id");
1369 query.push_str(&format!(" LIMIT {}", batch_size + 1));
1370
1371 let params_slice: Vec<&dyn rusqlite::ToSql> =
1372 params_vec.iter().map(|p| p.as_ref()).collect();
1373
1374 let mut stmt = conn
1375 .prepare(&query)
1376 .map_err(|e| internal_error(format!("Failed to prepare compartment query: {}", e)))?;
1377
1378 let rows: Vec<(String, Vec<u8>, String)> = stmt
1379 .query_map(params_slice.as_slice(), |row| {
1380 Ok((
1381 row.get::<_, String>(0)?,
1382 row.get::<_, Vec<u8>>(1)?,
1383 row.get::<_, String>(2)?,
1384 ))
1385 })
1386 .map_err(|e| internal_error(format!("Failed to query compartment: {}", e)))?
1387 .filter_map(|r| r.ok())
1388 .collect();
1389
1390 let has_more = rows.len() > batch_size as usize;
1391 let rows = if has_more {
1392 &rows[..batch_size as usize]
1393 } else {
1394 &rows[..]
1395 };
1396
1397 let mut lines = Vec::new();
1398 let mut last_cursor = None;
1399
1400 for (id, data, last_updated) in rows {
1401 let resource: Value = serde_json::from_slice(data)
1402 .map_err(|e| internal_error(format!("Failed to parse resource: {}", e)))?;
1403 let line = serde_json::to_string(&resource)
1404 .map_err(|e| internal_error(format!("Failed to serialize resource: {}", e)))?;
1405 lines.push(line);
1406 last_cursor = Some(format!("{}|{}", last_updated, id));
1407 }
1408
1409 Ok(NdjsonBatch {
1410 lines,
1411 next_cursor: if has_more { last_cursor } else { None },
1412 is_last: !has_more,
1413 })
1414 }
1415}
1416
1417#[async_trait]
1418impl GroupExportProvider for SqliteBackend {
1419 async fn get_group_members(
1420 &self,
1421 tenant: &TenantContext,
1422 group_id: &str,
1423 ) -> StorageResult<Vec<String>> {
1424 let conn = self.get_connection()?;
1425 let tenant_id = tenant.tenant_id().as_str();
1426
1427 let data: Vec<u8> = conn
1429 .query_row(
1430 "SELECT data FROM resources WHERE tenant_id = ?1 AND resource_type = 'Group' AND id = ?2 AND is_deleted = 0",
1431 params![tenant_id, group_id],
1432 |row| row.get(0),
1433 )
1434 .map_err(|e| {
1435 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
1436 StorageError::BulkExport(BulkExportError::GroupNotFound {
1437 group_id: group_id.to_string(),
1438 })
1439 } else {
1440 internal_error(format!("Failed to get group: {}", e))
1441 }
1442 })?;
1443
1444 let group: Value = serde_json::from_slice(&data)
1445 .map_err(|e| internal_error(format!("Failed to parse group: {}", e)))?;
1446
1447 let mut members = Vec::new();
1449 if let Some(member_array) = group.get("member").and_then(|m| m.as_array()) {
1450 for member in member_array {
1451 if let Some(entity) = member.get("entity") {
1452 if let Some(reference) = entity.get("reference").and_then(|r| r.as_str()) {
1453 members.push(reference.to_string());
1454 }
1455 }
1456 }
1457 }
1458
1459 Ok(members)
1460 }
1461
1462 async fn resolve_group_patient_ids(
1463 &self,
1464 tenant: &TenantContext,
1465 group_id: &str,
1466 ) -> StorageResult<Vec<String>> {
1467 use std::collections::HashSet;
1470 let mut visited_groups: HashSet<String> = HashSet::new();
1471 let mut seen_patients: HashSet<String> = HashSet::new();
1472 let mut patient_ids: Vec<String> = Vec::new();
1473 let mut worklist: Vec<String> = vec![group_id.to_string()];
1474
1475 while let Some(gid) = worklist.pop() {
1476 if !visited_groups.insert(gid.clone()) {
1477 continue; }
1479 let members = self.get_group_members(tenant, &gid).await?;
1480 for reference in members {
1481 if let Some(pid) = reference.strip_prefix("Patient/") {
1482 if seen_patients.insert(pid.to_string()) {
1483 patient_ids.push(pid.to_string());
1484 }
1485 } else if let Some(nested) = reference.strip_prefix("Group/") {
1486 worklist.push(nested.to_string());
1487 }
1488 }
1489 }
1490
1491 Ok(patient_ids)
1492 }
1493
1494 async fn get_group_members_with_periods(
1495 &self,
1496 tenant: &TenantContext,
1497 group_id: &str,
1498 ) -> StorageResult<Vec<(String, Option<DateTime<Utc>>)>> {
1499 let conn = self.get_connection()?;
1500 let tenant_id = tenant.tenant_id().as_str();
1501 let data: Vec<u8> = conn
1502 .query_row(
1503 "SELECT data FROM resources
1504 WHERE tenant_id = ?1 AND resource_type = 'Group'
1505 AND id = ?2 AND is_deleted = 0",
1506 params![tenant_id, group_id],
1507 |row| row.get(0),
1508 )
1509 .map_err(|e| {
1510 if matches!(e, rusqlite::Error::QueryReturnedNoRows) {
1511 StorageError::BulkExport(BulkExportError::GroupNotFound {
1512 group_id: group_id.to_string(),
1513 })
1514 } else {
1515 internal_error(format!("Failed to get group: {}", e))
1516 }
1517 })?;
1518 let group: Value = serde_json::from_slice(&data)
1519 .map_err(|e| internal_error(format!("Failed to parse group: {}", e)))?;
1520 let mut out = Vec::new();
1521 if let Some(arr) = group.get("member").and_then(|m| m.as_array()) {
1522 for member in arr {
1523 let Some(reference) = member
1524 .get("entity")
1525 .and_then(|e| e.get("reference"))
1526 .and_then(|r| r.as_str())
1527 else {
1528 continue;
1529 };
1530 let period_start = member
1531 .get("period")
1532 .and_then(|p| p.get("start"))
1533 .and_then(|s| s.as_str())
1534 .and_then(|s| {
1535 DateTime::parse_from_rfc3339(s)
1536 .ok()
1537 .map(|dt| dt.with_timezone(&Utc))
1538 });
1539 out.push((reference.to_string(), period_start));
1540 }
1541 }
1542 Ok(out)
1543 }
1544}
1545
/// Tests for the SQLite bulk-export storage, worker-claim and provider
/// implementations. Each test runs against a fresh in-memory database.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::ResourceStorage;
    use crate::tenant::{TenantId, TenantPermissions};
    use helios_fhir::FhirVersion;
    use serde_json::json;

    /// Builds an in-memory backend with the schema initialised.
    fn create_test_backend() -> SqliteBackend {
        let backend = SqliteBackend::in_memory().unwrap();
        backend.init_schema().unwrap();
        backend
    }

    /// Builds a tenant context for "test-tenant" with full access.
    fn create_test_tenant() -> TenantContext {
        TenantContext::new(
            TenantId::new("test-tenant"),
            TenantPermissions::full_access(),
        )
    }

    /// Wraps `request` in a `StartExportInput` with a fixed URL and owner
    /// subject ("test-subject") and the default FHIR version.
    fn test_input(request: ExportRequest) -> StartExportInput {
        StartExportInput {
            request,
            transaction_time: Utc::now(),
            request_url: "http://localhost/$export".to_string(),
            owner_subject: Some("test-subject".to_string()),
            fhir_version: FhirVersion::default(),
        }
    }

    // A newly started export gets a non-empty id and is reported as Accepted.
    #[tokio::test]
    async fn test_start_export() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let request = ExportRequest::system().with_types(vec!["Patient".to_string()]);
        let job_id = backend
            .start_export(&tenant, test_input(request))
            .await
            .unwrap();

        assert!(!job_id.as_str().is_empty());

        let progress = backend.get_export_status(&tenant, &job_id).await.unwrap();
        assert_eq!(progress.status, ExportStatus::Accepted);
    }

    // Cancelling an accepted export moves it to Cancelled.
    #[tokio::test]
    async fn test_cancel_export() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let job_id = backend
            .start_export(&tenant, test_input(ExportRequest::system()))
            .await
            .unwrap();

        backend.cancel_export(&tenant, &job_id).await.unwrap();

        let progress = backend.get_export_status(&tenant, &job_id).await.unwrap();
        assert_eq!(progress.status, ExportStatus::Cancelled);
    }

    // Both a system-level and a patient-level job are listed.
    #[tokio::test]
    async fn test_list_exports() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let _job_id1 = backend
            .start_export(&tenant, test_input(ExportRequest::system()))
            .await
            .unwrap();
        let _job_id2 = backend
            .start_export(&tenant, test_input(ExportRequest::patient()))
            .await
            .unwrap();

        let exports = backend.list_exports(&tenant, false).await.unwrap();
        assert_eq!(exports.len(), 2);
    }

    // Three freshly accepted jobs all count as active.
    #[tokio::test]
    async fn test_count_active_exports() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        for _ in 0..3 {
            backend
                .start_export(&tenant, test_input(ExportRequest::system()))
                .await
                .unwrap();
        }
        assert_eq!(backend.count_active_exports(&tenant).await.unwrap(), 3);
    }

    // Job metadata reflects the start input; an unknown job id is an error.
    #[tokio::test]
    async fn test_get_export_job_metadata() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let job_id = backend
            .start_export(&tenant, test_input(ExportRequest::patient()))
            .await
            .unwrap();

        let meta = backend
            .get_export_job_metadata(&tenant, &job_id)
            .await
            .unwrap();
        assert_eq!(meta.status, ExportStatus::Accepted);
        assert_eq!(meta.owner_subject.as_deref(), Some("test-subject"));
        assert!(matches!(meta.level, ExportLevel::Patient));

        let missing = backend
            .get_export_job_metadata(&tenant, &ExportJobId::from_string("nope"))
            .await;
        assert!(missing.is_err());
    }

    // Happy path: claim, mark in progress, record type progress, finish.
    #[tokio::test]
    async fn test_claim_and_worker_lifecycle() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let job_id = backend
            .start_export(&tenant, test_input(ExportRequest::system()))
            .await
            .unwrap();

        // The first claim takes the only job, with fencing token 1.
        let worker = WorkerId::new("worker-1");
        let lease = backend
            .claim_next(&worker, StdDuration::from_secs(60))
            .await
            .unwrap()
            .expect("a job should be claimable");
        assert_eq!(lease.job_id, job_id);
        assert_eq!(lease.fencing_token, 1);

        // With the only job under an unexpired lease, nothing else is claimable.
        assert!(
            backend
                .claim_next(&worker, StdDuration::from_secs(60))
                .await
                .unwrap()
                .is_none()
        );

        backend
            .mark_export_in_progress(&tenant, &job_id, &worker, lease.fencing_token)
            .await
            .unwrap();
        backend
            .update_export_type_progress(
                &tenant,
                &job_id,
                &worker,
                lease.fencing_token,
                &TypeExportProgress::new("Patient"),
            )
            .await
            .unwrap();
        backend
            .finish_export_job(&tenant, &job_id, &worker, lease.fencing_token)
            .await
            .unwrap();

        let progress = backend.get_export_status(&tenant, &job_id).await.unwrap();
        assert_eq!(progress.status, ExportStatus::Complete);
    }

    // After worker A's lease expires and worker B reclaims the job with a
    // higher fencing token, every write from A is rejected with LeaseLost.
    #[tokio::test]
    async fn test_stale_worker_fenced_out() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let job_id = backend
            .start_export(&tenant, test_input(ExportRequest::system()))
            .await
            .unwrap();

        // 1 ms lease so it has expired by the time worker B tries to claim.
        let worker_a = WorkerId::new("worker-a");
        let lease_a = backend
            .claim_next(&worker_a, StdDuration::from_millis(1))
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
        let worker_b = WorkerId::new("worker-b");
        let lease_b = backend
            .claim_next(&worker_b, StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert!(lease_b.fencing_token > lease_a.fencing_token);

        // The stale worker's mark / progress / finish calls are all fenced out.
        assert!(matches!(
            backend
                .mark_export_in_progress(&tenant, &job_id, &worker_a, lease_a.fencing_token)
                .await,
            Err(LeaseError::LeaseLost { .. })
        ));
        assert!(matches!(
            backend
                .update_export_type_progress(
                    &tenant,
                    &job_id,
                    &worker_a,
                    lease_a.fencing_token,
                    &TypeExportProgress::new("Patient"),
                )
                .await,
            Err(LeaseError::LeaseLost { .. })
        ));
        assert!(matches!(
            backend
                .finish_export_job(&tenant, &job_id, &worker_a, lease_a.fencing_token)
                .await,
            Err(LeaseError::LeaseLost { .. })
        ));

        // The current lease holder can still finish the job.
        backend
            .finish_export_job(&tenant, &job_id, &worker_b, lease_b.fencing_token)
            .await
            .unwrap();
    }

    // Members whose period.start is after `since` (late joiners) are excluded.
    #[tokio::test]
    async fn test_since_newly_added_exclude_filters_late_joiners() {
        use crate::core::bulk_export_output::{ExportPartKey, ExportPartWriter};
        // NOTE(review): this statement and the `ExportPartWriter::new` one at
        // the end discard their results. They appear to exist only to
        // reference the imported items; confirm they are still needed.
        let _ = ExportPartKey::output("t", ExportJobId::new(), "x", 0, 0);
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        backend
            .create(
                &tenant,
                "Group",
                json!({
                    "resourceType": "Group", "id": "g-cohort",
                    "member": [
                        {
                            "entity": {"reference": "Patient/p-old"},
                            "period": {"start": "2024-01-01T00:00:00Z"}
                        },
                        {
                            "entity": {"reference": "Patient/p-new"},
                            "period": {"start": "2026-06-01T00:00:00Z"}
                        }
                    ]
                }),
                FhirVersion::default(),
            )
            .await
            .unwrap();

        let members = backend
            .get_group_members_with_periods(&tenant, "g-cohort")
            .await
            .unwrap();
        assert_eq!(members.len(), 2);
        assert!(members.iter().all(|(_, p)| p.is_some()));

        // NOTE(review): the exclusion rule is reimplemented inline here rather
        // than calling production filtering code, so this only checks the
        // period data the rule depends on. Consider testing the real filter.
        let since = chrono::DateTime::parse_from_rfc3339("2025-01-01T00:00:00Z")
            .unwrap()
            .with_timezone(&Utc);
        let kept: Vec<String> = members
            .iter()
            .filter_map(|(reference, period_start)| {
                let pid = reference.strip_prefix("Patient/")?;
                match period_start {
                    Some(start) if *start > since => None,
                    _ => Some(pid.to_string()),
                }
            })
            .collect();
        assert_eq!(kept, vec!["p-old".to_string()]);

        let _ = ExportPartWriter::new(Box::pin(Vec::<u8>::new()));
    }

    // Compartment membership is found from the stored JSON (subject.reference).
    // `set_search_offloaded(true)` presumably means the local search index is
    // not maintained, so the lookup cannot rely on it (per the test name).
    #[tokio::test]
    async fn test_patient_compartment_uses_resource_payload_not_search_index() {
        let mut backend = SqliteBackend::in_memory().unwrap();
        backend.init_schema().unwrap();
        backend.set_search_offloaded(true);
        let tenant = create_test_tenant();

        backend
            .create(
                &tenant,
                "Patient",
                json!({"resourceType": "Patient", "id": "p1"}),
                FhirVersion::default(),
            )
            .await
            .unwrap();
        backend
            .create(
                &tenant,
                "Observation",
                json!({
                    "resourceType": "Observation", "id": "o1", "status": "final",
                    "subject": {"reference": "Patient/p1"}
                }),
                FhirVersion::default(),
            )
            .await
            .unwrap();

        let request = ExportRequest::patient();
        let batch = backend
            .fetch_patient_compartment_batch(
                &tenant,
                &request,
                "Observation",
                &["p1".to_string()],
                None,
                100,
            )
            .await
            .unwrap();
        assert_eq!(
            batch.lines.len(),
            1,
            "Observation should be found via subject.reference"
        );
        assert!(batch.lines[0].contains("\"o1\""));
    }

    // g1 and g2 reference each other; resolution must terminate and return
    // each patient exactly once.
    #[tokio::test]
    async fn test_resolve_nested_groups_with_cycle_guard() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        backend
            .create(
                &tenant,
                "Group",
                json!({
                    "resourceType": "Group", "id": "g1",
                    "member": [
                        {"entity": {"reference": "Patient/p1"}},
                        {"entity": {"reference": "Group/g2"}}
                    ]
                }),
                FhirVersion::default(),
            )
            .await
            .unwrap();
        backend
            .create(
                &tenant,
                "Group",
                json!({
                    "resourceType": "Group", "id": "g2",
                    "member": [
                        {"entity": {"reference": "Patient/p2"}},
                        {"entity": {"reference": "Group/g1"}}
                    ]
                }),
                FhirVersion::default(),
            )
            .await
            .unwrap();

        // Sorted because the resolution order is not part of the contract
        // under test.
        let mut ids = backend
            .resolve_group_patient_ids(&tenant, "g1")
            .await
            .unwrap();
        ids.sort();
        assert_eq!(ids, vec!["p1".to_string(), "p2".to_string()]);
    }

    // Every resource type with stored resources is listed as exportable.
    #[tokio::test]
    async fn test_list_export_types() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        backend
            .create(
                &tenant,
                "Patient",
                json!({"resourceType": "Patient", "name": [{"family": "Test"}]}),
                FhirVersion::default(),
            )
            .await
            .unwrap();

        backend
            .create(
                &tenant,
                "Observation",
                json!({"resourceType": "Observation", "status": "final"}),
                FhirVersion::default(),
            )
            .await
            .unwrap();

        let request = ExportRequest::system();
        let types = backend.list_export_types(&tenant, &request).await.unwrap();

        assert!(types.contains(&"Patient".to_string()));
        assert!(types.contains(&"Observation".to_string()));
    }

    // Five patients with batch size 3: a full first page with a cursor, then a
    // final page with the remaining two.
    #[tokio::test]
    async fn test_fetch_export_batch() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        for i in 0..5 {
            backend
                .create(
                    &tenant,
                    "Patient",
                    json!({"resourceType": "Patient", "name": [{"family": format!("Patient{}", i)}]}),
                    FhirVersion::default(),
                )
                .await
                .unwrap();
        }

        let request = ExportRequest::system();
        let batch = backend
            .fetch_export_batch(&tenant, &request, "Patient", None, 3)
            .await
            .unwrap();

        assert_eq!(batch.lines.len(), 3);
        assert!(!batch.is_last);
        assert!(batch.next_cursor.is_some());

        // Resume from the cursor returned by the first page.
        let batch2 = backend
            .fetch_export_batch(
                &tenant,
                &request,
                "Patient",
                batch.next_cursor.as_deref(),
                3,
            )
            .await
            .unwrap();

        assert_eq!(batch2.lines.len(), 2);
        assert!(batch2.is_last);
    }

    // After deletion, a status lookup reports JobNotFound.
    #[tokio::test]
    async fn test_delete_export() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let job_id = backend
            .start_export(&tenant, test_input(ExportRequest::system()))
            .await
            .unwrap();

        backend.delete_export(&tenant, &job_id).await.unwrap();

        let result = backend.get_export_status(&tenant, &job_id).await;
        assert!(matches!(
            result,
            Err(StorageError::BulkExport(
                BulkExportError::JobNotFound { .. }
            ))
        ));
    }
}