use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use crate::core::bulk_export::{
BulkExportStorage, ExportDataProvider, ExportJobId, ExportLevel, ExportRequest, ExportStatus,
GroupExportProvider, PatientExportProvider, TypeExportProgress,
};
use crate::core::bulk_export_output::{ExportOutputStore, ExportPartKey, FinalizedPart};
use crate::error::{StorageError, StorageResult};
use crate::tenant::TenantContext;
/// Opaque identifier of an export worker.
///
/// Carried on every [`ExportJobLease`] and passed, together with the lease's fencing token, to
/// each [`ExportWorkerStorage`] call.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WorkerId(String);

impl WorkerId {
    /// Wraps any string-like value as a worker id, verbatim.
    pub fn new(id: impl Into<String>) -> Self {
        let raw: String = id.into();
        WorkerId(raw)
    }

    /// Builds an id from the textual form of a freshly generated v4 UUID.
    pub fn random() -> Self {
        Self::new(uuid::Uuid::new_v4().to_string())
    }

    /// Borrows the id as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for WorkerId {
    /// Writes the raw id; formatter width/fill flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
/// A claim on one export job, handed out by [`ExportClaimStrategy::claim_next`].
///
/// The worker passes `tenant`, `job_id`, `worker_id` and `fencing_token` from this lease to every
/// [`ExportWorkerStorage`] call it makes for the job. The fencing token is also embedded in the
/// keys of the output parts it writes.
#[derive(Debug, Clone)]
pub struct ExportJobLease {
/// The claimed job.
pub job_id: ExportJobId,
/// Tenant the job belongs to; every storage call for the job is made with this context.
pub tenant: TenantContext,
/// Worker that holds this claim.
pub worker_id: WorkerId,
/// NOTE(review): presumably the instant after which the claim may be reclaimed by another
/// worker; `heartbeat` returns a fresh timestamp — confirm semantics with the claim strategy.
pub lease_expiry: DateTime<Utc>,
/// Token identifying this particular claim; sent with storage writes and used in part keys.
/// NOTE(review): presumably increases on each re-claim so stale holders are rejected — confirm.
pub fencing_token: u64,
}
/// Failure of a lease-guarded job-store operation.
#[derive(Debug)]
pub enum LeaseError {
    /// This worker no longer holds the job's lease (it was reclaimed by another worker).
    LeaseLost {
        /// Job whose lease was lost.
        job_id: ExportJobId,
    },
    /// Any other storage failure.
    Storage(StorageError),
}

impl std::fmt::Display for LeaseError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use LeaseError::{LeaseLost, Storage};
        match self {
            LeaseLost { job_id } => write!(
                f,
                "export job {} lease lost (reclaimed by another worker)",
                job_id
            ),
            Storage(inner) => write!(f, "storage error: {}", inner),
        }
    }
}

impl std::error::Error for LeaseError {}

