use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::io::AsyncBufRead;
use uuid::Uuid;
use crate::core::storage::ResourceStorage;
use crate::error::StorageResult;
use crate::tenant::TenantContext;
/// Identifies a bulk submission by the pair (`submitter`, `submission_id`).
///
/// Its `Display` implementation renders the pair as `submitter/submission_id`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubmissionId {
/// Submitter component of the identifier.
pub submitter: String,
/// Submission component of the identifier; `SubmissionId::generate` fills
/// this with a random (v4) UUID string.
pub submission_id: String,
}
impl SubmissionId {
    /// Builds an identifier from an explicit submitter and submission id.
    ///
    /// Both arguments are converted into owned `String`s and stored as given;
    /// no validation is performed.
    pub fn new(submitter: impl Into<String>, submission_id: impl Into<String>) -> Self {
        let submitter = submitter.into();
        let submission_id = submission_id.into();
        Self {
            submitter,
            submission_id,
        }
    }

    /// Builds an identifier for `submitter` whose submission id is a freshly
    /// generated random (v4) UUID rendered as a string.
    pub fn generate(submitter: impl Into<String>) -> Self {
        let submission_id = Uuid::new_v4().to_string();
        Self::new(submitter, submission_id)
    }
}
impl std::fmt::Display for SubmissionId {
    /// Writes the identifier as `submitter/submission_id`.
    ///
    /// Width, alignment and precision flags on the formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.submitter)?;
        f.write_str("/")?;
        f.write_str(&self.submission_id)
    }
}
/// Status of a bulk submission.
///
/// Serde (de)serializes the variants under kebab-case names
/// (for example `InProgress` becomes `in-progress`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum SubmissionStatus {
/// Non-terminal status; the status `SubmissionSummary::new` starts with.
InProgress,
/// Terminal status (`is_terminal` returns `true`).
Complete,
/// Terminal status (`is_terminal` returns `true`).
Aborted,
}
impl SubmissionStatus {
    /// Returns `true` for `Complete` and `Aborted`, `false` for `InProgress`.
    pub fn is_terminal(&self) -> bool {
        // Exhaustive on purpose: a new variant must be classified explicitly.
        match self {
            Self::InProgress => false,
            Self::Complete | Self::Aborted => true,
        }
    }
}
impl std::fmt::Display for SubmissionStatus {
    /// Writes the kebab-case name of the status (`in-progress`, `complete`,
    /// `aborted`).
    ///
    /// Width, fill, alignment and precision given in the format spec
    /// (e.g. `{:>12}`) are honored, as they are for a plain `&str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::InProgress => "in-progress",
            Self::Complete => "complete",
            Self::Aborted => "aborted",
        };
        // `pad` applies the caller's formatting flags; writing the literal
        // directly would silently drop them.
        f.pad(label)
    }
}
impl std::str::FromStr for SubmissionStatus {
    type Err = String;

    /// Parses a status name.
    ///
    /// Accepts `in-progress` (or the alias `in_progress`), `complete` and
    /// `aborted`. Matching is exact: no trimming and no case folding.
    ///
    /// # Errors
    ///
    /// Returns `unknown submission status: <input>` for any other input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let status = match s {
            "in-progress" | "in_progress" => Self::InProgress,
            "complete" => Self::Complete,
            "aborted" => Self::Aborted,
            other => return Err(format!("unknown submission status: {}", other)),
        };
        Ok(status)
    }
}
/// Status of a manifest within a submission.
///
/// Serde (de)serializes the variants under kebab-case names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ManifestStatus {
/// Non-terminal status; the status `SubmissionManifest::new` starts with.
Pending,
/// Non-terminal status (`is_terminal` returns `false`).
Processing,
/// Terminal status (`is_terminal` returns `true`).
Completed,
/// Terminal status (`is_terminal` returns `true`).
Failed,
}
impl ManifestStatus {
    /// Returns `true` for `Completed` and `Failed`, `false` for `Pending` and
    /// `Processing`.
    pub fn is_terminal(&self) -> bool {
        // Exhaustive on purpose: a new variant must be classified explicitly.
        match self {
            Self::Completed | Self::Failed => true,
            Self::Pending | Self::Processing => false,
        }
    }
}
impl std::fmt::Display for ManifestStatus {
    /// Writes the lowercase name of the status (`pending`, `processing`,
    /// `completed`, `failed`).
    ///
    /// Width, fill, alignment and precision given in the format spec are
    /// honored, as they are for a plain `&str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Pending => "pending",
            Self::Processing => "processing",
            Self::Completed => "completed",
            Self::Failed => "failed",
        };
        // `pad` applies the caller's formatting flags; writing the literal
        // directly would silently drop them.
        f.pad(label)
    }
}
impl std::str::FromStr for ManifestStatus {
    type Err = String;

