Skip to main content

helios_persistence/core/
bulk_export_worker.rs

1//! Worker-facing traits for bulk export job execution.
2//!
3//! These traits are *not* part of the REST-facing [`BulkExportStorage`] surface
4//! — they are what the export worker uses to claim jobs and persist progress
5//! under a heartbeated, fencing-token-guarded lease.
6//!
7//! [`BulkExportStorage`]: crate::core::bulk_export::BulkExportStorage
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use chrono::{DateTime, Utc};
14
15use crate::core::bulk_export::{
16    BulkExportStorage, ExportDataProvider, ExportJobId, ExportLevel, ExportRequest, ExportStatus,
17    GroupExportProvider, PatientExportProvider, TypeExportProgress,
18};
19use crate::core::bulk_export_output::{ExportOutputStore, ExportPartKey, FinalizedPart};
20use crate::error::{StorageError, StorageResult};
21use crate::tenant::TenantContext;
22
23/// Identifier for an export worker instance.
24#[derive(Debug, Clone, PartialEq, Eq, Hash)]
25pub struct WorkerId(String);
26
27impl WorkerId {
28    /// Creates a worker ID from a string.
29    pub fn new(id: impl Into<String>) -> Self {
30        Self(id.into())
31    }
32
33    /// Generates a fresh random worker ID.
34    pub fn random() -> Self {
35        Self(uuid::Uuid::new_v4().to_string())
36    }
37
38    /// Returns the ID as a string slice.
39    pub fn as_str(&self) -> &str {
40        &self.0
41    }
42}
43
44impl std::fmt::Display for WorkerId {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        write!(f, "{}", self.0)
47    }
48}
49
/// A lease over a single export job, held by exactly one worker at a time.
///
/// Leases expire; if the holding worker does not heartbeat before
/// `lease_expiry`, the lease is reclaimable. The `fencing_token` is bumped on
/// every claim so a zombie worker cannot mutate a job another worker now owns.
///
/// Produced by [`ExportClaimStrategy::claim_next`], renewed via
/// [`ExportClaimStrategy::heartbeat`], and consumed by
/// [`DefaultExportWorker::run_job`], which passes `worker_id` and
/// `fencing_token` to every fenced [`ExportWorkerStorage`] call.
#[derive(Debug, Clone)]
pub struct ExportJobLease {
    /// The leased job.
    pub job_id: ExportJobId,
    /// The tenant the job belongs to.
    pub tenant: TenantContext,
    /// The worker holding the lease.
    pub worker_id: WorkerId,
    /// When the lease expires if not renewed. `heartbeat` borrows the lease
    /// and returns the renewed expiry; it does not update this field.
    pub lease_expiry: DateTime<Utc>,
    /// Monotonically increasing token, bumped on every claim.
    pub fencing_token: u64,
}
68
69/// Error returned by fenced worker-storage operations.
70#[derive(Debug)]
71pub enum LeaseError {
72    /// The lease was lost — another worker reclaimed the job. The caller MUST
73    /// stop writing immediately.
74    LeaseLost {
75        /// The job whose lease was lost.
76        job_id: ExportJobId,
77    },
78    /// An underlying storage error.
79    Storage(StorageError),
80}
81
82impl std::fmt::Display for LeaseError {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        match self {
85            Self::LeaseLost { job_id } => {
86                write!(
87                    f,
88                    "export job {job_id} lease lost (reclaimed by another worker)"
89                )
90            }
91            Self::Storage(e) => write!(f, "storage error: {e}"),
92        }
93    }
94}
95
96impl std::error::Error for LeaseError {}
97
98impl From<StorageError> for LeaseError {
99    fn from(e: StorageError) -> Self {
100        Self::Storage(e)
101    }
102}
103
/// The worker's view of a claimed job: everything needed to (re)run it.
///
/// Returned by [`ExportWorkerStorage::get_export_job_for_worker`] and read by
/// [`DefaultExportWorker::run_job`].
#[derive(Debug, Clone)]
pub struct WorkerJobView {
    /// The original export request.
    pub request: ExportRequest,
    /// The export level (system, patient, or group scope).
    pub level: ExportLevel,
    /// Server wall-clock frozen at kickoff.
    pub transaction_time: DateTime<Utc>,
    /// The FHIR version the export runs against.
    pub fhir_version: helios_fhir::FhirVersion,
    /// Already-persisted per-type progress, for resuming after a crash. The
    /// worker looks up each resource type here to recover its saved cursor and
    /// exported count.
    pub type_progress: Vec<TypeExportProgress>,
}
118
/// Strategy for atomically claiming the next available export job.
///
/// Each backend reaches for its native primitive — `SELECT … FOR UPDATE SKIP
/// LOCKED` on Postgres, a process-local mutex on SQLite.
#[async_trait]
pub trait ExportClaimStrategy: Send + Sync {
    /// Atomically transitions one eligible job (`accepted`, or `in_progress`
    /// with an expired lease) to held-by-this-worker, bumping the fencing
    /// token. Returns `Ok(None)` when no job is available.
    ///
    /// `lease_duration` is the requested lease length (presumably the
    /// returned `lease_expiry` is about now + `lease_duration` — confirm per
    /// backend).
    async fn claim_next(
        &self,
        worker_id: &WorkerId,
        lease_duration: Duration,
    ) -> StorageResult<Option<ExportJobLease>>;