/// Lets `?` lift plain storage errors into [`LeaseError::Storage`].
impl From<StorageError> for LeaseError {
    fn from(source: StorageError) -> Self {
        LeaseError::Storage(source)
    }
}
/// Snapshot of an export job as seen by the worker holding its lease, returned by
/// [`ExportWorkerStorage::get_export_job_for_worker`].
#[derive(Debug, Clone)]
pub struct WorkerJobView {
/// The export request; the worker reads its `_since`, `_elements`, batch size and patient refs.
pub request: ExportRequest,
/// Export scope; the worker treats `Group` and `Patient` levels specially.
pub level: ExportLevel,
/// NOTE(review): presumably the transaction time supplied when the job was started — confirm.
pub transaction_time: DateTime<Utc>,
/// FHIR version associated with the job.
pub fhir_version: helios_fhir::FhirVersion,
/// Per-type progress persisted so far; the worker resumes each type's cursor and exported
/// count from the matching entry.
pub type_progress: Vec<TypeExportProgress>,
}
/// How workers claim, keep alive, and give up export jobs.
#[async_trait]
pub trait ExportClaimStrategy: Send + Sync {
/// Tries to claim the next job for `worker_id`, holding it for `lease_duration`.
///
/// NOTE(review): `Ok(None)` presumably means no job is currently claimable — confirm with impls.
async fn claim_next(
&self,
worker_id: &WorkerId,
lease_duration: Duration,
) -> StorageResult<Option<ExportJobLease>>;
/// Extends `lease`, returning a timestamp (presumably the new expiry — confirm).
///
/// `DefaultExportWorker` calls this after every batch and stops quietly on
/// [`LeaseError::LeaseLost`].
async fn heartbeat(&self, lease: &ExportJobLease) -> Result<DateTime<Utc>, LeaseError>;
/// Gives up `lease`, consuming it.
async fn release(&self, lease: ExportJobLease) -> StorageResult<()>;
}
/// Job-store operations a worker performs while holding a lease.
///
/// Every method takes the holder's `worker_id` and `fencing_token`.
/// NOTE(review): implementations presumably reject calls from a stale holder with
/// [`LeaseError::LeaseLost`] — confirm with impls. `DefaultExportWorker::run_job` treats that
/// variant as "stop without error".
#[async_trait]
pub trait ExportWorkerStorage: Send + Sync {
/// Loads the job snapshot the worker needs to run it.
async fn get_export_job_for_worker(
&self,
tenant: &TenantContext,
job_id: &ExportJobId,
worker_id: &WorkerId,
fencing_token: u64,
) -> Result<WorkerJobView, LeaseError>;
/// Marks the job as in progress; called once before any data is fetched.
async fn mark_export_in_progress(
&self,
tenant: &TenantContext,
job_id: &ExportJobId,
worker_id: &WorkerId,
fencing_token: u64,
) -> Result<(), LeaseError>;
/// Persists the resume cursor and exported count for one resource type.
async fn update_export_type_progress(
&self,
tenant: &TenantContext,
job_id: &ExportJobId,
worker_id: &WorkerId,
fencing_token: u64,
progress: &TypeExportProgress,
) -> Result<(), LeaseError>;
/// Records a finalized output part. `DefaultExportWorker` passes `"output"` as `file_type`.
async fn record_export_file(
&self,
tenant: &TenantContext,
job_id: &ExportJobId,
worker_id: &WorkerId,
fencing_token: u64,
part: &FinalizedPart,
file_type: &str,
) -> Result<(), LeaseError>;
/// Marks the job finished after every resource type has been exported.
async fn finish_export_job(
&self,
tenant: &TenantContext,
job_id: &ExportJobId,
worker_id: &WorkerId,
fencing_token: u64,
) -> Result<(), LeaseError>;
/// Marks the job failed with `error_message`.
async fn fail_export_job(
&self,
tenant: &TenantContext,
job_id: &ExportJobId,
worker_id: &WorkerId,
fencing_token: u64,
error_message: &str,
) -> Result<(), LeaseError>;
}
/// Everything [`DefaultExportWorker`] needs from a job store: the public export API, the
/// lease-guarded worker operations, and the claim strategy.
pub trait BulkExportJobStore: BulkExportStorage + ExportWorkerStorage + ExportClaimStrategy {}

/// Any type providing all three job-store traits is a [`BulkExportJobStore`].
impl<T: BulkExportStorage + ExportWorkerStorage + ExportClaimStrategy> BulkExportJobStore for T {}

/// Everything [`DefaultExportWorker`] needs from a resource source: system-level batches,
/// patient-compartment batches, and group membership lookups.
pub trait ExportResourceProvider:
    ExportDataProvider + PatientExportProvider + GroupExportProvider
{
}

