1use std::sync::Arc;
13use std::time::Duration;
14
15use async_trait::async_trait;
16use chrono::{DateTime, Utc};
17use helios_fhir::FhirVersion;
18use serde_json::{Value, json};
19
20use crate::core::bulk_export_output::{ExportOutputStore, ExportPartKey};
21use crate::core::bulk_export_worker::{LeaseError, WorkerId};
22use crate::core::bulk_submit::{
23 BulkEntryOutcome, BulkProcessingOptions, BulkSubmitProvider, BulkSubmitRollbackProvider,
24 StreamingBulkSubmitProvider, SubmissionId,
25};
26use crate::core::bulk_submit_input::{SubmitInputFetcher, submission_output_job_id};
27use crate::error::StorageResult;
28use crate::tenant::TenantContext;
29
/// A worker's claim on one manifest of a bulk submission.
///
/// Produced by [`SubmitClaimStrategy::claim_next_manifest`] and handed back to
/// every lease-guarded [`SubmitWorkerStorage`] call.
#[derive(Debug, Clone)]
pub struct ManifestLease {
    /// Tenant the submission belongs to.
    pub tenant: TenantContext,
    /// Submission that owns the leased manifest.
    pub submission_id: SubmissionId,
    /// Identifier of the leased manifest.
    pub manifest_id: String,
    /// Worker holding the lease.
    pub worker_id: WorkerId,
    /// Expiry time of the lease.
    // NOTE(review): presumably extended by `SubmitClaimStrategy::heartbeat` — confirm.
    pub lease_expiry: DateTime<Utc>,
    /// Fencing token of this claim; the worker stamps it into every
    /// `ExportPartKey` it writes.
    pub fencing_token: u64,
}
50
/// The slice of a manifest row that a worker needs in order to process it.
///
/// Returned by [`SubmitWorkerStorage::get_manifest_for_worker`].
#[derive(Debug, Clone)]
pub struct ManifestWorkerView {
    /// Identifier of the manifest.
    pub manifest_id: String,
    /// URL the manifest is fetched from; the worker finishes the manifest
    /// immediately when this is `None`.
    pub manifest_url: Option<String>,
    /// FHIR base URL associated with the manifest, if any (not read by the
    /// worker in this file).
    pub fhir_base_url: Option<String>,
    /// Declared output format, if any (not read by the worker in this file).
    pub output_format: Option<String>,
    /// Header name/value pairs forwarded to the fetcher for manifest and file
    /// requests.
    pub file_request_headers: Vec<(String, String)>,
    /// OAuth metadata URLs forwarded to the fetcher.
    pub oauth_metadata_urls: Vec<String>,
    /// Optional key material forwarded to the fetcher when opening file streams.
    // NOTE(review): the JSON shape of the key is not visible here — confirm.
    pub file_encryption_key: Option<Value>,
    /// Last line recorded as processed (not read by the worker in this file).
    pub last_processed_line: u64,
    /// FHIR version associated with the manifest.
    pub fhir_version: FhirVersion,
}
73
/// Description of one artifact file written by the worker, as passed to
/// [`SubmitWorkerStorage::record_submit_file`].
#[derive(Debug, Clone)]
pub struct SubmitFileRecord {
    /// Manifest URL the artifact was produced for.
    pub manifest_url: Option<String>,
    /// Artifact category; the worker writes `"output"`, `"error"` and `"deleted"`.
    pub file_type: String,
    /// Resource type the artifact contains (e.g. `"OperationOutcome"` for
    /// error files, `"Bundle"` for deleted files).
    pub resource_type: Option<String>,
    /// Part index of the artifact, matching the `ExportPartKey` it was written under.
    pub part_index: u32,
    /// Path segment of the artifact, taken from `ExportPartKey::part_segment`.
    pub file_path: String,
    /// Number of lines in the finalized artifact.
    pub line_count: u64,
    /// Size of the finalized artifact in bytes.
    pub byte_count: u64,
    /// For error files, a JSON object mapping issue severity to a count;
    /// `None` for other file types.
    pub count_severity: Option<Value>,
}
94
/// A recorded artifact file as returned by
/// [`SubmitWorkerStorage::list_submit_files`].
///
/// Carries the same fields as [`SubmitFileRecord`] plus the fencing token.
#[derive(Debug, Clone)]
pub struct SubmitFileRow {
    /// Manifest URL the artifact was produced for.
    pub manifest_url: Option<String>,
    /// Artifact category (`"output"`, `"error"` or `"deleted"` for files
    /// written by the worker in this file).
    pub file_type: String,
    /// Resource type the artifact contains.
    pub resource_type: Option<String>,
    /// Part index of the artifact.
    pub part_index: u32,
    /// Fencing token stored with the artifact.
    // NOTE(review): presumably the token of the lease that wrote it — confirm.
    pub fencing_token: u64,
    /// Path segment of the artifact.
    pub file_path: String,
    /// Number of lines in the artifact.
    pub line_count: u64,
    /// Size of the artifact in bytes.
    pub byte_count: u64,
    /// Severity tally for error files, if recorded.
    pub count_severity: Option<Value>,
}
117
/// The submission a poll token points at, as returned by
/// [`SubmitWorkerStorage::resolve_poll_token`].
#[derive(Debug, Clone)]
pub struct PollTokenTarget {
    /// Tenant that owns the submission.
    pub tenant: TenantContext,
    /// Submission the token refers to.
    pub submission_id: SubmissionId,
    /// Subject recorded as the submission's owner, if any.
    pub owner_subject: Option<String>,
}
128
/// Leasing operations that let workers claim manifests for processing.
#[async_trait]
pub trait SubmitClaimStrategy: Send + Sync {
    /// Attempts to claim a manifest for `worker_id`.
    ///
    /// Returns `Ok(None)` when nothing was claimed, otherwise the
    /// [`ManifestLease`] describing the claim.
    // NOTE(review): `lease_duration` presumably sets the initial
    // `lease_expiry` — confirm against implementations.
    async fn claim_next_manifest(
        &self,
        worker_id: &WorkerId,
        lease_duration: Duration,
    ) -> StorageResult<Option<ManifestLease>>;

