1use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use chrono::{DateTime, Utc};
14
15use crate::core::bulk_export::{
16 BulkExportStorage, ExportDataProvider, ExportJobId, ExportLevel, ExportRequest, ExportStatus,
17 GroupExportProvider, PatientExportProvider, TypeExportProgress,
18};
19use crate::core::bulk_export_output::{ExportOutputStore, ExportPartKey, FinalizedPart};
20use crate::error::{StorageError, StorageResult};
21use crate::tenant::TenantContext;
22
/// Identifier of an export worker, wrapping an owned string.
///
/// A `WorkerId` is carried on every [`ExportJobLease`] and is passed along
/// with the fencing token to the lease-checked storage calls.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WorkerId(String);
26
27impl WorkerId {
28 pub fn new(id: impl Into<String>) -> Self {
30 Self(id.into())
31 }
32
33 pub fn random() -> Self {
35 Self(uuid::Uuid::new_v4().to_string())
36 }
37
38 pub fn as_str(&self) -> &str {
40 &self.0
41 }
42}
43
44impl std::fmt::Display for WorkerId {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 write!(f, "{}", self.0)
47 }
48}
49
/// A worker's claim on a single export job.
///
/// Produced by [`ExportClaimStrategy::claim_next`] and consumed by
/// [`DefaultExportWorker::run_job`], which forwards `tenant`, `job_id`,
/// `worker_id` and `fencing_token` to every lease-checked storage call.
#[derive(Debug, Clone)]
pub struct ExportJobLease {
    /// Job this lease applies to.
    pub job_id: ExportJobId,
    /// Tenant context used for all storage and data-provider calls made on
    /// behalf of the job.
    pub tenant: TenantContext,
    /// Worker holding the lease.
    pub worker_id: WorkerId,
    /// Expiry timestamp of the lease.
    /// NOTE(review): presumably extended by `heartbeat` — confirm against the
    /// claim-strategy implementations.
    pub lease_expiry: DateTime<Utc>,
    /// Token passed with every lease-checked storage call and embedded in the
    /// output part keys written by the worker.
    /// NOTE(review): presumably changes on each re-claim so that stale workers
    /// can be rejected — confirm against the backing store.
    pub fencing_token: u64,
}
68
/// Error returned by lease-checked export operations.
#[derive(Debug)]
pub enum LeaseError {
    /// The lease on the job is no longer held by the calling worker; the
    /// `Display` text describes this as "reclaimed by another worker".
    LeaseLost {
        /// Job whose lease was lost.
        job_id: ExportJobId,
    },
    /// An underlying storage failure (also produced via `From<StorageError>`).
    Storage(StorageError),
}
81
82impl std::fmt::Display for LeaseError {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 match self {
85 Self::LeaseLost { job_id } => {
86 write!(
87 f,
88 "export job {job_id} lease lost (reclaimed by another worker)"
89 )
90 }
91 Self::Storage(e) => write!(f, "storage error: {e}"),
92 }
93 }
94}
95
// Marker impl only: `source()` keeps its default (`None`). The wrapped storage
// error's message is already included in the `Display` output.
impl std::error::Error for LeaseError {}
97
98impl From<StorageError> for LeaseError {
99 fn from(e: StorageError) -> Self {
100 Self::Storage(e)
101 }
102}
103
/// Snapshot of an export job as returned to a worker by
/// [`ExportWorkerStorage::get_export_job_for_worker`].
#[derive(Debug, Clone)]
pub struct WorkerJobView {
    /// The export request; the worker reads its `since`, `patient_refs`,
    /// `batch_size` and `elements` fields and passes it to the data provider.
    pub request: ExportRequest,
    /// Export level; the worker uses it to decide whether batches are fetched
    /// through the patient-compartment path or the plain export path.
    pub level: ExportLevel,
    /// Transaction time recorded for the job (not read by the worker loop in
    /// this file).
    pub transaction_time: DateTime<Utc>,
    /// FHIR version recorded for the job (not read by the worker loop in this
    /// file).
    pub fhir_version: helios_fhir::FhirVersion,
    /// Previously recorded per-type progress; the worker resumes each
    /// resource type from the stored cursor and exported count, if any.
    pub type_progress: Vec<TypeExportProgress>,
}
118
/// Strategy for claiming export jobs and maintaining the resulting leases.
#[async_trait]
pub trait ExportClaimStrategy: Send + Sync {
    /// Attempts to claim a job for `worker_id`, requesting a lease of
    /// `lease_duration`.
    ///
    /// Returns `Ok(Some(lease))` when a job was claimed.
    /// NOTE(review): `Ok(None)` presumably means no job is currently
    /// claimable — confirm against implementations.
    async fn claim_next(
        &self,
        worker_id: &WorkerId,
        lease_duration: Duration,
    ) -> StorageResult<Option<ExportJobLease>>;