    /// Parses a status name: `pending`, `processing`, `completed` or `failed`.
    /// Matching is exact: no trimming and no case folding.
    ///
    /// # Errors
    ///
    /// Returns `unknown manifest status: <input>` for any other input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let status = match s {
            "pending" => Self::Pending,
            "processing" => Self::Processing,
            "completed" => Self::Completed,
            "failed" => Self::Failed,
            other => return Err(format!("unknown manifest status: {}", other)),
        };
        Ok(status)
    }
}
/// A manifest record attached to a submission, with its status and entry
/// counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmissionManifest {
/// Identifier of this manifest.
pub manifest_id: String,
/// Optional manifest URL; left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub manifest_url: Option<String>,
/// Optional URL set via `with_replaces`; left out of the serialized form
/// when `None`.
/// NOTE(review): presumably the URL of a manifest this one supersedes — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub replaces_manifest_url: Option<String>,
/// Current status; `SubmissionManifest::new` starts at `Pending`.
pub status: ManifestStatus,
/// Timestamp taken with `Utc::now()` when `SubmissionManifest::new` runs.
pub added_at: DateTime<Utc>,
/// Entry counter; starts at 0 in `SubmissionManifest::new`.
pub total_entries: u64,
/// Entry counter; starts at 0 in `SubmissionManifest::new`.
pub processed_entries: u64,
/// Entry counter; starts at 0 in `SubmissionManifest::new`.
pub failed_entries: u64,
}
impl SubmissionManifest {
    /// Creates a `Pending` manifest with the given id, no URLs, all entry
    /// counters at zero, and `added_at` set to the current UTC time.
    pub fn new(manifest_id: impl Into<String>) -> Self {
        let manifest_id = manifest_id.into();
        let added_at = Utc::now();
        Self {
            manifest_id,
            manifest_url: None,
            replaces_manifest_url: None,
            status: ManifestStatus::Pending,
            added_at,
            total_entries: 0,
            processed_entries: 0,
            failed_entries: 0,
        }
    }

    /// Returns the manifest with `manifest_url` set to `url`.
    pub fn with_url(self, url: impl Into<String>) -> Self {
        Self {
            manifest_url: Some(url.into()),
            ..self
        }
    }

    /// Returns the manifest with `replaces_manifest_url` set to `url`.
    pub fn with_replaces(self, url: impl Into<String>) -> Self {
        Self {
            replaces_manifest_url: Some(url.into()),
            ..self
        }
    }
}
/// Outcome category recorded for a single bulk entry.
///
/// Serde (de)serializes the variants under kebab-case names
/// (for example `ValidationError` becomes `validation-error`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum BulkEntryOutcome {
/// The outcome for which `BulkEntryResult::is_success` returns `true`.
Success,
/// Counted as an error by `BulkEntryResult::is_error`.
ValidationError,
/// Counted as an error by `BulkEntryResult::is_error`.
ProcessingError,
/// Neither a success nor an error according to `BulkEntryResult`.
Skipped,
}
impl std::fmt::Display for BulkEntryOutcome {
    /// Writes the kebab-case name of the outcome (`success`,
    /// `validation-error`, `processing-error`, `skipped`).
    ///
    /// Width, fill, alignment and precision given in the format spec are
    /// honored, as they are for a plain `&str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Success => "success",
            Self::ValidationError => "validation-error",
            Self::ProcessingError => "processing-error",
            Self::Skipped => "skipped",
        };
        // `pad` applies the caller's formatting flags; writing the literal
        // directly would silently drop them.
        f.pad(label)
    }
}
impl std::str::FromStr for BulkEntryOutcome {
    type Err = String;