/// Any type providing all three provider traits is an [`ExportResourceProvider`].
impl<T: ExportDataProvider + PatientExportProvider + GroupExportProvider> ExportResourceProvider
    for T
{
}
/// Export worker that pulls resources from a provider in batches and writes them, one resource
/// per line, into output parts.
///
/// Generic over the job store (`Js`), resource provider (`Dp`) and output store (`Os`); each may
/// be unsized (for example a trait object behind the `Arc`).
pub struct DefaultExportWorker<Js: ?Sized, Dp: ?Sized, Os: ?Sized> {
/// Job store used for leases, progress, output-file records and status checks.
pub jobs: Arc<Js>,
/// Source of resource types, batches and group membership.
pub data: Arc<Dp>,
/// Destination for output parts.
pub output: Arc<Os>,
/// This worker's id. Note: `run_job` uses the lease's `worker_id`, not this field.
pub worker_id: WorkerId,
/// When true and the request has `_since`, group members whose membership period starts after
/// `_since` are left out of group-level exports.
pub exclude_since_newly_added: bool,
}
impl<Js, Dp, Os> DefaultExportWorker<Js, Dp, Os>
where
    Js: BulkExportJobStore + ?Sized,
    Dp: ExportResourceProvider + ?Sized,
    Os: ExportOutputStore + ?Sized,
{
    /// Creates a worker over the given job store, resource provider and output store.
    ///
    /// `exclude_since_newly_added` starts out `false`; enable it with
    /// [`Self::with_exclude_since_newly_added`].
    pub fn new(jobs: Arc<Js>, data: Arc<Dp>, output: Arc<Os>, worker_id: WorkerId) -> Self {
        Self {
            jobs,
            data,
            output,
            worker_id,
            exclude_since_newly_added: false,
        }
    }

    /// Builder-style setter for `exclude_since_newly_added`.
    ///
    /// When `true`, a group-level export carrying `_since` leaves out members whose membership
    /// period starts after `_since`.
    pub fn with_exclude_since_newly_added(mut self, exclude: bool) -> Self {
        self.exclude_since_newly_added = exclude;
        self
    }

    /// Runs the export job held by `lease` until it finishes, is cancelled, or fails.
    ///
    /// Losing the lease is not an error for this worker. Another worker now owns the job, so
    /// this returns `Ok(())` without touching the job's state. A cancelled job also returns
    /// `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns the underlying [`StorageError`] when any storage, provider or output operation
    /// fails. Before returning, it tries to mark the job failed (best effort).
    pub async fn run_job(&self, lease: ExportJobLease) -> StorageResult<()> {
        match self.run_job_inner(&lease).await {
            Ok(()) | Err(LeaseError::LeaseLost { .. }) => Ok(()),
            Err(LeaseError::Storage(e)) => {
                // Best effort: if recording the failure also fails (e.g. the lease was lost in
                // the meantime), the original error is still the one worth reporting.
                let _ = self
                    .jobs
                    .fail_export_job(
                        &lease.tenant,
                        &lease.job_id,
                        &lease.worker_id,
                        lease.fencing_token,
                        &e.to_string(),
                    )
                    .await;
                Err(e)
            }
        }
    }

    /// Wraps a worker-side failure as a backend `Internal` storage error.
    fn internal_error(message: String) -> LeaseError {
        LeaseError::Storage(StorageError::Backend(
            crate::error::BackendError::Internal {
                backend_name: "export-worker".to_string(),
                message,
                source: None,
            },
        ))
    }

    /// Body of [`Self::run_job`]: exports every resource type in batches and finishes the job.
    ///
    /// Every lease-guarded call uses the lease's worker id and fencing token. A heartbeat is
    /// sent after each batch.
    async fn run_job_inner(&self, lease: &ExportJobLease) -> Result<(), LeaseError> {
        let tenant = &lease.tenant;
        let job_id = &lease.job_id;
        let wid = &lease.worker_id;
        let token = lease.fencing_token;

        let view = self
            .jobs
            .get_export_job_for_worker(tenant, job_id, wid, token)
            .await?;
        self.jobs
            .mark_export_in_progress(tenant, job_id, wid, token)
            .await?;

        let request = &view.request;
        let types = self
            .data
            .list_export_types(tenant, request)
            .await
            .map_err(LeaseError::Storage)?;

        // Patient IDs restricting the export to patient compartments. They are resolved once per
        // job rather than once per batch. `None` means an export that is not compartment-scoped.
        let compartment_patient_ids: Option<Vec<String>> = match &view.level {
            ExportLevel::Group { group_id } => {
                let ids = match (self.exclude_since_newly_added, request.since.as_ref()) {
                    (true, Some(since)) => self
                        .data
                        .get_group_members_with_periods(tenant, group_id)
                        .await
                        .map_err(LeaseError::Storage)?
                        .into_iter()
                        .filter_map(|(reference, period_start)| {
                            // Only `Patient/...` members count; members added after `_since`
                            // are skipped.
                            let pid = reference.strip_prefix("Patient/")?;
                            match period_start {
                                Some(start) if start > *since => None,
                                _ => Some(pid.to_string()),
                            }
                        })
                        .collect(),
                    _ => self
                        .data
                        .resolve_group_patient_ids(tenant, group_id)
                        .await
                        .map_err(LeaseError::Storage)?,
                };
                Some(ids)
            }
            ExportLevel::Patient if !request.patient_refs.is_empty() => Some(
                request
                    .patient_refs
                    .iter()
                    .map(|r| r.strip_prefix("Patient/").unwrap_or(r).to_string())
                    .collect(),
            ),
            _ => None,
        };

        let batch_size = request.batch_size.max(1);
        for resource_type in &types {
            // Resume from whatever an earlier lease persisted for this type.
            // NOTE(review): a type that finished under an earlier lease likely persisted a `None`
            // cursor and would be exported again from the start here. Confirm whether
            // `TypeExportProgress` records completion so finished types can be skipped.
            let resumed = view
                .type_progress
                .iter()
                .find(|p| &p.resource_type == resource_type);
            let mut cursor: Option<String> = resumed.and_then(|p| p.cursor_state.clone());
            let mut exported: u64 = resumed.map_or(0, |p| p.exported_count);
            // Part keys embed the fencing token, so restarting at 0 under a new lease cannot
            // collide with parts written under an earlier one.
            let mut part_index: u32 = 0;

            loop {
                // Cooperative cancellation. A failed status read is deliberately not fatal.
                if let Ok(status) = self.jobs.get_export_status(tenant, job_id).await {
                    if status.status == ExportStatus::Cancelled {
                        return Ok(());
                    }
                }

                let batch = match &compartment_patient_ids {
                    Some(pids) => self
                        .data
                        .fetch_patient_compartment_batch(
                            tenant,
                            request,
                            resource_type,
                            pids,
                            cursor.as_deref(),
                            batch_size,
                        )
                        .await
                        .map_err(LeaseError::Storage)?,
                    None => self
                        .data
                        .fetch_export_batch(
                            tenant,
                            request,
                            resource_type,
                            cursor.as_deref(),
                            batch_size,
                        )
                        .await
                        .map_err(LeaseError::Storage)?,
                };

                // A non-final batch must move the cursor forward. Otherwise the next fetch would
                // repeat this one (or restart the type) and the loop would never terminate.
                if !batch.is_last
                    && (batch.next_cursor.is_none() || batch.next_cursor == cursor)
                {
                    return Err(Self::internal_error(format!(
                        "fetch batch for {resource_type}: cursor did not advance on a non-final batch"
                    )));
                }

                if !batch.lines.is_empty() {
                    let key = ExportPartKey::output(
                        tenant.tenant_id().as_str(),
                        job_id.clone(),
                        resource_type.clone(),
                        part_index,
                        token,
                    );
                    let mut writer = self
                        .output
                        .open_writer(&key)
                        .await
                        .map_err(LeaseError::Storage)?;
                    for line in &batch.lines {
                        let out_line = apply_elements(line, &request.elements);
                        writer
                            .write_line(&out_line)
                            .await
                            .map_err(|e| Self::internal_error(format!("write_line: {e}")))?;
                    }
                    let finalized = self
                        .output
                        .finalize_part(&key, writer)
                        .await
                        .map_err(LeaseError::Storage)?;
                    exported += finalized.line_count;
                    self.jobs
                        .record_export_file(tenant, job_id, wid, token, &finalized, "output")
                        .await?;
                    part_index += 1;
                }

                cursor = batch.next_cursor.clone();
                let mut progress = TypeExportProgress::new(resource_type.clone());
                progress.exported_count = exported;
                progress.cursor_state = cursor.clone();
                self.jobs
                    .update_export_type_progress(tenant, job_id, wid, token, &progress)
                    .await?;
                self.jobs.heartbeat(lease).await?;

                if batch.is_last {
                    break;
                }
            }
        }

        self.jobs
            .finish_export_job(tenant, job_id, wid, token)
            .await?;
        Ok(())
    }
}
/// Applies Bulk Data `_elements` subsetting to one exported resource line.
///
/// * `line` – one serialized resource as produced by the data provider.
/// * `elements` – the request's `_elements` entries. An entry is either `element`, which applies
///   to every resource, or `Type.element`, which applies only when `Type` equals the resource's
///   `resourceType`.
///
/// Returns `line` unchanged when `elements` is empty or `line` does not parse as a JSON object.
/// Otherwise it returns a JSON object holding:
/// * `resourceType` and `id`;
/// * every applicable requested top-level element present in the resource;
/// * `meta`, carrying a `SUBSETTED` tag that is not duplicated if already present.
fn apply_elements(line: &str, elements: &[String]) -> String {
    if elements.is_empty() {
        return line.to_string();
    }
    let Ok(serde_json::Value::Object(obj)) = serde_json::from_str::<serde_json::Value>(line) else {
        return line.to_string();
    };
    let resource_type = obj.get("resourceType").and_then(serde_json::Value::as_str);

    let mut out = serde_json::Map::new();
    for key in ["resourceType", "id"] {
        if let Some(v) = obj.get(key) {
            out.insert(key.to_string(), v.clone());
        }
    }
    for el in elements {
        // A `Type.element` entry only applies to resources of that type. For example,
        // `Observation.status` must not pull `status` into an Encounter.
        let name = match el.split_once('.') {
            Some((qualifier, name)) if Some(qualifier) == resource_type => name,
            Some(_) => continue,
            None => el.as_str(),
        };
        if let Some(v) = obj.get(name) {
            out.insert(name.to_string(), v.clone());
        }
    }

    let mut meta = obj
        .get("meta")
        .and_then(|m| m.as_object().cloned())
        .unwrap_or_default();
    let tag = serde_json::json!({
        "system": "http://terminology.hl7.org/CodeSystem/v3-ObservationValue",
        "code": "SUBSETTED",
    });
    let tags = meta
        .entry("tag".to_string())
        .or_insert_with(|| serde_json::Value::Array(Vec::new()));
    if let serde_json::Value::Array(arr) = tags {
        // Re-subsetting an already subsetted resource must not stack a second SUBSETTED tag.
        let already_tagged = arr.iter().any(|t| {
            t.get("system") == tag.get("system") && t.get("code") == tag.get("code")
        });
        if !already_tagged {
            arr.push(tag);
        }
    }
    out.insert("meta".to_string(), serde_json::Value::Object(meta));
    serde_json::to_string(&serde_json::Value::Object(out)).unwrap_or_else(|_| line.to_string())
}
// Unit tests for `apply_elements`, plus an end-to-end worker run against the SQLite backend.
#[cfg(test)]
mod tests {
use super::*;
// With no `_elements`, the line must come back byte-for-byte unchanged.
#[test]
fn test_apply_elements_noop_when_empty() {
let line = r#"{"resourceType":"Patient","id":"1","name":[]}"#;
assert_eq!(apply_elements(line, &[]), line);
}
// Requested elements plus resourceType/id survive; others are dropped and the SUBSETTED tag is added.
#[test]
fn test_apply_elements_subsets_and_tags() {
let line = r#"{"resourceType":"Patient","id":"1","name":[{"family":"X"}],"gender":"male"}"#;
let out = apply_elements(line, &["name".to_string()]);
let v: serde_json::Value = serde_json::from_str(&out).unwrap();
assert_eq!(v["resourceType"], "Patient");
assert_eq!(v["id"], "1");
assert!(v.get("name").is_some());
assert!(v.get("gender").is_none());
assert_eq!(v["meta"]["tag"][0]["code"], "SUBSETTED");
}
// Integration tests that need the SQLite backend feature.
#[cfg(feature = "sqlite")]
mod worker_integration {
use super::*;
use crate::backends::local_fs::LocalFsOutputStore;
use crate::backends::sqlite::SqliteBackend;
use crate::core::ResourceStorage;
use crate::core::bulk_export::{ExportRequest, StartExportInput};
use crate::tenant::{TenantContext, TenantId, TenantPermissions};
use chrono::Utc;
use std::sync::Arc;
// Single full-access tenant shared by the integration tests.
fn tenant() -> TenantContext {
TenantContext::new(TenantId::new("t1"), TenantPermissions::full_access())
}
// Seeds 3 patients, starts a Patient-only system export with batch size 2 (so more than one
// batch is needed), claims and runs it, then checks the job completed and the manifest
// counts all 3 resources.
#[tokio::test]
async fn test_run_job_system_export_end_to_end() {
let backend = Arc::new(SqliteBackend::in_memory().unwrap());
backend.init_schema().unwrap();
let tenant = tenant();
for i in 0..3 {
backend
.create(
&tenant,
"Patient",
serde_json::json!({"resourceType": "Patient", "id": format!("p{i}")}),
helios_fhir::FhirVersion::default(),
)
.await
.unwrap();
}
// Output parts go to a temporary directory.
let tmp = tempfile::tempdir().unwrap();
let output = Arc::new(LocalFsOutputStore::new(tmp.path(), "http://localhost:8080"));
let job_id = backend
.start_export(
&tenant,
StartExportInput {
request: ExportRequest::system()
.with_types(vec!["Patient".to_string()])
.with_batch_size(2),
transaction_time: Utc::now(),
request_url: "http://localhost/$export".to_string(),
owner_subject: Some("sub".to_string()),
fhir_version: helios_fhir::FhirVersion::default(),
},
)
.await
.unwrap();
// The SQLite backend serves as both job store and resource provider.
let worker_id = WorkerId::new("w1");
let worker = DefaultExportWorker::new(
Arc::clone(&backend),
Arc::clone(&backend),
Arc::clone(&output),
worker_id.clone(),
);
let lease = backend
.claim_next(&worker_id, Duration::from_secs(60))
.await
.unwrap()
.expect("job claimable");
// The only pending job must be the one just started.
assert_eq!(lease.job_id, job_id);
worker.run_job(lease).await.unwrap();
let progress = backend.get_export_status(&tenant, &job_id).await.unwrap();
assert_eq!(progress.status, ExportStatus::Complete);
let manifest = backend.get_export_manifest(&tenant, &job_id).await.unwrap();
// Counts summed across all output parts must cover every seeded patient.
let total: u64 = manifest.output.iter().map(|e| e.count).sum();
assert_eq!(total, 3);
}
}
}