    /// Reports that the worker is still alive and holding `lease`.
    ///
    /// # Errors
    /// Returns [`LeaseError`]; the worker stops processing when it sees
    /// `LeaseError::LeaseLost`.
    // NOTE(review): the returned timestamp is presumably the new expiry — confirm.
    async fn heartbeat(&self, lease: &ManifestLease) -> Result<DateTime<Utc>, LeaseError>;

    /// Gives up `lease`, consuming it.
    async fn release(&self, lease: ManifestLease) -> StorageResult<()>;
}
151
/// Storage operations used by the submit worker and by the surrounding
/// submission lifecycle.
///
/// Methods that take a [`ManifestLease`] return [`LeaseError`], which lets an
/// implementation report a lost lease separately from a storage failure. The
/// remaining methods are not lease-guarded and return [`StorageResult`].
#[async_trait]
pub trait SubmitWorkerStorage: Send + Sync {
    /// Loads the worker-facing view of the leased manifest.
    async fn get_manifest_for_worker(
        &self,
        lease: &ManifestLease,
    ) -> Result<ManifestWorkerView, LeaseError>;

    /// Marks the leased manifest as being processed.
    async fn mark_manifest_processing(&self, lease: &ManifestLease) -> Result<(), LeaseError>;

    /// Stores running progress counters for the leased manifest.
    ///
    /// The worker passes cumulative counts for the whole manifest and uses
    /// `processed_entries + failed_entries` as `last_processed_line`.
    async fn update_manifest_progress(
        &self,
        lease: &ManifestLease,
        processed_entries: u64,
        failed_entries: u64,
        last_processed_line: u64,
    ) -> Result<(), LeaseError>;

    /// Records an artifact file the worker wrote for the leased manifest.
    async fn record_submit_file(
        &self,
        lease: &ManifestLease,
        file: &SubmitFileRecord,
    ) -> Result<(), LeaseError>;

    /// Marks the leased manifest as finished.
    async fn finish_manifest(&self, lease: &ManifestLease) -> Result<(), LeaseError>;

    /// Marks the leased manifest as failed, storing `error_message`.
    async fn fail_manifest(
        &self,
        lease: &ManifestLease,
        error_message: &str,
    ) -> Result<(), LeaseError>;

    /// Stores the fetch parameters of a manifest.
    ///
    /// The parameters mirror the same-named fields of [`ManifestWorkerView`].
    // Eight arguments mirror the stored columns one-to-one, hence the lint opt-out.
    #[allow(clippy::too_many_arguments)]
    async fn set_manifest_fetch_params(
        &self,
        tenant: &TenantContext,
        id: &SubmissionId,
        manifest_id: &str,
        fhir_base_url: Option<&str>,
        output_format: Option<&str>,
        file_request_headers: &[(String, String)],
        oauth_metadata_urls: &[String],
        file_encryption_key: Option<&Value>,
    ) -> StorageResult<()>;

    /// Replaces the manifest registered under `manifest_url` for a submission.
    // NOTE(review): the returned strings are presumably identifiers or paths of
    // what was replaced — confirm against implementations.
    async fn replace_manifest_by_url(
        &self,
        tenant: &TenantContext,
        id: &SubmissionId,
        manifest_url: &str,
    ) -> StorageResult<Vec<String>>;

    /// Stores kickoff metadata for a submission: its owner, the request URL
    /// and whether file downloads require an access token.
    async fn set_submission_kickoff_meta(
        &self,
        tenant: &TenantContext,
        id: &SubmissionId,
        owner_subject: Option<&str>,
        request_url: &str,
        requires_access_token: bool,
    ) -> StorageResult<()>;