    /// Parses an outcome name.
    ///
    /// Accepts `success`, `validation-error`, `processing-error` and
    /// `skipped`; the two error names are also accepted with an underscore
    /// instead of the hyphen. Matching is exact: no trimming, no case folding.
    ///
    /// # Errors
    ///
    /// Returns `unknown entry outcome: <input>` for any other input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let outcome = match s {
            "success" => Self::Success,
            "validation-error" | "validation_error" => Self::ValidationError,
            "processing-error" | "processing_error" => Self::ProcessingError,
            "skipped" => Self::Skipped,
            other => return Err(format!("unknown entry outcome: {}", other)),
        };
        Ok(outcome)
    }
}
/// Result recorded for one entry of a bulk submission.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkEntryResult {
/// Line number the entry is associated with.
/// NOTE(review): whether numbering is 0- or 1-based is not established in this file.
pub line_number: u64,
/// Resource type name of the entry.
pub resource_type: String,
/// Resource id; set by the `success` constructor and `None` in the other
/// constructors. Left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub resource_id: Option<String>,
/// Caller-supplied flag in the `success` constructor; `false` in the other
/// constructors.
pub created: bool,
/// Outcome category of this entry.
pub outcome: BulkEntryOutcome,
/// Optional JSON detail; the `skipped` constructor stores an
/// `OperationOutcome`-shaped object here. Left out of the serialized form
/// when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub operation_outcome: Option<Value>,
}
impl BulkEntryResult {
pub fn success(
line_number: u64,
resource_type: impl Into<String>,
resource_id: impl Into<String>,
created: bool,
) -> Self {
Self {
line_number,
resource_type: resource_type.into(),
resource_id: Some(resource_id.into()),
created,
outcome: BulkEntryOutcome::Success,
operation_outcome: None,
}
}
pub fn validation_error(
line_number: u64,
resource_type: impl Into<String>,
outcome: Value,
) -> Self {
Self {
line_number,
resource_type: resource_type.into(),
resource_id: None,
created: false,
outcome: BulkEntryOutcome::ValidationError,
operation_outcome: Some(outcome),
}
}
pub fn processing_error(
line_number: u64,
resource_type: impl Into<String>,
outcome: Value,
) -> Self {
Self {
line_number,
resource_type: resource_type.into(),
resource_id: None,
created: false,
outcome: BulkEntryOutcome::ProcessingError,
operation_outcome: Some(outcome),
}
}
pub fn skipped(line_number: u64, resource_type: impl Into<String>, reason: &str) -> Self {
Self {
line_number,
resource_type: resource_type.into(),
resource_id: None,
created: false,
outcome: BulkEntryOutcome::Skipped,
operation_outcome: Some(serde_json::json!({
"resourceType": "OperationOutcome",
"issue": [{
"severity": "information",
"code": "informational",
"diagnostics": reason
}]
})),
}
}
pub fn is_success(&self) -> bool {
self.outcome == BulkEntryOutcome::Success
}
pub fn is_error(&self) -> bool {
matches!(
self.outcome,
BulkEntryOutcome::ValidationError | BulkEntryOutcome::ProcessingError
)
}
}
/// Summary record of a submission: identity, status, timestamps and counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmissionSummary {
/// Identifier of the submission.
pub id: SubmissionId,
/// Current status; `SubmissionSummary::new` starts at `InProgress`.
pub status: SubmissionStatus,
/// Creation timestamp; `SubmissionSummary::new` sets it to the current UTC time.
pub created_at: DateTime<Utc>,
/// Last-update timestamp; equal to `created_at` right after `SubmissionSummary::new`.
pub updated_at: DateTime<Utc>,
/// Optional completion timestamp; `None` after `SubmissionSummary::new` and
/// left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub completed_at: Option<DateTime<Utc>>,
/// Manifest counter; starts at 0.
pub manifest_count: u32,
/// Entry counter; starts at 0.
pub total_entries: u64,
/// Entry counter; starts at 0.
pub success_count: u64,
/// Entry counter; starts at 0.
pub error_count: u64,
/// Entry counter; starts at 0.
pub skipped_count: u64,
/// Optional caller-supplied JSON metadata (see `with_metadata`); left out of
/// the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<Value>,
}
impl SubmissionSummary {
    /// Creates an `InProgress` summary for `id` with all counters at zero, no
    /// completion time, no metadata, and both timestamps set to the same
    /// current UTC instant.
    pub fn new(id: SubmissionId) -> Self {
        // Read the clock once so `created_at` and `updated_at` are identical.
        let created_at = Utc::now();
        Self {
            id,
            status: SubmissionStatus::InProgress,
            created_at,
            updated_at: created_at,
            completed_at: None,
            manifest_count: 0,
            total_entries: 0,
            success_count: 0,
            error_count: 0,
            skipped_count: 0,
            metadata: None,
        }
    }