    /// Heartbeats a held lease.
    ///
    /// Returns a timestamp on success (presumably the renewed lease expiry —
    /// confirm), or a [`LeaseError`]. The worker loop in this file calls this
    /// after every batch and aborts the job on any error.
    async fn heartbeat(&self, lease: &ExportJobLease) -> Result<DateTime<Utc>, LeaseError>;

    /// Releases a lease, consuming it.
    async fn release(&self, lease: ExportJobLease) -> StorageResult<()>;
}
141
/// Job-state operations performed by an export worker while it holds a lease.
///
/// Every method takes the lease identity (`tenant`, `job_id`, `worker_id`,
/// `fencing_token`) and returns [`LeaseError`] on failure.
/// NOTE(review): implementations presumably return `LeaseError::LeaseLost`
/// when that identity no longer holds the lease — confirm per backend.
#[async_trait]
pub trait ExportWorkerStorage: Send + Sync {
    /// Loads the worker's view of the job (request, level, prior progress).
    async fn get_export_job_for_worker(
        &self,
        tenant: &TenantContext,
        job_id: &ExportJobId,
        worker_id: &WorkerId,
        fencing_token: u64,
    ) -> Result<WorkerJobView, LeaseError>;

    /// Marks the job as in progress; called by the worker before it starts
    /// fetching data.
    async fn mark_export_in_progress(
        &self,
        tenant: &TenantContext,
        job_id: &ExportJobId,
        worker_id: &WorkerId,
        fencing_token: u64,
    ) -> Result<(), LeaseError>;

    /// Stores the progress (exported count and cursor) for one resource type;
    /// called by the worker after every batch.
    async fn update_export_type_progress(
        &self,
        tenant: &TenantContext,
        job_id: &ExportJobId,
        worker_id: &WorkerId,
        fencing_token: u64,
        progress: &TypeExportProgress,
    ) -> Result<(), LeaseError>;

    /// Records a finalized output part for the job.
    ///
    /// `file_type` is a free-form category string; the worker in this file
    /// always passes `"output"`.
    async fn record_export_file(
        &self,
        tenant: &TenantContext,
        job_id: &ExportJobId,
        worker_id: &WorkerId,
        fencing_token: u64,
        part: &FinalizedPart,
        file_type: &str,
    ) -> Result<(), LeaseError>;

    /// Marks the job as finished; called once all resource types have been
    /// exported.
    async fn finish_export_job(
        &self,
        tenant: &TenantContext,
        job_id: &ExportJobId,
        worker_id: &WorkerId,
        fencing_token: u64,
    ) -> Result<(), LeaseError>;