    /// Returns the poll token of a submission.
    // NOTE(review): the name suggests the token is created when missing — confirm.
    async fn ensure_poll_token(
        &self,
        tenant: &TenantContext,
        id: &SubmissionId,
    ) -> StorageResult<String>;

    /// Looks up the submission a poll token refers to; `Ok(None)` when the
    /// token resolves to nothing.
    async fn resolve_poll_token(&self, token: &str) -> StorageResult<Option<PollTokenTarget>>;

    /// Removes the poll token of a submission.
    async fn clear_poll_token(
        &self,
        tenant: &TenantContext,
        id: &SubmissionId,
    ) -> StorageResult<()>;

    /// Lists the artifact files recorded for a submission.
    async fn list_submit_files(
        &self,
        tenant: &TenantContext,
        id: &SubmissionId,
    ) -> StorageResult<Vec<SubmitFileRow>>;

    /// Deletes the stored artifacts of a submission.
    // NOTE(review): exactly which rows/files count as "artifacts" is defined by
    // the implementation — confirm.
    async fn delete_submission_artifacts(
        &self,
        tenant: &TenantContext,
        id: &SubmissionId,
    ) -> StorageResult<()>;

    /// Counts the tenant's active submissions.
    // NOTE(review): which statuses count as "active" is implementation-defined.
    async fn count_active_submissions(&self, tenant: &TenantContext) -> StorageResult<u64>;

    /// Lists up to `limit` submissions considered expired at `now` for the
    /// given `ttl`, across tenants.
    // NOTE(review): the timestamp the TTL is measured from is not visible here.
    async fn list_expired_submissions(
        &self,
        now: DateTime<Utc>,
        ttl: Duration,
        limit: u32,
    ) -> StorageResult<Vec<(TenantContext, SubmissionId)>>;