    /// Returns the summary with `metadata` attached, replacing any previous
    /// value.
    pub fn with_metadata(self, metadata: Value) -> Self {
        Self {
            metadata: Some(metadata),
            ..self
        }
    }
}
/// One parsed NDJSON line: the JSON resource plus the fields extracted from it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NdjsonEntry {
/// Line number supplied by the caller of `new`/`parse`.
pub line_number: u64,
/// Resource type; `parse` takes it from the resource's top-level
/// `resourceType` string.
pub resource_type: String,
/// Copy of the resource's top-level `id` when that is a JSON string,
/// otherwise `None`. Left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub resource_id: Option<String>,
/// The full JSON resource.
pub resource: Value,
}
impl NdjsonEntry {
pub fn new(line_number: u64, resource_type: impl Into<String>, resource: Value) -> Self {
let resource_type = resource_type.into();
let resource_id = resource
.get("id")
.and_then(|v| v.as_str())
.map(String::from);
Self {
line_number,
resource_type,
resource_id,
resource,
}
}
pub fn parse(line_number: u64, line: &str) -> Result<Self, String> {
let resource: Value =
serde_json::from_str(line).map_err(|e| format!("invalid JSON: {}", e))?;
let resource_type = resource
.get("resourceType")
.and_then(|v| v.as_str())
.ok_or_else(|| "missing resourceType".to_string())?
.to_string();
Ok(Self::new(line_number, resource_type, resource))
}
}
/// Options controlling bulk entry processing.
///
/// Every field has a serde default, so any of them may be absent from the
/// serialized form when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkProcessingOptions {
/// Batch size; defaults to 100 (`default_submit_batch_size`).
#[serde(default = "default_submit_batch_size")]
pub batch_size: u32,
/// Defaults to `true` (`default_continue_on_error`).
#[serde(default = "default_continue_on_error")]
pub continue_on_error: bool,
/// Defaults to 0.
/// NOTE(review): whether 0 means "no limit" is not established in this file — confirm.
#[serde(default)]
pub max_errors: u32,
/// Defaults to `true` (`default_allow_updates`).
#[serde(default = "default_allow_updates")]
pub allow_updates: bool,
}
/// Default for `BulkProcessingOptions::batch_size`: 100.
fn default_submit_batch_size() -> u32 {
    const DEFAULT_BATCH_SIZE: u32 = 100;
    DEFAULT_BATCH_SIZE
}
/// Default for `BulkProcessingOptions::continue_on_error`: `true`.
fn default_continue_on_error() -> bool {
    const CONTINUE_BY_DEFAULT: bool = true;
    CONTINUE_BY_DEFAULT
}
/// Default for `BulkProcessingOptions::allow_updates`: `true`.
fn default_allow_updates() -> bool {
    const ALLOW_BY_DEFAULT: bool = true;
    ALLOW_BY_DEFAULT
}
impl Default for BulkProcessingOptions {
fn default() -> Self {
Self::new()
}
}
impl BulkProcessingOptions {
    /// Creates options with the default values: batch size 100,
    /// `continue_on_error` on, `max_errors` 0, updates allowed.
    pub fn new() -> Self {
        let batch_size = default_submit_batch_size();
        let continue_on_error = default_continue_on_error();
        let allow_updates = default_allow_updates();
        Self {
            batch_size,
            continue_on_error,
            max_errors: 0,
            allow_updates,
        }
    }

    /// Returns the options with `batch_size` replaced (stored as given).
    pub fn with_batch_size(self, batch_size: u32) -> Self {
        Self { batch_size, ..self }
    }

    /// Returns the options with `continue_on_error` replaced.
    pub fn with_continue_on_error(self, continue_on_error: bool) -> Self {
        Self {
            continue_on_error,
            ..self
        }
    }

    /// Returns the options with `max_errors` replaced (stored as given).
    pub fn with_max_errors(self, max_errors: u32) -> Self {
        Self { max_errors, ..self }
    }

    /// Returns the options with `allow_updates` replaced.
    pub fn with_allow_updates(self, allow_updates: bool) -> Self {
        Self {
            allow_updates,
            ..self
        }
    }