    /// Renews a lease the worker still holds; returns the new expiry, or
    /// `LeaseError::LeaseLost` if the job was reclaimed.
    ///
    /// The lease is only borrowed, so its `lease_expiry` field is not
    /// modified; callers tracking expiry must use the returned value.
    async fn heartbeat(&self, lease: &ExportJobLease) -> Result<DateTime<Utc>, LeaseError>;

    /// Releases a lease early (graceful shutdown). Best-effort.
    ///
    /// Takes the lease by value, so the caller cannot keep using it afterwards.
    async fn release(&self, lease: ExportJobLease) -> StorageResult<()>;
}
141
/// Worker-owned mutations of job state.
///
/// **Every method is fenced** by `worker_id` + `fencing_token`: a guarded
/// mutation affecting zero rows returns `LeaseError::LeaseLost`, so a zombie
/// worker cannot corrupt progress, file rows, or terminal status after its
/// job has been reclaimed.
///
/// [`DefaultExportWorker::run_job`] calls these in order:
/// `get_export_job_for_worker`, `mark_export_in_progress`, then
/// `record_export_file` / `update_export_type_progress` as batches are
/// written, and finally `finish_export_job` (or `fail_export_job` when a
/// storage error aborts the run).
#[async_trait]
pub trait ExportWorkerStorage: Send + Sync {
    /// Loads the claimed job's request, level, frozen metadata and persisted
    /// per-type progress (for resume). Fenced.
    async fn get_export_job_for_worker(
        &self,
        tenant: &TenantContext,
        job_id: &ExportJobId,
        worker_id: &WorkerId,
        fencing_token: u64,
    ) -> Result<WorkerJobView, LeaseError>;

    /// Marks the job `in_progress` (sets `started_at` if unset). Fenced.
    async fn mark_export_in_progress(
        &self,
        tenant: &TenantContext,
        job_id: &ExportJobId,
        worker_id: &WorkerId,
        fencing_token: u64,
    ) -> Result<(), LeaseError>;

    /// Idempotent upsert of per-type progress (cursor + counts). Fenced.
    ///
    /// The worker calls this after every batch, so the stored cursor is the
    /// resume point after a crash.
    async fn update_export_type_progress(
        &self,
        tenant: &TenantContext,
        job_id: &ExportJobId,
        worker_id: &WorkerId,
        fencing_token: u64,
        progress: &TypeExportProgress,
    ) -> Result<(), LeaseError>;

    /// Idempotent upsert of a finalized output/error file row. Fenced.
    ///
    /// `file_type` distinguishes output from error files; the default worker
    /// passes `"output"`.
    async fn record_export_file(
        &self,
        tenant: &TenantContext,
        job_id: &ExportJobId,
        worker_id: &WorkerId,
        fencing_token: u64,
        part: &FinalizedPart,
        file_type: &str,
    ) -> Result<(), LeaseError>;

    /// Marks the job `complete` (sets `completed_at`). Fenced.
    async fn finish_export_job(
        &self,
        tenant: &TenantContext,
        job_id: &ExportJobId,
        worker_id: &WorkerId,
        fencing_token: u64,
    ) -> Result<(), LeaseError>;