    /// Returns the transaction time of a submission.
    // NOTE(review): the name suggests the time is assigned when missing — confirm.
    async fn ensure_transaction_time(
        &self,
        tenant: &TenantContext,
        id: &SubmissionId,
    ) -> StorageResult<DateTime<Utc>>;
}
295
/// Umbrella trait bundling every capability [`DefaultSubmitWorker`] needs from
/// its job store: submission bookkeeping, streaming ingestion, rollback,
/// worker storage and manifest claiming.
///
/// It declares no methods of its own.
pub trait BulkSubmitJobStore:
    BulkSubmitProvider
    + StreamingBulkSubmitProvider
    + BulkSubmitRollbackProvider
    + SubmitWorkerStorage
    + SubmitClaimStrategy
{
}
308
/// Blanket implementation: any type providing all five component traits is a
/// `BulkSubmitJobStore` automatically.
impl<T> BulkSubmitJobStore for T where
    T: BulkSubmitProvider
        + StreamingBulkSubmitProvider
        + BulkSubmitRollbackProvider
        + SubmitWorkerStorage
        + SubmitClaimStrategy
{
}
317
/// Worker that processes one leased manifest at a time: it fetches the
/// manifest and its files, ingests them through the job store, and writes
/// result artifacts to the output store.
///
/// The type parameters are `?Sized`, so each collaborator may be a trait object.
pub struct DefaultSubmitWorker<Js: ?Sized, Fetcher: ?Sized, Os: ?Sized> {
    /// Job store used for leasing, ingestion and bookkeeping.
    jobs: Arc<Js>,
    /// Fetcher used to download the manifest and the files it lists.
    fetcher: Arc<Fetcher>,
    /// Store the result artifacts are written to.
    output: Arc<Os>,
    // Not read by any code in this impl; the lease already carries the worker
    // id. NOTE(review): presumably kept for diagnostics/future use — confirm.
    #[allow(dead_code)]
    worker_id: WorkerId,
}
332
333impl<Js, Fetcher, Os> DefaultSubmitWorker<Js, Fetcher, Os>
334where
335 Js: BulkSubmitJobStore + ?Sized,
336 Fetcher: SubmitInputFetcher + ?Sized,
337 Os: ExportOutputStore + ?Sized,
338{
339 pub fn new(jobs: Arc<Js>, fetcher: Arc<Fetcher>, output: Arc<Os>, worker_id: WorkerId) -> Self {
341 Self {
342 jobs,
343 fetcher,
344 output,
345 worker_id,
346 }
347 }
348
349 pub async fn run_job(&self, lease: ManifestLease) -> StorageResult<()> {
355 let view = match self.jobs.get_manifest_for_worker(&lease).await {
356 Ok(v) => v,
357 Err(LeaseError::LeaseLost { .. }) => return Ok(()),
358 Err(LeaseError::Storage(e)) => return Err(e),
359 };
360 if let Err(LeaseError::Storage(e)) = self.jobs.mark_manifest_processing(&lease).await {
361 return Err(e);
362 }
363
364 let Some(manifest_url) = view.manifest_url.clone() else {
365 let _ = self.jobs.finish_manifest(&lease).await;
367 return Ok(());
368 };
369
370 let manifest = match self
372 .fetcher
373 .fetch_manifest(
374 &manifest_url,
375 &view.file_request_headers,
376 &view.oauth_metadata_urls,
377 )
378 .await
379 {
380 Ok(m) => m,
381 Err(e) => {
382 self.record_manifest_error(
383 &lease,
384 &manifest_url,
385 &format!("failed to fetch manifest: {e}"),
386 )
387 .await?;
388 let _ = self.jobs.fail_manifest(&lease, &e.to_string()).await;
389 return Ok(());
390 }
391 };
392
393 let opts = BulkProcessingOptions::new();
394 let mut processed: u64 = 0;
395 let mut failed: u64 = 0;
396
397 for file in &manifest.output {
399 if let Err(LeaseError::LeaseLost { .. }) = self.jobs.heartbeat(&lease).await {
400 return Ok(());
401 }
402 let resource_type = file
403 .resource_type
404 .clone()
405 .unwrap_or_else(|| "Resource".into());
406 let stream = match self
407 .fetcher
408 .open_file_stream(
409 &file.url,
410 &view.file_request_headers,
411 manifest.requires_access_token,
412 &view.oauth_metadata_urls,
413 view.file_encryption_key.as_ref(),
414 )
415 .await
416 {
417 Ok(s) => s,
418 Err(e) => {
419 self.record_manifest_error(
420 &lease,
421 &manifest_url,
422 &format!("failed to fetch file {}: {e}", file.url),
423 )
424 .await?;
425 failed += 1;
426 continue;
427 }
428 };
429
430 match self
431 .jobs
432 .process_ndjson_stream(
433 &lease.tenant,
434 &lease.submission_id,
435 &lease.manifest_id,
436 &resource_type,
437 stream,
438 &opts,
439 )
440 .await
441 {
442 Ok(result) => {
443 processed += result.counts.success + result.counts.skipped;
444 failed += result.counts.error_count();
445 }
446 Err(e) => {
447 self.record_manifest_error(
448 &lease,
449 &manifest_url,
450 &format!("failed to ingest file {}: {e}", file.url),
451 )
452 .await?;
453 failed += 1;
454 }
455 }
456
457 if let Err(LeaseError::Storage(e)) = self
458 .jobs
459 .update_manifest_progress(&lease, processed, failed, processed + failed)
460 .await
461 {
462 return Err(e);
463 }
464 }
465
466 let mut deleted_refs: Vec<String> = Vec::new();
468 for file in &manifest.deleted {
469 if let Err(LeaseError::LeaseLost { .. }) = self.jobs.heartbeat(&lease).await {
470 return Ok(());
471 }
472 match self
473 .fetcher
474 .open_file_stream(
475 &file.url,
476 &view.file_request_headers,
477 manifest.requires_access_token,
478 &view.oauth_metadata_urls,
479 view.file_encryption_key.as_ref(),
480 )
481 .await
482 {
483 Ok(reader) => {
484 self.process_deleted_stream(&lease, reader, &mut deleted_refs)
485 .await;
486 }
487 Err(e) => {
488 self.record_manifest_error(
489 &lease,
490 &manifest_url,
491 &format!("failed to fetch deleted file {}: {e}", file.url),
492 )
493 .await?;
494 }
495 }
496 }
497 if !deleted_refs.is_empty() {
498 self.write_deleted_artifact(&lease, &manifest_url, &deleted_refs)
499 .await?;
500 }
501
502 self.write_result_artifacts(&lease, &manifest_url, view.fhir_version, failed)
504 .await?;
505
506 if let Err(LeaseError::Storage(e)) = self.jobs.finish_manifest(&lease).await {
508 return Err(e);
509 }
510 Ok(())
511 }
512
513 async fn write_result_artifacts(
516 &self,
517 lease: &ManifestLease,
518 manifest_url: &str,
519 _fhir_version: FhirVersion,
520 failed_count: u64,
521 ) -> StorageResult<()> {
522 let job_id = submission_output_job_id(&lease.submission_id);
523 let tenant_id = lease.tenant.tenant_id().as_str().to_string();
524
525 let mut all = Vec::new();
527 let limit = 1000u32;
528 let mut offset = 0u32;
529 loop {
530 let batch = self
531 .jobs
532 .get_entry_results(
533 &lease.tenant,
534 &lease.submission_id,
535 &lease.manifest_id,
536 None,
537 limit,
538 offset,
539 )
540 .await?;
541 let n = batch.len() as u32;
542 all.extend(batch);
543 if n < limit {
544 break;
545 }
546 offset += limit;
547 }
548
549 let mut by_type: std::collections::BTreeMap<String, Vec<String>> =
551 std::collections::BTreeMap::new();
552 let mut error_lines: Vec<String> = Vec::new();
553 let mut severity: std::collections::BTreeMap<String, u64> =
554 std::collections::BTreeMap::new();
555
556 for entry in &all {
557 match entry.outcome {
558 BulkEntryOutcome::Success => {
559 if let Some(id) = &entry.resource_id {
560 let line = json!({"reference": format!("{}/{}", entry.resource_type, id)})
561 .to_string();
562 by_type
563 .entry(entry.resource_type.clone())
564 .or_default()
565 .push(line);
566 }
567 }
568 BulkEntryOutcome::ValidationError | BulkEntryOutcome::ProcessingError => {
569 let oo = entry
570 .operation_outcome
571 .clone()
572 .unwrap_or_else(|| default_error_outcome(entry));
573 tally_severity(&oo, &mut severity);
574 error_lines.push(oo.to_string());
575 }
576 BulkEntryOutcome::Skipped => {}
577 }
578 }
579
580 let recorded_errors = error_lines.len() as u64;
585 if failed_count > recorded_errors {
586 let uncaptured = failed_count - recorded_errors;
587 let oo = json!({
588 "resourceType": "OperationOutcome",
589 "issue": [{
590 "severity": "error",
591 "code": "processing",
592 "diagnostics": format!(
593 "{uncaptured} submitted resource(s) could not be parsed or did not \
594 match the declared resource type"
595 )
596 }]
597 });
598 tally_severity(&oo, &mut severity);
599 error_lines.push(oo.to_string());
600 }
601
602 for (idx, (resource_type, lines)) in by_type.iter().enumerate() {
604 let key = ExportPartKey::output(
605 tenant_id.clone(),
606 job_id.clone(),
607 resource_type.clone(),
608 idx as u32,
609 lease.fencing_token,
610 );
611 let part = self.write_part(&key, lines).await?;
612 self.jobs
613 .record_submit_file(
614 lease,
615 &SubmitFileRecord {
616 manifest_url: Some(manifest_url.to_string()),
617 file_type: "output".to_string(),
618 resource_type: Some(resource_type.clone()),
619 part_index: idx as u32,
620 file_path: key.part_segment(),
621 line_count: part.0,
622 byte_count: part.1,
623 count_severity: None,
624 },
625 )
626 .await
627 .map_err(lease_err_to_storage)?;
628 }
629
630 if !error_lines.is_empty() {
632 let key = ExportPartKey::error(
633 tenant_id.clone(),
634 job_id.clone(),
635 "OperationOutcome",
636 0,
637 lease.fencing_token,
638 );
639 let part = self.write_part(&key, &error_lines).await?;
640 let count_severity = Value::Object(
641 severity
642 .into_iter()
643 .map(|(k, v)| (k, Value::from(v)))
644 .collect(),
645 );
646 self.jobs
647 .record_submit_file(
648 lease,
649 &SubmitFileRecord {
650 manifest_url: Some(manifest_url.to_string()),
651 file_type: "error".to_string(),
652 resource_type: Some("OperationOutcome".to_string()),
653 part_index: 0,
654 file_path: key.part_segment(),
655 line_count: part.0,
656 byte_count: part.1,
657 count_severity: Some(count_severity),
658 },
659 )
660 .await
661 .map_err(lease_err_to_storage)?;
662 }
663 Ok(())
664 }
665
666 async fn process_deleted_stream(
669 &self,
670 lease: &ManifestLease,
671 reader: Box<dyn tokio::io::AsyncBufRead + Send + Unpin>,
672 refs: &mut Vec<String>,
673 ) {
674 use tokio::io::AsyncBufReadExt;
675 let mut lines = reader.lines();
676 while let Ok(Some(line)) = lines.next_line().await {
677 let line = line.trim();
678 if line.is_empty() {
679 continue;
680 }
681 let Ok(val) = serde_json::from_str::<Value>(line) else {
682 continue;
683 };
684 if val.get("resourceType").and_then(|v| v.as_str()) == Some("Bundle") {
685 if let Some(entries) = val.get("entry").and_then(|e| e.as_array()) {
686 for e in entries {
687 if let Some(url) = e
688 .get("request")
689 .and_then(|r| r.get("url"))
690 .and_then(|u| u.as_str())
691 {
692 if let Some((ty, id)) = url.split_once('/') {
693 if self.jobs.delete(&lease.tenant, ty, id).await.is_ok() {
694 refs.push(format!("{ty}/{id}"));
695 }
696 }
697 }
698 }
699 }
700 } else if let (Some(ty), Some(id)) = (
701 val.get("resourceType").and_then(|v| v.as_str()),
702 val.get("id").and_then(|v| v.as_str()),
703 ) {
704 if self.jobs.delete(&lease.tenant, ty, id).await.is_ok() {
705 refs.push(format!("{ty}/{id}"));
706 }
707 }
708 }
709 }
710
711 async fn write_deleted_artifact(
713 &self,
714 lease: &ManifestLease,
715 manifest_url: &str,
716 refs: &[String],
717 ) -> StorageResult<()> {
718 let job_id = submission_output_job_id(&lease.submission_id);
719 let tenant_id = lease.tenant.tenant_id().as_str().to_string();
720 let lines: Vec<String> = refs
721 .iter()
722 .map(|r| json!({ "reference": r }).to_string())
723 .collect();
724 let key = ExportPartKey {
725 tenant_id,
726 job_id,
727 resource_type: "Bundle".to_string(),
728 file_type: "deleted".to_string(),
729 part_index: 0,
730 fencing_token: lease.fencing_token,
731 };
732 let (line_count, byte_count) = self.write_part(&key, &lines).await?;
733 self.jobs
734 .record_submit_file(
735 lease,
736 &SubmitFileRecord {
737 manifest_url: Some(manifest_url.to_string()),
738 file_type: "deleted".to_string(),
739 resource_type: Some("Bundle".to_string()),
740 part_index: 0,
741 file_path: key.part_segment(),
742 line_count,
743 byte_count,
744 count_severity: None,
745 },
746 )
747 .await
748 .map_err(lease_err_to_storage)?;
749 Ok(())
750 }
751
752 async fn record_manifest_error(
754 &self,
755 lease: &ManifestLease,
756 manifest_url: &str,
757 message: &str,
758 ) -> StorageResult<()> {
759 let job_id = submission_output_job_id(&lease.submission_id);
760 let tenant_id = lease.tenant.tenant_id().as_str().to_string();
761 let oo = json!({
762 "resourceType": "OperationOutcome",
763 "issue": [{
764 "severity": "error",
765 "code": "processing",
766 "diagnostics": message
767 }]
768 })
769 .to_string();
770 let key = ExportPartKey::error(
773 tenant_id,
774 job_id,
775 "OperationOutcome",
776 1_000_000 + (manifest_url.len() as u32 % 1000),
777 lease.fencing_token,
778 );
779 let part = self.write_part(&key, std::slice::from_ref(&oo)).await?;
780 self.jobs
781 .record_submit_file(
782 lease,
783 &SubmitFileRecord {
784 manifest_url: Some(manifest_url.to_string()),
785 file_type: "error".to_string(),
786 resource_type: Some("OperationOutcome".to_string()),
787 part_index: key.part_index,
788 file_path: key.part_segment(),
789 line_count: part.0,
790 byte_count: part.1,
791 count_severity: Some(json!({"error": 1})),
792 },
793 )
794 .await
795 .map_err(lease_err_to_storage)?;
796 Ok(())
797 }
798
799 async fn write_part(&self, key: &ExportPartKey, lines: &[String]) -> StorageResult<(u64, u64)> {
801 let mut writer = self.output.open_writer(key).await?;
802 for line in lines {
803 writer.write_line(line).await.map_err(|e| {
804 crate::error::StorageError::Backend(crate::error::BackendError::Internal {
805 backend_name: "bulk-submit-output".to_string(),
806 message: format!("write artifact: {e}"),
807 source: None,
808 })
809 })?;
810 }
811 let finalized = self.output.finalize_part(key, writer).await?;
812 Ok((finalized.line_count, finalized.size_bytes))
813 }
814}
815
816fn lease_err_to_storage(e: LeaseError) -> crate::error::StorageError {
818 match e {
819 LeaseError::Storage(s) => s,
820 LeaseError::LeaseLost { job_id } => {
821 crate::error::StorageError::Backend(crate::error::BackendError::Internal {
822 backend_name: "bulk-submit".to_string(),
823 message: format!("lease lost for {job_id}"),
824 source: None,
825 })
826 }
827 }
828}
829
830fn default_error_outcome(entry: &crate::core::bulk_submit::BulkEntryResult) -> Value {
832 json!({
833 "resourceType": "OperationOutcome",
834 "issue": [{
835 "severity": "error",
836 "code": "processing",
837 "diagnostics": format!(
838 "{} error on {} line {}",
839 entry.outcome, entry.resource_type, entry.line_number
840 )
841 }]
842 })
843}
844
845fn tally_severity(oo: &Value, acc: &mut std::collections::BTreeMap<String, u64>) {
847 if let Some(issues) = oo.get("issue").and_then(|v| v.as_array()) {
848 for issue in issues {
849 let sev = issue
850 .get("severity")
851 .and_then(|v| v.as_str())
852 .unwrap_or("error");
853 *acc.entry(sev.to_string()).or_insert(0) += 1;
854 }
855 } else {
856 *acc.entry("error".to_string()).or_insert(0) += 1;
857 }
858}
859
860#[cfg(test)]
861mod tests {
862 use super::*;
863 use crate::backends::local_fs::LocalFsOutputStore;
864 use crate::backends::sqlite::SqliteBackend;
865 use crate::core::bulk_submit::BulkSubmitProvider;
866 use crate::core::bulk_submit_input::{RemoteFile, RemoteManifest};
867 use crate::tenant::{TenantContext, TenantId, TenantPermissions};
868 use std::time::Duration as StdDuration;
869
870 struct MockFetcher {
872 files: std::collections::HashMap<String, Vec<u8>>,
873 manifest: RemoteManifest,
874 }
875
876 #[async_trait]
877 impl SubmitInputFetcher for MockFetcher {
878 async fn fetch_manifest(
879 &self,
880 _url: &str,
881 _headers: &[(String, String)],
882 _oauth: &[String],
883 ) -> StorageResult<RemoteManifest> {
884 Ok(self.manifest.clone())
885 }
886
887 async fn open_file_stream(
888 &self,
889 url: &str,
890 _headers: &[(String, String)],
891 _requires_access_token: bool,
892 _oauth: &[String],
893 _encryption_key: Option<&Value>,
894 ) -> StorageResult<Box<dyn tokio::io::AsyncBufRead + Send + Unpin>> {
895 let data = self.files.get(url).cloned().unwrap_or_default();
896 Ok(Box::new(tokio::io::BufReader::new(std::io::Cursor::new(
897 data,
898 ))))
899 }
900 }
901
902 fn tenant() -> TenantContext {
903 TenantContext::new(TenantId::new("t1"), TenantPermissions::full_access())
904 }
905
906 #[tokio::test]
907 async fn test_worker_ingests_output_and_records_artifacts() {
908 let backend = Arc::new(SqliteBackend::in_memory().unwrap());
909 backend.init_schema().unwrap();
910 let tmp = tempfile::tempdir().unwrap();
911 let output = Arc::new(LocalFsOutputStore::new(
912 tmp.path().to_path_buf(),
913 "http://localhost:8080",
914 ));
915
916 let tenant = tenant();
917 let sub_id = SubmissionId::generate("mock-system");
918 backend
919 .create_submission(&tenant, &sub_id, None)
920 .await
921 .unwrap();
922 backend
923 .add_manifest(
924 &tenant,
925 &sub_id,
926 Some("http://provider/manifest.json"),
927 None,
928 )
929 .await
930 .unwrap();
931
932 let ndjson = concat!(
933 "{\"resourceType\":\"Patient\",\"id\":\"p1\",\"name\":[{\"family\":\"A\"}]}\n",
934 "{\"resourceType\":\"Patient\",\"name\":[{\"family\":\"B\"}]}\n"
935 );
936 let mut files = std::collections::HashMap::new();
937 files.insert(
938 "http://provider/patient.ndjson".to_string(),
939 ndjson.as_bytes().to_vec(),
940 );
941 let fetcher = Arc::new(MockFetcher {
942 files,
943 manifest: RemoteManifest {
944 requires_access_token: false,
945 output: vec![RemoteFile {
946 resource_type: Some("Patient".to_string()),
947 url: "http://provider/patient.ndjson".to_string(),
948 count: Some(2),
949 }],
950 deleted: vec![],
951 },
952 });
953
954 let worker = DefaultSubmitWorker::new(
955 backend.clone(),
956 fetcher,
957 output,
958 WorkerId::new("test-worker"),
959 );
960
961 let lease = backend
962 .claim_next_manifest(&WorkerId::new("test-worker"), StdDuration::from_secs(60))
963 .await
964 .unwrap()
965 .expect("claimable manifest");
966 worker.run_job(lease).await.unwrap();
967
968 let manifests = backend.list_manifests(&tenant, &sub_id).await.unwrap();
969 let manifest_id = manifests[0].manifest_id.clone();
970
971 let counts = backend
973 .get_entry_counts(&tenant, &sub_id, &manifest_id)
974 .await
975 .unwrap();
976 assert_eq!(counts.success, 2);
977
978 assert_eq!(
980 manifests[0].status,
981 crate::core::bulk_submit::ManifestStatus::Completed
982 );
983
984 let files = backend.list_submit_files(&tenant, &sub_id).await.unwrap();
986 assert!(
987 files
988 .iter()
989 .any(|f| f.file_type == "output" && f.resource_type.as_deref() == Some("Patient"))
990 );
991 }
992
993 struct FailingManifestFetcher;
995 #[async_trait]
996 impl SubmitInputFetcher for FailingManifestFetcher {
997 async fn fetch_manifest(
998 &self,
999 _url: &str,
1000 _h: &[(String, String)],
1001 _o: &[String],
1002 ) -> StorageResult<RemoteManifest> {
1003 Err(crate::error::StorageError::Backend(
1004 crate::error::BackendError::Internal {
1005 backend_name: "test".into(),
1006 message: "unreachable manifest".into(),
1007 source: None,
1008 },
1009 ))
1010 }
1011 async fn open_file_stream(
1012 &self,
1013 _url: &str,
1014 _h: &[(String, String)],
1015 _r: bool,
1016 _o: &[String],
1017 _k: Option<&Value>,
1018 ) -> StorageResult<Box<dyn tokio::io::AsyncBufRead + Send + Unpin>> {
1019 Ok(Box::new(tokio::io::BufReader::new(std::io::Cursor::new(
1020 Vec::new(),
1021 ))))
1022 }
1023 }
1024
1025 async fn seed(backend: &Arc<SqliteBackend>, tenant: &TenantContext) -> SubmissionId {
1026 let sub_id = SubmissionId::generate("mock-system");
1027 backend
1028 .create_submission(tenant, &sub_id, None)
1029 .await
1030 .unwrap();
1031 backend
1032 .add_manifest(tenant, &sub_id, Some("http://provider/manifest.json"), None)
1033 .await
1034 .unwrap();
1035 sub_id
1036 }
1037
1038 #[tokio::test]
1039 async fn test_worker_fails_manifest_on_fetch_error() {
1040 let backend = Arc::new(SqliteBackend::in_memory().unwrap());
1041 backend.init_schema().unwrap();
1042 let tmp = tempfile::tempdir().unwrap();
1043 let output = Arc::new(LocalFsOutputStore::new(
1044 tmp.path().to_path_buf(),
1045 "http://x",
1046 ));
1047 let tenant = tenant();
1048 let sub_id = seed(&backend, &tenant).await;
1049
1050 let worker = DefaultSubmitWorker::new(
1051 backend.clone(),
1052 Arc::new(FailingManifestFetcher),
1053 output,
1054 WorkerId::new("w"),
1055 );
1056 let lease = backend
1057 .claim_next_manifest(&WorkerId::new("w"), StdDuration::from_secs(60))
1058 .await
1059 .unwrap()
1060 .unwrap();
1061 worker.run_job(lease).await.unwrap();
1063
1064 let manifests = backend.list_manifests(&tenant, &sub_id).await.unwrap();
1065 assert_eq!(
1066 manifests[0].status,
1067 crate::core::bulk_submit::ManifestStatus::Failed
1068 );
1069 let files = backend.list_submit_files(&tenant, &sub_id).await.unwrap();
1071 assert!(files.iter().any(|f| f.file_type == "error"));
1072 }
1073
1074 #[tokio::test]
1075 async fn test_worker_partial_success_on_invalid_ndjson() {
1076 let backend = Arc::new(SqliteBackend::in_memory().unwrap());
1077 backend.init_schema().unwrap();
1078 let tmp = tempfile::tempdir().unwrap();
1079 let output = Arc::new(LocalFsOutputStore::new(
1080 tmp.path().to_path_buf(),
1081 "http://x",
1082 ));
1083 let tenant = tenant();
1084 let sub_id = seed(&backend, &tenant).await;
1085
1086 let ndjson = "{\"resourceType\":\"Patient\",\"id\":\"ok\"}\nnot-json\n";
1088 let mut files = std::collections::HashMap::new();
1089 files.insert(
1090 "http://provider/p.ndjson".to_string(),
1091 ndjson.as_bytes().to_vec(),
1092 );
1093 let fetcher = Arc::new(MockFetcher {
1094 files,
1095 manifest: RemoteManifest {
1096 requires_access_token: false,
1097 output: vec![RemoteFile {
1098 resource_type: Some("Patient".to_string()),
1099 url: "http://provider/p.ndjson".to_string(),
1100 count: Some(2),
1101 }],
1102 deleted: vec![],
1103 },
1104 });
1105 let worker = DefaultSubmitWorker::new(backend.clone(), fetcher, output, WorkerId::new("w"));
1106 let lease = backend
1107 .claim_next_manifest(&WorkerId::new("w"), StdDuration::from_secs(60))
1108 .await
1109 .unwrap()
1110 .unwrap();
1111 worker.run_job(lease).await.unwrap();
1112
1113 let manifests = backend.list_manifests(&tenant, &sub_id).await.unwrap();
1114 let counts = backend
1115 .get_entry_counts(&tenant, &sub_id, &manifests[0].manifest_id)
1116 .await
1117 .unwrap();
1118 assert_eq!(counts.success, 1);
1122 assert!(manifests[0].failed_entries >= 1);
1123 assert_eq!(
1124 manifests[0].status,
1125 crate::core::bulk_submit::ManifestStatus::Completed
1126 );
1127 let files = backend.list_submit_files(&tenant, &sub_id).await.unwrap();
1128 assert!(files.iter().any(|f| f.file_type == "error"));
1129 assert!(files.iter().any(|f| f.file_type == "output"));
1130 }
1131}