    /// Preset: default batch size, `continue_on_error` off, `max_errors` 1,
    /// updates allowed.
    pub fn strict() -> Self {
        Self {
            continue_on_error: false,
            max_errors: 1,
            allow_updates: true,
            ..Self::new()
        }
    }

    /// Preset: default batch size, `continue_on_error` on, `max_errors` 0,
    /// updates not allowed.
    pub fn create_only() -> Self {
        Self {
            continue_on_error: true,
            max_errors: 0,
            allow_updates: false,
            ..Self::new()
        }
    }
}
/// Kind of change recorded in a `SubmissionChange`.
///
/// Serde (de)serializes the variants under lowercase names (`create`, `update`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ChangeType {
/// Set by `SubmissionChange::create`.
Create,
/// Set by `SubmissionChange::update`.
Update,
}
impl std::fmt::Display for ChangeType {
    /// Writes the lowercase name of the change type (`create` or `update`).
    ///
    /// Width, fill, alignment and precision given in the format spec are
    /// honored, as they are for a plain `&str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Create => "create",
            Self::Update => "update",
        };
        // `pad` applies the caller's formatting flags; writing the literal
        // directly would silently drop them.
        f.pad(label)
    }
}
impl std::str::FromStr for ChangeType {
    type Err = String;

    /// Parses `create` or `update`. Matching is exact: no trimming and no
    /// case folding.
    ///
    /// # Errors
    ///
    /// Returns `unknown change type: <input>` for any other input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let kind = match s {
            "create" => Self::Create,
            "update" => Self::Update,
            other => return Err(format!("unknown change type: {}", other)),
        };
        Ok(kind)
    }
}
/// Record of a single resource change made under a manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubmissionChange {
/// Identifier of this change; the constructors fill it with a random (v4)
/// UUID string.
pub change_id: String,
/// Identifier of the manifest the change is attributed to.
pub manifest_id: String,
/// Whether the change is a create or an update.
pub change_type: ChangeType,
/// Resource type name of the changed resource.
pub resource_type: String,
/// Id of the changed resource.
pub resource_id: String,
/// Version before the change; `None` for changes built with `create`.
/// Left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub previous_version: Option<String>,
/// Version after the change.
pub new_version: String,
/// JSON content before the change; `None` for changes built with `create`.
/// Left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub previous_content: Option<Value>,
/// Timestamp taken with `Utc::now()` when the constructor runs.
pub changed_at: DateTime<Utc>,
}
impl SubmissionChange {
    /// Builds a `Create` change: fresh random (v4) UUID as `change_id`, no
    /// previous version or content, `changed_at` set to the current UTC time.
    pub fn create(
        manifest_id: impl Into<String>,
        resource_type: impl Into<String>,
        resource_id: impl Into<String>,
        new_version: impl Into<String>,
    ) -> Self {
        let change_id = Uuid::new_v4().to_string();
        Self {
            change_id,
            manifest_id: manifest_id.into(),
            change_type: ChangeType::Create,
            resource_type: resource_type.into(),
            resource_id: resource_id.into(),
            previous_version: None,
            new_version: new_version.into(),
            previous_content: None,
            changed_at: Utc::now(),
        }
    }

    /// Builds an `Update` change: fresh random (v4) UUID as `change_id`, the
    /// given previous version and content, `changed_at` set to the current
    /// UTC time.
    pub fn update(
        manifest_id: impl Into<String>,
        resource_type: impl Into<String>,
        resource_id: impl Into<String>,
        previous_version: impl Into<String>,
        new_version: impl Into<String>,
        previous_content: Value,
    ) -> Self {
        // An update is a create record plus the "before" state.
        Self {
            change_type: ChangeType::Update,
            previous_version: Some(previous_version.into()),
            previous_content: Some(previous_content),
            ..Self::create(manifest_id, resource_type, resource_id, new_version)
        }
    }
}
/// Per-outcome entry counters; all zero by default.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EntryCountSummary {
/// Incremented by every call to `increment`, whatever the outcome.
pub total: u64,
/// Incremented for `BulkEntryOutcome::Success`.
pub success: u64,
/// Incremented for `BulkEntryOutcome::ValidationError`.
pub validation_error: u64,
/// Incremented for `BulkEntryOutcome::ProcessingError`.
pub processing_error: u64,
/// Incremented for `BulkEntryOutcome::Skipped`.
pub skipped: u64,
}
impl EntryCountSummary {
    /// Creates a summary with every counter at zero.
    pub fn new() -> Self {
        Self {
            total: 0,
            success: 0,
            validation_error: 0,
            processing_error: 0,
            skipped: 0,
        }
    }