    /// Marks the job `error` with a message (sets `completed_at`). Fenced.
    async fn fail_export_job(
        &self,
        tenant: &TenantContext,
        job_id: &ExportJobId,
        worker_id: &WorkerId,
        fencing_token: u64,
        error_message: &str,
    ) -> Result<(), LeaseError>;
}
209
/// Marker trait composing the three job-state surfaces a worker needs.
///
/// In practice the SQLite and Postgres backends are the implementors (through
/// the blanket impl below); it is held as an `Arc<dyn BulkExportJobStore>`
/// and selected at bootstrap by `HFS_BULK_EXPORT_BACKEND`.
pub trait BulkExportJobStore:
    BulkExportStorage + ExportWorkerStorage + ExportClaimStrategy
{
}

// Blanket impl: any type implementing all three supertraits is automatically
// a `BulkExportJobStore`; nothing needs to be implemented by hand.
impl<T> BulkExportJobStore for T where
    T: BulkExportStorage + ExportWorkerStorage + ExportClaimStrategy
{
}
224
/// Marker trait for a resource-store that can feed every export level.
///
/// Combines system-level batch fetching ([`ExportDataProvider`]), patient
/// compartment fetching ([`PatientExportProvider`]) and Group member
/// resolution ([`GroupExportProvider`]).
pub trait ExportResourceProvider:
    ExportDataProvider + PatientExportProvider + GroupExportProvider
{
}