    /// Marks the job as failed with `error_message`; called by the worker
    /// when a storage error aborts the job.
    async fn fail_export_job(
        &self,
        tenant: &TenantContext,
        job_id: &ExportJobId,
        worker_id: &WorkerId,
        fencing_token: u64,
        error_message: &str,
    ) -> Result<(), LeaseError>;
}
209
210pub trait BulkExportJobStore:
216 BulkExportStorage + ExportWorkerStorage + ExportClaimStrategy
217{
218}
219
220impl<T> BulkExportJobStore for T where
221 T: BulkExportStorage + ExportWorkerStorage + ExportClaimStrategy
222{
223}
224
225pub trait ExportResourceProvider:
227 ExportDataProvider + PatientExportProvider + GroupExportProvider
228{
229}
230
231impl<T> ExportResourceProvider for T where
232 T: ExportDataProvider + PatientExportProvider + GroupExportProvider
233{
234}
235
/// Export worker that drives a claimed job to completion.
///
/// It reads job state from `jobs`, pulls resource batches from `data`, and
/// writes them as parts through `output`. The type parameters may be unsized
/// (e.g. trait objects behind the `Arc`s).
pub struct DefaultExportWorker<Js: ?Sized, Dp: ?Sized, Os: ?Sized> {
    /// Job store: status, lease-checked updates and heartbeats.
    pub jobs: Arc<Js>,
    /// Provider of exportable resource data.
    pub data: Arc<Dp>,
    /// Destination for exported output parts.
    pub output: Arc<Os>,
    /// Identifier of this worker.
    pub worker_id: WorkerId,
    /// When `true` and the request carries a `since` value, group exports skip
    /// members whose membership period starts after `since`. Defaults to
    /// `false` in [`DefaultExportWorker::new`].
    pub exclude_since_newly_added: bool,
}
256
257impl<Js, Dp, Os> DefaultExportWorker<Js, Dp, Os>
258where
259 Js: BulkExportJobStore + ?Sized,
260 Dp: ExportResourceProvider + ?Sized,
261 Os: ExportOutputStore + ?Sized,
262{
263 pub fn new(jobs: Arc<Js>, data: Arc<Dp>, output: Arc<Os>, worker_id: WorkerId) -> Self {
265 Self {
266 jobs,
267 data,
268 output,
269 worker_id,
270 exclude_since_newly_added: false,
271 }
272 }
273
274 pub fn with_exclude_since_newly_added(mut self, exclude: bool) -> Self {
276 self.exclude_since_newly_added = exclude;
277 self
278 }
279
280 pub async fn run_job(&self, lease: ExportJobLease) -> StorageResult<()> {
286 match self.run_job_inner(&lease).await {
287 Ok(()) => Ok(()),
288 Err(LeaseError::LeaseLost { .. }) => {
289 Ok(())
291 }
292 Err(LeaseError::Storage(e)) => {
293 let _ = self
295 .jobs
296 .fail_export_job(
297 &lease.tenant,
298 &lease.job_id,
299 &lease.worker_id,
300 lease.fencing_token,
301 &e.to_string(),
302 )
303 .await;
304 Err(e)
305 }
306 }
307 }
308
309 async fn run_job_inner(&self, lease: &ExportJobLease) -> Result<(), LeaseError> {
310 let tenant = &lease.tenant;
311 let job_id = &lease.job_id;
312 let wid = &lease.worker_id;
313 let token = lease.fencing_token;
314
315 let view = self
316 .jobs
317 .get_export_job_for_worker(tenant, job_id, wid, token)
318 .await?;
319 self.jobs
320 .mark_export_in_progress(tenant, job_id, wid, token)
321 .await?;
322
323 let request = &view.request;
324
325 let types = self
327 .data
328 .list_export_types(tenant, request)
329 .await
330 .map_err(LeaseError::Storage)?;
331
332 let group_patient_ids: Option<Vec<String>> = match &view.level {
338 ExportLevel::Group { group_id } => {
339 let ids = match (self.exclude_since_newly_added, view.request.since.as_ref()) {
340 (true, Some(since)) => {
341 let members = self
342 .data
343 .get_group_members_with_periods(tenant, group_id)
344 .await
345 .map_err(LeaseError::Storage)?;
346 members
347 .into_iter()
348 .filter_map(|(reference, period_start)| {
349 let pid = reference.strip_prefix("Patient/")?;
350 match period_start {
353 Some(start) if start > *since => None,
354 _ => Some(pid.to_string()),
355 }
356 })
357 .collect()
358 }
359 _ => self
360 .data
361 .resolve_group_patient_ids(tenant, group_id)
362 .await
363 .map_err(LeaseError::Storage)?,
364 };
365 Some(ids)
366 }
367 _ => None,
368 };
369
370 let batch_size = request.batch_size.max(1);
371
372 for resource_type in &types {
373 let mut cursor: Option<String> = view
375 .type_progress
376 .iter()
377 .find(|p| &p.resource_type == resource_type)
378 .and_then(|p| p.cursor_state.clone());
379 let mut exported: u64 = view
380 .type_progress
381 .iter()
382 .find(|p| &p.resource_type == resource_type)
383 .map(|p| p.exported_count)
384 .unwrap_or(0);
385 let mut part_index: u32 = 0;
386
387 loop {
388 if let Ok(progress) = self.jobs.get_export_status(tenant, job_id).await {
390 if progress.status == ExportStatus::Cancelled {
391 return Ok(());
392 }
393 }
394
395 let batch = match &group_patient_ids {
396 Some(pids) => self
397 .data
398 .fetch_patient_compartment_batch(
399 tenant,
400 request,
401 resource_type,
402 pids,
403 cursor.as_deref(),
404 batch_size,
405 )
406 .await
407 .map_err(LeaseError::Storage)?,
408 None if matches!(view.level, ExportLevel::Patient)
409 && !request.patient_refs.is_empty() =>
410 {
411 let patient_ids: Vec<String> = request
414 .patient_refs
415 .iter()
416 .map(|r| r.strip_prefix("Patient/").unwrap_or(r).to_string())
417 .collect();
418 self.data
419 .fetch_patient_compartment_batch(
420 tenant,
421 request,
422 resource_type,
423 &patient_ids,
424 cursor.as_deref(),
425 batch_size,
426 )
427 .await
428 .map_err(LeaseError::Storage)?
429 }
430 None if matches!(view.level, ExportLevel::Patient) => {
431 self.data
434 .fetch_export_batch(
435 tenant,
436 request,
437 resource_type,
438 cursor.as_deref(),
439 batch_size,
440 )
441 .await
442 .map_err(LeaseError::Storage)?
443 }
444 None => self
445 .data
446 .fetch_export_batch(
447 tenant,
448 request,
449 resource_type,
450 cursor.as_deref(),
451 batch_size,
452 )
453 .await
454 .map_err(LeaseError::Storage)?,
455 };
456
457 if !batch.lines.is_empty() {
458 let key = ExportPartKey::output(
459 tenant.tenant_id().as_str(),
460 job_id.clone(),
461 resource_type.clone(),
462 part_index,
463 token,
464 );
465 let mut writer = self
466 .output
467 .open_writer(&key)
468 .await
469 .map_err(LeaseError::Storage)?;
470 for line in &batch.lines {
471 let out_line = apply_elements(line, &request.elements);
472 writer.write_line(&out_line).await.map_err(|e| {
473 LeaseError::Storage(StorageError::Backend(
474 crate::error::BackendError::Internal {
475 backend_name: "export-worker".to_string(),
476 message: format!("write_line: {e}"),
477 source: None,
478 },
479 ))
480 })?;
481 }
482 let finalized = self
483 .output
484 .finalize_part(&key, writer)
485 .await
486 .map_err(LeaseError::Storage)?;
487 exported += finalized.line_count;
488 self.jobs
489 .record_export_file(tenant, job_id, wid, token, &finalized, "output")
490 .await?;
491 part_index += 1;
492 }
493
494 cursor = batch.next_cursor.clone();
495
496 let mut progress = TypeExportProgress::new(resource_type.clone());
498 progress.exported_count = exported;
499 progress.cursor_state = cursor.clone();
500 self.jobs
501 .update_export_type_progress(tenant, job_id, wid, token, &progress)
502 .await?;
503 self.jobs.heartbeat(lease).await?;
504
505 if batch.is_last {
506 break;
507 }
508 }
509 }
510
511 self.jobs
512 .finish_export_job(tenant, job_id, wid, token)
513 .await?;
514 Ok(())
515 }
516}
517
518fn apply_elements(line: &str, elements: &[String]) -> String {
524 if elements.is_empty() {
525 return line.to_string();
526 }
527 let Ok(serde_json::Value::Object(obj)) = serde_json::from_str::<serde_json::Value>(line) else {
528 return line.to_string();
529 };
530 let mut out = serde_json::Map::new();
531 for key in ["resourceType", "id"] {
533 if let Some(v) = obj.get(key) {
534 out.insert(key.to_string(), v.clone());
535 }
536 }
537 for el in elements {
539 let name = el.rsplit('.').next().unwrap_or(el.as_str());
540 if let Some(v) = obj.get(name) {
541 out.insert(name.to_string(), v.clone());
542 }
543 }
544 let mut meta = obj
546 .get("meta")
547 .and_then(|m| m.as_object().cloned())
548 .unwrap_or_default();
549 let tag = serde_json::json!({
550 "system": "http://terminology.hl7.org/CodeSystem/v3-ObservationValue",
551 "code": "SUBSETTED",
552 });
553 let tags = meta
554 .entry("tag".to_string())
555 .or_insert_with(|| serde_json::Value::Array(Vec::new()));
556 if let serde_json::Value::Array(arr) = tags {
557 arr.push(tag);
558 }
559 out.insert("meta".to_string(), serde_json::Value::Object(meta));
560 serde_json::to_string(&serde_json::Value::Object(out)).unwrap_or_else(|_| line.to_string())
561}
562
// Unit tests for `apply_elements`, plus a SQLite-backed end-to-end test of
// `DefaultExportWorker::run_job` (compiled only with the `sqlite` feature).
#[cfg(test)]
mod tests {
    use super::*;