    /// Returns the sum of the validation-error and processing-error counters.
    pub fn error_count(&self) -> u64 {
        let (validation, processing) = (self.validation_error, self.processing_error);
        validation + processing
    }

    /// Records one entry: bumps `total` and the counter matching `outcome`.
    pub fn increment(&mut self, outcome: BulkEntryOutcome) {
        self.total += 1;
        let bucket = match outcome {
            BulkEntryOutcome::Success => &mut self.success,
            BulkEntryOutcome::ValidationError => &mut self.validation_error,
            BulkEntryOutcome::ProcessingError => &mut self.processing_error,
            BulkEntryOutcome::Skipped => &mut self.skipped,
        };
        *bucket += 1;
    }
}
/// Result of processing a stream of entries.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamProcessingResult {
/// Number of lines processed; starts at 0 in `StreamProcessingResult::new`.
pub lines_processed: u64,
/// Per-outcome entry counters.
pub counts: EntryCountSummary,
/// `true` once the `aborted` builder has been applied; `false` after `new`.
pub aborted: bool,
/// Reason passed to the `aborted` builder; left out of the serialized form
/// when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub abort_reason: Option<String>,
}
impl StreamProcessingResult {
    /// Creates an empty, non-aborted result: zero lines processed, zeroed
    /// counters and no abort reason.
    pub fn new() -> Self {
        let counts = EntryCountSummary::new();
        Self {
            lines_processed: 0,
            counts,
            aborted: false,
            abort_reason: None,
        }
    }