// Blanket impl: any type implementing all three provider traits qualifies.
impl<T> ExportResourceProvider for T where
    T: ExportDataProvider + PatientExportProvider + GroupExportProvider
{
}
235
/// The default in-process export worker.
///
/// Binds a [`BulkExportJobStore`] (job state + claim + worker storage), an
/// [`ExportResourceProvider`] (the resource store), and an
/// [`ExportOutputStore`] (where NDJSON bytes go), and drives a claimed job to
/// completion under its lease.
///
/// The type parameters are `?Sized`, so each dependency may be a concrete
/// type or a trait object behind the `Arc` (e.g. `Arc<dyn BulkExportJobStore>`).
pub struct DefaultExportWorker<Js: ?Sized, Dp: ?Sized, Os: ?Sized> {
    /// Job-state store (claim, worker storage, lifecycle).
    pub jobs: Arc<Js>,
    /// Resource data provider.
    pub data: Arc<Dp>,
    /// Output store for NDJSON parts.
    pub output: Arc<Os>,
    /// This worker's identifier. Not read by `run_job`, which fences with
    /// `lease.worker_id`; presumably callers pass it to `claim_next`.
    pub worker_id: WorkerId,
    /// Group-export `_since` toggle: when `true`, exclude resources from
    /// before `_since` for patients added to the Group after `_since`
    /// (using `Group.member.period.start`).
    pub exclude_since_newly_added: bool,
}
256
257impl<Js, Dp, Os> DefaultExportWorker<Js, Dp, Os>
258where
259    Js: BulkExportJobStore + ?Sized,
260    Dp: ExportResourceProvider + ?Sized,
261    Os: ExportOutputStore + ?Sized,
262{
263    /// Creates a new worker (defaults to `since_newly_added=include`).
264    pub fn new(jobs: Arc<Js>, data: Arc<Dp>, output: Arc<Os>, worker_id: WorkerId) -> Self {
265        Self {
266            jobs,
267            data,
268            output,
269            worker_id,
270            exclude_since_newly_added: false,
271        }
272    }
273
274    /// Sets the `since_newly_added=exclude` toggle for Group exports.
275    pub fn with_exclude_since_newly_added(mut self, exclude: bool) -> Self {
276        self.exclude_since_newly_added = exclude;
277        self
278    }
279
280    /// Runs the export job described by `lease` to completion.
281    ///
282    /// Every job-state mutation is fenced by `lease.worker_id` +
283    /// `lease.fencing_token`; any `LeaseError::LeaseLost` aborts the run
284    /// silently (the worker that reclaimed the job now owns it).
285    pub async fn run_job(&self, lease: ExportJobLease) -> StorageResult<()> {
286        match self.run_job_inner(&lease).await {
287            Ok(()) => Ok(()),
288            Err(LeaseError::LeaseLost { .. }) => {
289                // Another worker owns the job now — stop silently.
290                Ok(())
291            }
292            Err(LeaseError::Storage(e)) => {
293                // Best-effort: mark the job failed (also fenced).
294                let _ = self
295                    .jobs
296                    .fail_export_job(
297                        &lease.tenant,
298                        &lease.job_id,
299                        &lease.worker_id,
300                        lease.fencing_token,
301                        &e.to_string(),
302                    )
303                    .await;
304                Err(e)
305            }
306        }
307    }
308
309    async fn run_job_inner(&self, lease: &ExportJobLease) -> Result<(), LeaseError> {
310        let tenant = &lease.tenant;
311        let job_id = &lease.job_id;
312        let wid = &lease.worker_id;
313        let token = lease.fencing_token;
314
315        let view = self
316            .jobs
317            .get_export_job_for_worker(tenant, job_id, wid, token)
318            .await?;
319        self.jobs
320            .mark_export_in_progress(tenant, job_id, wid, token)
321            .await?;
322
323        let request = &view.request;
324
325        // Resolve the resource types to export.
326        let types = self
327            .data
328            .list_export_types(tenant, request)
329            .await
330            .map_err(LeaseError::Storage)?;
331
332        // For Group exports, resolve the member patient IDs once.
333        // When `exclude_since_newly_added` is set AND `_since` is provided,
334        // filter out patients whose `Group.member.period.start` is *after*
335        // `_since` (i.e., they joined the cohort after the client's last
336        // sync) — the IG-recommended behavior under the `exclude` toggle.
337        let group_patient_ids: Option<Vec<String>> = match &view.level {
338            ExportLevel::Group { group_id } => {
339                let ids = match (self.exclude_since_newly_added, view.request.since.as_ref()) {
340                    (true, Some(since)) => {
341                        let members = self
342                            .data
343                            .get_group_members_with_periods(tenant, group_id)
344                            .await
345                            .map_err(LeaseError::Storage)?;
346                        members
347                            .into_iter()
348                            .filter_map(|(reference, period_start)| {
349                                let pid = reference.strip_prefix("Patient/")?;
350                                // Keep members whose period.start is unknown OR
351                                // <= since (i.e., were already members at since).
352                                match period_start {
353                                    Some(start) if start > *since => None,
354                                    _ => Some(pid.to_string()),
355                                }
356                            })
357                            .collect()
358                    }
359                    _ => self
360                        .data
361                        .resolve_group_patient_ids(tenant, group_id)
362                        .await
363                        .map_err(LeaseError::Storage)?,
364                };
365                Some(ids)
366            }
367            _ => None,
368        };
369
370        let batch_size = request.batch_size.max(1);
371
372        for resource_type in &types {
373            // Resume from any persisted cursor for this type.
374            let mut cursor: Option<String> = view
375                .type_progress
376                .iter()
377                .find(|p| &p.resource_type == resource_type)
378                .and_then(|p| p.cursor_state.clone());
379            let mut exported: u64 = view
380                .type_progress
381                .iter()
382                .find(|p| &p.resource_type == resource_type)
383                .map(|p| p.exported_count)
384                .unwrap_or(0);
385            let mut part_index: u32 = 0;
386
387            loop {
388                // Cooperative cancellation check.
389                if let Ok(progress) = self.jobs.get_export_status(tenant, job_id).await {
390                    if progress.status == ExportStatus::Cancelled {
391                        return Ok(());
392                    }
393                }
394
395                let batch = match &group_patient_ids {
396                    Some(pids) => self
397                        .data
398                        .fetch_patient_compartment_batch(
399                            tenant,
400                            request,
401                            resource_type,
402                            pids,
403                            cursor.as_deref(),
404                            batch_size,
405                        )
406                        .await
407                        .map_err(LeaseError::Storage)?,
408                    None if matches!(view.level, ExportLevel::Patient)
409                        && !request.patient_refs.is_empty() =>
410                    {
411                        // Patient-level with specific patient filter: scope to
412                        // exactly the requested patients' compartments.
413                        let patient_ids: Vec<String> = request
414                            .patient_refs
415                            .iter()
416                            .map(|r| r.strip_prefix("Patient/").unwrap_or(r).to_string())
417                            .collect();
418                        self.data
419                            .fetch_patient_compartment_batch(
420                                tenant,
421                                request,
422                                resource_type,
423                                &patient_ids,
424                                cursor.as_deref(),
425                                batch_size,
426                            )
427                            .await
428                            .map_err(LeaseError::Storage)?
429                    }
430                    None if matches!(view.level, ExportLevel::Patient) => {
431                        // Patient-level without a patient filter: export all
432                        // resources of this type across the patient compartment.
433                        self.data
434                            .fetch_export_batch(
435                                tenant,
436                                request,
437                                resource_type,
438                                cursor.as_deref(),
439                                batch_size,
440                            )
441                            .await
442                            .map_err(LeaseError::Storage)?
443                    }
444                    None => self
445                        .data
446                        .fetch_export_batch(
447                            tenant,
448                            request,
449                            resource_type,
450                            cursor.as_deref(),
451                            batch_size,
452                        )
453                        .await
454                        .map_err(LeaseError::Storage)?,
455                };
456
457                if !batch.lines.is_empty() {
458                    let key = ExportPartKey::output(
459                        tenant.tenant_id().as_str(),
460                        job_id.clone(),
461                        resource_type.clone(),
462                        part_index,
463                        token,
464                    );
465                    let mut writer = self
466                        .output
467                        .open_writer(&key)
468                        .await
469                        .map_err(LeaseError::Storage)?;
470                    for line in &batch.lines {
471                        let out_line = apply_elements(line, &request.elements);
472                        writer.write_line(&out_line).await.map_err(|e| {
473                            LeaseError::Storage(StorageError::Backend(
474                                crate::error::BackendError::Internal {
475                                    backend_name: "export-worker".to_string(),
476                                    message: format!("write_line: {e}"),
477                                    source: None,
478                                },
479                            ))
480                        })?;
481                    }
482                    let finalized = self
483                        .output
484                        .finalize_part(&key, writer)
485                        .await
486                        .map_err(LeaseError::Storage)?;
487                    exported += finalized.line_count;
488                    self.jobs
489                        .record_export_file(tenant, job_id, wid, token, &finalized, "output")
490                        .await?;
491                    part_index += 1;
492                }
493
494                cursor = batch.next_cursor.clone();
495
496                // Persist progress + heartbeat after each batch.
497                let mut progress = TypeExportProgress::new(resource_type.clone());
498                progress.exported_count = exported;
499                progress.cursor_state = cursor.clone();
500                self.jobs
501                    .update_export_type_progress(tenant, job_id, wid, token, &progress)
502                    .await?;
503                self.jobs.heartbeat(lease).await?;
504
505                if batch.is_last {
506                    break;
507                }
508            }
509        }
510
511        self.jobs
512            .finish_export_job(tenant, job_id, wid, token)
513            .await?;
514        Ok(())
515    }
516}
517
518/// Applies `_elements` projection to an NDJSON line.
519///
520/// When `elements` is non-empty, keeps `resourceType`, `id`, `meta` and the
521/// listed top-level element names, and adds a `SUBSETTED` `meta.tag`. On any
522/// parse failure the original line is returned unchanged.
523fn apply_elements(line: &str, elements: &[String]) -> String {
524    if elements.is_empty() {
525        return line.to_string();
526    }
527    let Ok(serde_json::Value::Object(obj)) = serde_json::from_str::<serde_json::Value>(line) else {
528        return line.to_string();
529    };
530    let mut out = serde_json::Map::new();
531    // Always-included mandatory elements.
532    for key in ["resourceType", "id"] {
533        if let Some(v) = obj.get(key) {
534            out.insert(key.to_string(), v.clone());
535        }
536    }
537    // Requested top-level elements (strip a leading `ResourceType.` prefix).
538    for el in elements {
539        let name = el.rsplit('.').next().unwrap_or(el.as_str());
540        if let Some(v) = obj.get(name) {
541            out.insert(name.to_string(), v.clone());
542        }
543    }
544    // meta + SUBSETTED tag.
545    let mut meta = obj
546        .get("meta")
547        .and_then(|m| m.as_object().cloned())
548        .unwrap_or_default();
549    let tag = serde_json::json!({
550        "system": "http://terminology.hl7.org/CodeSystem/v3-ObservationValue",
551        "code": "SUBSETTED",
552    });
553    let tags = meta
554        .entry("tag".to_string())
555        .or_insert_with(|| serde_json::Value::Array(Vec::new()));
556    if let serde_json::Value::Array(arr) = tags {
557        arr.push(tag);
558    }
559    out.insert("meta".to_string(), serde_json::Value::Object(meta));
560    serde_json::to_string(&serde_json::Value::Object(out)).unwrap_or_else(|_| line.to_string())
561}
562
#[cfg(test)]
mod tests {
    use super::*;