    // An empty element list must leave the line untouched.
    #[test]
    fn test_apply_elements_noop_when_empty() {
        let line = r#"{"resourceType":"Patient","id":"1","name":[]}"#;
        assert_eq!(apply_elements(line, &[]), line);
    }

    // Requesting `name` keeps resourceType/id/name, drops `gender`, and adds
    // the SUBSETTED tag under `meta.tag`.
    #[test]
    fn test_apply_elements_subsets_and_tags() {
        let line = r#"{"resourceType":"Patient","id":"1","name":[{"family":"X"}],"gender":"male"}"#;
        let out = apply_elements(line, &["name".to_string()]);
        let v: serde_json::Value = serde_json::from_str(&out).unwrap();
        assert_eq!(v["resourceType"], "Patient");
        assert_eq!(v["id"], "1");
        assert!(v.get("name").is_some());
        assert!(v.get("gender").is_none());
        assert_eq!(v["meta"]["tag"][0]["code"], "SUBSETTED");
    }

    #[cfg(feature = "sqlite")]
    mod worker_integration {
        use super::*;
        use crate::backends::local_fs::LocalFsOutputStore;
        use crate::backends::sqlite::SqliteBackend;
        use crate::core::ResourceStorage;
        use crate::core::bulk_export::{ExportRequest, StartExportInput};
        use crate::tenant::{TenantContext, TenantId, TenantPermissions};
        use chrono::Utc;
        use std::sync::Arc;