    /// Returns the result flagged as aborted with `reason` recorded; the
    /// line count and counters are kept.
    pub fn aborted(self, reason: impl Into<String>) -> Self {
        Self {
            aborted: true,
            abort_reason: Some(reason.into()),
            ..self
        }
    }
}
impl Default for StreamProcessingResult {
fn default() -> Self {
Self::new()
}
}
/// Storage operations for bulk submissions: submissions themselves, the
/// manifests attached to them, and per-entry processing results.
///
/// Requires [`ResourceStorage`]. Every method takes the [`TenantContext`] it
/// operates under and reports failures through [`StorageResult`].
///
/// NOTE(review): the method notes below are read off the signatures; exact
/// semantics (error cases, idempotence, ordering) are defined by implementors.
#[async_trait]
pub trait BulkSubmitProvider: ResourceStorage {
/// Creates a submission under `id`, optionally with caller-supplied
/// `metadata`, and returns its summary.
async fn create_submission(
&self,
tenant: &TenantContext,
id: &SubmissionId,
metadata: Option<Value>,
) -> StorageResult<SubmissionSummary>;
/// Fetches the summary of the submission `id`.
/// NOTE(review): `Ok(None)` presumably means "no such submission" — confirm.
async fn get_submission(
&self,
tenant: &TenantContext,
id: &SubmissionId,
) -> StorageResult<Option<SubmissionSummary>>;
/// Lists submission summaries, optionally filtered by `submitter` and
/// `status`, paged with `limit` and `offset`.
async fn list_submissions(
&self,
tenant: &TenantContext,
submitter: Option<&str>,
status: Option<SubmissionStatus>,
limit: u32,
offset: u32,
) -> StorageResult<Vec<SubmissionSummary>>;
/// Completes the submission `id` and returns a summary.
/// NOTE(review): presumably the summary reflects the new status — confirm.
async fn complete_submission(
&self,
tenant: &TenantContext,
id: &SubmissionId,
) -> StorageResult<SubmissionSummary>;
/// Aborts the submission `id`, recording `reason`.
/// NOTE(review): the meaning of the returned `u64` is not established in
/// this file — confirm with implementors.
async fn abort_submission(
&self,
tenant: &TenantContext,
id: &SubmissionId,
reason: &str,
) -> StorageResult<u64>;
/// Adds a manifest to the submission, with an optional manifest URL and an
/// optional `replaces_manifest_url`, and returns the stored manifest.
async fn add_manifest(
&self,
tenant: &TenantContext,
submission_id: &SubmissionId,
manifest_url: Option<&str>,
replaces_manifest_url: Option<&str>,
) -> StorageResult<SubmissionManifest>;
/// Fetches one manifest of the submission by `manifest_id`.
/// NOTE(review): `Ok(None)` presumably means "no such manifest" — confirm.
async fn get_manifest(
&self,
tenant: &TenantContext,
submission_id: &SubmissionId,
manifest_id: &str,
) -> StorageResult<Option<SubmissionManifest>>;
/// Lists the manifests of the submission.
async fn list_manifests(
&self,
tenant: &TenantContext,
submission_id: &SubmissionId,
) -> StorageResult<Vec<SubmissionManifest>>;
/// Processes `entries` for the given manifest under `options` and returns
/// entry results.
/// NOTE(review): whether exactly one result is returned per input entry is
/// not visible here — confirm.
async fn process_entries(
&self,
tenant: &TenantContext,
submission_id: &SubmissionId,
manifest_id: &str,
entries: Vec<NdjsonEntry>,
options: &BulkProcessingOptions,
) -> StorageResult<Vec<BulkEntryResult>>;
/// Returns entry results for the given manifest, optionally filtered by
/// outcome, paged with `limit` and `offset`.
async fn get_entry_results(
&self,
tenant: &TenantContext,
submission_id: &SubmissionId,
manifest_id: &str,
outcome_filter: Option<BulkEntryOutcome>,
limit: u32,
offset: u32,
) -> StorageResult<Vec<BulkEntryResult>>;
/// Returns the per-outcome entry counters for the given manifest.
async fn get_entry_counts(
&self,
tenant: &TenantContext,
submission_id: &SubmissionId,
manifest_id: &str,
) -> StorageResult<EntryCountSummary>;
}
/// Extension of [`BulkSubmitProvider`] that consumes entries from an
/// asynchronous buffered reader instead of an in-memory vector.
#[async_trait]
pub trait StreamingBulkSubmitProvider: BulkSubmitProvider {
/// Processes the content of `reader` for the given manifest under
/// `options` and returns a [`StreamProcessingResult`].
/// NOTE(review): presumably `reader` yields NDJSON lines and `resource_type`
/// names the expected type of every line — confirm with implementors.
async fn process_ndjson_stream(
&self,
tenant: &TenantContext,
submission_id: &SubmissionId,
manifest_id: &str,
resource_type: &str,
reader: Box<dyn AsyncBufRead + Send + Unpin>,
options: &BulkProcessingOptions,
) -> StorageResult<StreamProcessingResult>;
}
/// Extension of [`BulkSubmitProvider`] for recording, listing and rolling back
/// [`SubmissionChange`] records.
#[async_trait]
pub trait BulkSubmitRollbackProvider: BulkSubmitProvider {
/// Records `change` against the submission.
async fn record_change(
&self,
tenant: &TenantContext,
submission_id: &SubmissionId,
change: &SubmissionChange,
) -> StorageResult<()>;
/// Lists the changes recorded for the submission, paged with `limit` and
/// `offset`.
/// NOTE(review): the ordering of the returned changes is not established here.
async fn list_changes(
&self,
tenant: &TenantContext,
submission_id: &SubmissionId,
limit: u32,
offset: u32,
) -> StorageResult<Vec<SubmissionChange>>;
/// Rolls back a single `change` of the submission.
/// NOTE(review): the meaning of the returned `bool` is not established in
/// this file — presumably whether anything was rolled back; confirm.
async fn rollback_change(
&self,
tenant: &TenantContext,
submission_id: &SubmissionId,
change: &SubmissionChange,
) -> StorageResult<bool>;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Explicit construction keeps both parts and renders as `submitter/id`.
    #[test]
    fn test_submission_id() {
        let sid = SubmissionId::new("my-system", "sub-123");
        assert_eq!(sid.submitter, "my-system");
        assert_eq!(sid.submission_id, "sub-123");
        assert_eq!(sid.to_string(), "my-system/sub-123");
    }

    /// Generated ids keep the submitter and get a non-empty submission id.
    #[test]
    fn test_submission_id_generate() {
        let sid = SubmissionId::generate("my-system");
        assert_eq!(sid.submitter, "my-system");
        assert!(!sid.submission_id.is_empty());
    }