    // No `_elements` requested: the line must come back untouched.
    #[test]
    fn test_apply_elements_noop_when_empty() {
        let line = r#"{"resourceType":"Patient","id":"1","name":[]}"#;
        assert_eq!(apply_elements(line, &[]), line);
    }

    // Requesting only `name` keeps `resourceType`, `id` and `name`, drops the
    // unrequested `gender`, and appends the SUBSETTED tag to `meta.tag`.
    #[test]
    fn test_apply_elements_subsets_and_tags() {
        let line = r#"{"resourceType":"Patient","id":"1","name":[{"family":"X"}],"gender":"male"}"#;
        let out = apply_elements(line, &["name".to_string()]);
        let v: serde_json::Value = serde_json::from_str(&out).unwrap();
        assert_eq!(v["resourceType"], "Patient");
        assert_eq!(v["id"], "1");
        assert!(v.get("name").is_some());
        assert!(v.get("gender").is_none());
        assert_eq!(v["meta"]["tag"][0]["code"], "SUBSETTED");
    }

    // End-to-end worker tests against an in-memory SQLite backend; compiled
    // only when the `sqlite` feature is enabled.
    #[cfg(feature = "sqlite")]
    mod worker_integration {
        use super::*;
        use crate::backends::local_fs::LocalFsOutputStore;
        use crate::backends::sqlite::SqliteBackend;
        use crate::core::ResourceStorage;
        use crate::core::bulk_export::{ExportRequest, StartExportInput};
        use crate::tenant::{TenantContext, TenantId, TenantPermissions};
        use chrono::Utc;
        use std::sync::Arc;