        // Full-access context for the fixed test tenant "t1".
        fn tenant() -> TenantContext {
            TenantContext::new(TenantId::new("t1"), TenantPermissions::full_access())
        }

        // Claims a system-level Patient export and runs it through the worker,
        // then checks the job status and the manifest's total line count.
        #[tokio::test]
        async fn test_run_job_system_export_end_to_end() {
            let backend = Arc::new(SqliteBackend::in_memory().unwrap());
            backend.init_schema().unwrap();
            let tenant = tenant();

            // Seed three Patient resources (p0..p2).
            for i in 0..3 {
                backend
                    .create(
                        &tenant,
                        "Patient",
                        serde_json::json!({"resourceType": "Patient", "id": format!("p{i}")}),
                        helios_fhir::FhirVersion::default(),
                    )
                    .await
                    .unwrap();
            }

            let tmp = tempfile::tempdir().unwrap();
            let output = Arc::new(LocalFsOutputStore::new(tmp.path(), "http://localhost:8080"));

            // Batch size 2 with three patients, so the export presumably
            // spans more than one batch.
            let job_id = backend
                .start_export(
                    &tenant,
                    StartExportInput {
                        request: ExportRequest::system()
                            .with_types(vec!["Patient".to_string()])
                            .with_batch_size(2),
                        transaction_time: Utc::now(),
                        request_url: "http://localhost/$export".to_string(),
                        owner_subject: Some("sub".to_string()),
                        fhir_version: helios_fhir::FhirVersion::default(),
                    },
                )
                .await
                .unwrap();

            // The same backend serves as both job store and data provider.
            let worker_id = WorkerId::new("w1");
            let worker = DefaultExportWorker::new(
                Arc::clone(&backend),
                Arc::clone(&backend),
                Arc::clone(&output),
                worker_id.clone(),
            );

            let lease = backend
                .claim_next(&worker_id, Duration::from_secs(60))
                .await
                .unwrap()
                .expect("job claimable");
            assert_eq!(lease.job_id, job_id);

            worker.run_job(lease).await.unwrap();

            let progress = backend.get_export_status(&tenant, &job_id).await.unwrap();
            assert_eq!(progress.status, ExportStatus::Complete);

            // All three seeded patients must be accounted for in the manifest.
            let manifest = backend.get_export_manifest(&tenant, &job_id).await.unwrap();
            let total: u64 = manifest.output.iter().map(|e| e.count).sum();
            assert_eq!(total, 3);
        }
    }
}