    /// Terminal classification plus parsing of the kebab-case name.
    #[test]
    fn test_submission_status() {
        assert!(!SubmissionStatus::InProgress.is_terminal());
        assert!(SubmissionStatus::Complete.is_terminal());
        assert!(SubmissionStatus::Aborted.is_terminal());

        let parsed = "in-progress".parse::<SubmissionStatus>().unwrap();
        assert_eq!(parsed, SubmissionStatus::InProgress);
    }

    /// Only `Completed` and `Failed` are terminal manifest statuses.
    #[test]
    fn test_manifest_status() {
        assert!(!ManifestStatus::Pending.is_terminal());
        assert!(!ManifestStatus::Processing.is_terminal());
        assert!(ManifestStatus::Completed.is_terminal());
        assert!(ManifestStatus::Failed.is_terminal());
    }

    /// Success and validation-error results classify themselves correctly.
    #[test]
    fn test_bulk_entry_result() {
        let ok = BulkEntryResult::success(1, "Patient", "pat-123", true);
        assert!(ok.is_success());
        assert!(!ok.is_error());
        assert!(ok.created);

        let detail = serde_json::json!({"resourceType": "OperationOutcome"});
        let invalid = BulkEntryResult::validation_error(2, "Patient", detail);
        assert!(!invalid.is_success());
        assert!(invalid.is_error());
    }

    /// A well-formed line yields the type and id found in the resource.
    #[test]
    fn test_ndjson_entry_parse() {
        let line = r#"{"resourceType":"Patient","id":"123","name":[{"family":"Smith"}]}"#;
        let entry = NdjsonEntry::parse(1, line).unwrap();
        assert_eq!(entry.line_number, 1);
        assert_eq!(entry.resource_type, "Patient");
        assert_eq!(entry.resource_id, Some("123".to_string()));
    }

    /// Invalid JSON and a missing `resourceType` are both rejected.
    #[test]
    fn test_ndjson_entry_parse_error() {
        let not_json = NdjsonEntry::parse(1, "not json");
        assert!(not_json.is_err());

        let no_type = NdjsonEntry::parse(1, r#"{"id":"123"}"#);
        assert!(no_type.is_err());
    }

    /// Builder methods override the individual option fields.
    #[test]
    fn test_bulk_processing_options() {
        let opts = BulkProcessingOptions::new()
            .with_batch_size(50)
            .with_max_errors(10)
            .with_continue_on_error(false);
        assert_eq!(opts.batch_size, 50);
        assert_eq!(opts.max_errors, 10);
        assert!(!opts.continue_on_error);
    }

    /// The strict preset turns `continue_on_error` off and sets `max_errors` to 1.
    #[test]
    fn test_bulk_processing_options_strict() {
        let opts = BulkProcessingOptions::strict();
        assert!(!opts.continue_on_error);
        assert_eq!(opts.max_errors, 1);
    }

    /// Create changes carry no previous content; update changes do.
    #[test]
    fn test_submission_change() {
        let created = SubmissionChange::create("manifest-1", "Patient", "pat-123", "1");
        assert_eq!(created.change_type, ChangeType::Create);
        assert!(created.previous_content.is_none());

        let before = serde_json::json!({"resourceType": "Patient"});
        let updated =
            SubmissionChange::update("manifest-1", "Patient", "pat-123", "1", "2", before);
        assert_eq!(updated.change_type, ChangeType::Update);
        assert!(updated.previous_content.is_some());
    }

    /// Each outcome bumps `total` and its own counter.
    #[test]
    fn test_entry_count_summary() {
        let mut counts = EntryCountSummary::new();
        let outcomes = [
            BulkEntryOutcome::Success,
            BulkEntryOutcome::Success,
            BulkEntryOutcome::ValidationError,
            BulkEntryOutcome::ProcessingError,
            BulkEntryOutcome::Skipped,
        ];
        for outcome in outcomes.iter().copied() {
            counts.increment(outcome);
        }
        assert_eq!(counts.total, 5);
        assert_eq!(counts.success, 2);
        assert_eq!(counts.error_count(), 2);
        assert_eq!(counts.skipped, 1);
    }

    /// The `aborted` builder sets the flag and records the reason.
    #[test]
    fn test_stream_processing_result() {
        let result = StreamProcessingResult::new().aborted("max errors exceeded");
        assert!(result.aborted);
        assert_eq!(result.abort_reason, Some("max errors exceeded".to_string()));
    }
}