        /// Tenant `t1` with full access, used by the tests below.
        fn tenant() -> TenantContext {
            TenantContext::new(TenantId::new("t1"), TenantPermissions::full_access())
        }

        // The test seeds three Patients and starts a system-level export limited
        // to `Patient` with batch size 2, so more than one batch is needed. It
        // claims the job as worker `w1` and runs it. It then checks that the job
        // ends `Complete` and that the manifest's output counts sum to 3. The
        // SQLite backend serves as both the job store and the data provider.
        #[tokio::test]
        async fn test_run_job_system_export_end_to_end() {
            let backend = Arc::new(SqliteBackend::in_memory().unwrap());
            backend.init_schema().unwrap();
            let tenant = tenant();

            for i in 0..3 {
                backend
                    .create(
                        &tenant,
                        "Patient",
                        serde_json::json!({"resourceType": "Patient", "id": format!("p{i}")}),
                        helios_fhir::FhirVersion::default(),
                    )
                    .await
                    .unwrap();
            }

            let tmp = tempfile::tempdir().unwrap();
            let output = Arc::new(LocalFsOutputStore::new(tmp.path(), "http://localhost:8080"));

            let job_id = backend
                .start_export(
                    &tenant,
                    StartExportInput {
                        request: ExportRequest::system()
                            .with_types(vec!["Patient".to_string()])
                            .with_batch_size(2),
                        transaction_time: Utc::now(),
                        request_url: "http://localhost/$export".to_string(),
                        owner_subject: Some("sub".to_string()),
                        fhir_version: helios_fhir::FhirVersion::default(),
                    },
                )
                .await
                .unwrap();

            let worker_id = WorkerId::new("w1");
            let worker = DefaultExportWorker::new(
                Arc::clone(&backend),
                Arc::clone(&backend),
                Arc::clone(&output),
                worker_id.clone(),
            );

            let lease = backend
                .claim_next(&worker_id, Duration::from_secs(60))
                .await
                .unwrap()
                .expect("job claimable");
            assert_eq!(lease.job_id, job_id);

            worker.run_job(lease).await.unwrap();

            let progress = backend.get_export_status(&tenant, &job_id).await.unwrap();
            assert_eq!(progress.status, ExportStatus::Complete);

            let manifest = backend.get_export_manifest(&tenant, &job_id).await.unwrap();
            let total: u64 = manifest.output.iter().map(|e| e.count).sum();
            assert_eq!(total, 3);
        }
    }
}