use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
use crate::error::StorageResult;
use crate::tenant::TenantContext;
/// Opaque identifier for a bulk export job.
///
/// Backed by a plain string so it can hold either a freshly generated
/// UUID (see [`ExportJobId::new`]) or an externally supplied id.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ExportJobId(String);
impl ExportJobId {
    /// Creates a new job id backed by a random v4 UUID.
    pub fn new() -> Self {
        let generated = Uuid::new_v4();
        Self(generated.to_string())
    }

    /// Wraps an existing identifier string as a job id.
    pub fn from_string(id: impl Into<String>) -> Self {
        Self(id.into())
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}
impl Default for ExportJobId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for ExportJobId {
    /// Renders the raw identifier string.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}
impl From<String> for ExportJobId {
fn from(s: String) -> Self {
Self(s)
}
}
impl From<&str> for ExportJobId {
    /// Copies the borrowed slice into an owned id.
    fn from(s: &str) -> Self {
        Self(s.to_owned())
    }
}
/// Lifecycle state of a bulk export job.
///
/// Serialized in kebab-case so the serde form matches `Display` and
/// `FromStr`. Previously `rename_all = "lowercase"` serialized
/// `InProgress` as `"inprogress"`, a spelling that `FromStr` rejected,
/// so serde output could not round-trip through `parse()`. The old
/// lowercase and snake_case spellings are still accepted on input via
/// aliases for backward compatibility.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ExportStatus {
    /// Job has been created but processing has not started.
    Accepted,
    /// Job is actively exporting data.
    #[serde(alias = "inprogress", alias = "in_progress")]
    InProgress,
    /// Job finished successfully.
    Complete,
    /// Job failed.
    Error,
    /// Job was cancelled before completion.
    Cancelled,
}
impl ExportStatus {
    /// True once the job can make no further progress
    /// (complete, failed, or cancelled).
    pub fn is_terminal(&self) -> bool {
        match self {
            Self::Complete | Self::Error | Self::Cancelled => true,
            Self::Accepted | Self::InProgress => false,
        }
    }

    /// True while the job is queued or running — exactly the
    /// complement of [`Self::is_terminal`].
    pub fn is_active(&self) -> bool {
        !self.is_terminal()
    }
}
impl std::fmt::Display for ExportStatus {
    /// Kebab-case labels, e.g. "in-progress".
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Accepted => "accepted",
            Self::InProgress => "in-progress",
            Self::Complete => "complete",
            Self::Error => "error",
            Self::Cancelled => "cancelled",
        };
        f.write_str(label)
    }
}
impl std::str::FromStr for ExportStatus {
    type Err = String;

    /// Parses a status label.
    ///
    /// Accepts the canonical kebab-case form plus the underscore and
    /// collapsed ("inprogress") spellings — the latter is what the
    /// enum's former `rename_all = "lowercase"` serde attribute
    /// emitted, so previously-serialized values now parse too.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "accepted" => Ok(Self::Accepted),
            "in-progress" | "in_progress" | "inprogress" => Ok(Self::InProgress),
            "complete" => Ok(Self::Complete),
            "error" => Ok(Self::Error),
            "cancelled" => Ok(Self::Cancelled),
            other => Err(format!("unknown export status: {}", other)),
        }
    }
}
/// Scope of a bulk export, mirroring the three FHIR bulk-data
/// kick-off shapes: system-wide, all patients, or a single group.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportLevel {
/// Export every resource in the system.
System,
/// Export resources from all patient compartments.
Patient,
/// Export resources for the members of one group.
Group {
group_id: String,
},
}
impl ExportLevel {
pub fn system() -> Self {
Self::System
}
pub fn patient() -> Self {
Self::Patient
}
pub fn group(group_id: impl Into<String>) -> Self {
Self::Group {
group_id: group_id.into(),
}
}
}
impl std::fmt::Display for ExportLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::System => write!(f, "system"),
Self::Patient => write!(f, "patient"),
Self::Group { group_id } => write!(f, "group/{}", group_id),
}
}
}
/// A per-resource-type search filter restricting what gets exported
/// (in the style of the FHIR bulk-data `_typeFilter` parameter).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TypeFilter {
/// Resource type the filter applies to, e.g. "Observation".
pub resource_type: String,
/// Search query string applied to that type, e.g. "code=1234".
pub query: String,
}
impl TypeFilter {
    /// Builds a filter from any string-like type name and query.
    pub fn new(resource_type: impl Into<String>, query: impl Into<String>) -> Self {
        let resource_type = resource_type.into();
        let query = query.into();
        Self { resource_type, query }
    }
}
/// Parameters describing one bulk export run.
///
/// Constructed via [`ExportRequest::new`] or the level-specific
/// shorthands, then refined with the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportRequest {
/// Scope of the export (system, patient, or group).
pub level: ExportLevel,
/// Resource types to include.
/// NOTE(review): presumably an empty list means "all types" — confirm
/// against the export executor.
#[serde(default)]
pub resource_types: Vec<String>,
/// Only export resources modified at or after this instant, when set.
#[serde(skip_serializing_if = "Option::is_none")]
pub since: Option<DateTime<Utc>>,
/// Per-type search filters.
#[serde(default)]
pub type_filters: Vec<TypeFilter>,
/// Page size used when fetching export batches (defaults to 1000).
#[serde(default = "default_batch_size")]
pub batch_size: u32,
/// Output MIME type (defaults to "application/fhir+ndjson").
#[serde(default = "default_output_format")]
pub output_format: String,
}
/// Serde default for `ExportRequest::batch_size`.
fn default_batch_size() -> u32 {
    1_000
}
/// Serde default for `ExportRequest::output_format` (FHIR NDJSON).
fn default_output_format() -> String {
    String::from("application/fhir+ndjson")
}
impl ExportRequest {
    /// Creates a request at the given level with all other fields at
    /// their serde defaults.
    pub fn new(level: ExportLevel) -> Self {
        Self {
            level,
            resource_types: Vec::new(),
            since: None,
            type_filters: Vec::new(),
            batch_size: default_batch_size(),
            output_format: default_output_format(),
        }
    }

    /// Shorthand for a system-level request.
    pub fn system() -> Self {
        Self::new(ExportLevel::System)
    }

    /// Shorthand for an all-patients request.
    pub fn patient() -> Self {
        Self::new(ExportLevel::Patient)
    }

    /// Shorthand for a group-level request.
    pub fn group(group_id: impl Into<String>) -> Self {
        let group_id = group_id.into();
        Self::new(ExportLevel::Group { group_id })
    }

    /// Replaces the set of resource types to export.
    pub fn with_types(mut self, types: Vec<String>) -> Self {
        self.resource_types = types;
        self
    }

    /// Restricts the export to resources modified at or after `since`.
    pub fn with_since(mut self, since: DateTime<Utc>) -> Self {
        self.since = Some(since);
        self
    }

    /// Appends a single type filter.
    pub fn with_type_filter(mut self, filter: TypeFilter) -> Self {
        self.type_filters.push(filter);
        self
    }

    /// Appends several type filters at once.
    pub fn with_type_filters(mut self, filters: Vec<TypeFilter>) -> Self {
        self.type_filters.extend(filters);
        self
    }

    /// Overrides the page size used when fetching export batches.
    pub fn with_batch_size(mut self, batch_size: u32) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// The target group id, when this is a group-level request.
    pub fn group_id(&self) -> Option<&str> {
        if let ExportLevel::Group { group_id } = &self.level {
            Some(group_id.as_str())
        } else {
            None
        }
    }
}
/// Per-resource-type progress counters for a running export.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypeExportProgress {
/// Resource type being exported.
pub resource_type: String,
/// Expected number of resources, when known up front.
pub total_count: Option<u64>,
/// Resources successfully exported so far.
pub exported_count: u64,
/// Resources that failed to export.
pub error_count: u64,
/// Opaque resume cursor for this type, when mid-stream.
#[serde(skip_serializing_if = "Option::is_none")]
pub cursor_state: Option<String>,
}
impl TypeExportProgress {
    /// Starts tracking a resource type with all counters at zero and
    /// no known total.
    pub fn new(resource_type: impl Into<String>) -> Self {
        Self {
            resource_type: resource_type.into(),
            total_count: None,
            exported_count: 0,
            error_count: 0,
            cursor_state: None,
        }
    }

    /// Records the expected total number of resources for this type.
    pub fn with_total(mut self, total: u64) -> Self {
        self.total_count = Some(total);
        self
    }

    /// Fraction of resources exported relative to the known total, or
    /// `None` when no total is known. A zero total counts as fully
    /// complete.
    pub fn progress_fraction(&self) -> Option<f64> {
        let total = self.total_count?;
        if total == 0 {
            return Some(1.0);
        }
        Some(self.exported_count as f64 / total as f64)
    }
}
/// Snapshot of a bulk export job's state, suitable for status polling.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportProgress {
/// Id of the job this snapshot describes.
pub job_id: ExportJobId,
/// Current lifecycle state.
pub status: ExportStatus,
/// Scope the job was started with.
pub level: ExportLevel,
/// Point-in-time the exported data is consistent with.
pub transaction_time: DateTime<Utc>,
/// When processing actually began, once started.
#[serde(skip_serializing_if = "Option::is_none")]
pub started_at: Option<DateTime<Utc>>,
/// When the job reached a terminal state.
#[serde(skip_serializing_if = "Option::is_none")]
pub completed_at: Option<DateTime<Utc>>,
/// Per-type progress counters.
pub type_progress: Vec<TypeExportProgress>,
/// Resource type currently being processed, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub current_type: Option<String>,
/// Failure detail.
/// NOTE(review): presumably set alongside `ExportStatus::Error` —
/// confirm in implementations.
#[serde(skip_serializing_if = "Option::is_none")]
pub error_message: Option<String>,
}
impl ExportProgress {
    /// Creates the initial progress record for a newly accepted job:
    /// status `Accepted`, no timestamps, no per-type progress yet.
    pub fn accepted(
        job_id: ExportJobId,
        level: ExportLevel,
        transaction_time: DateTime<Utc>,
    ) -> Self {
        Self {
            job_id,
            status: ExportStatus::Accepted,
            level,
            transaction_time,
            started_at: None,
            completed_at: None,
            type_progress: Vec::new(),
            current_type: None,
            error_message: None,
        }
    }

    /// Overall completion fraction in `[0.0, 1.0]` across tracked types.
    ///
    /// Only types with a known `total_count` contribute. Previously,
    /// types with an unknown total added their `exported_count` to the
    /// numerator while contributing nothing to the denominator, which
    /// could push the result past 1.0. The result is also clamped at
    /// 1.0 as a guard against `exported_count` overshooting its total.
    /// Returns 0.0 when no totals are known at all.
    pub fn overall_progress(&self) -> f64 {
        let mut total_exported = 0u64;
        let mut total_expected = 0u64;
        for tp in &self.type_progress {
            if let Some(total) = tp.total_count {
                total_exported += tp.exported_count;
                total_expected += total;
            }
        }
        if total_expected == 0 {
            0.0
        } else {
            (total_exported as f64 / total_expected as f64).min(1.0)
        }
    }
}
/// One entry in an export manifest's `output` (or `error`) array,
/// pointing at a single NDJSON file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportOutputFile {
/// FHIR resource type contained in the file; serialized as "type".
#[serde(rename = "type")]
pub resource_type: String,
/// Location of the NDJSON file.
pub url: String,
/// Number of resources in the file, when known.
#[serde(skip_serializing_if = "Option::is_none")]
pub count: Option<u64>,
}
impl ExportOutputFile {
pub fn new(resource_type: impl Into<String>, url: impl Into<String>) -> Self {
Self {
resource_type: resource_type.into(),
url: url.into(),
count: None,
}
}
pub fn with_count(mut self, count: u64) -> Self {
self.count = Some(count);
self
}
}
/// Completion manifest returned when an export finishes, shaped like
/// the FHIR bulk-data status response (camelCase keys via serde
/// renames).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportManifest {
/// Instant the exported data is consistent with.
#[serde(rename = "transactionTime")]
pub transaction_time: DateTime<Utc>,
/// The original kick-off request URL.
pub request: String,
/// Whether downloading the output files requires an access token.
#[serde(rename = "requiresAccessToken")]
pub requires_access_token: bool,
/// Successfully exported files.
pub output: Vec<ExportOutputFile>,
/// Files describing errors encountered during the export.
#[serde(default)]
pub error: Vec<ExportOutputFile>,
/// Optional human-readable status message.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
/// Arbitrary extension payload.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub extension: Option<Value>,
}
impl ExportManifest {
pub fn new(transaction_time: DateTime<Utc>, request: impl Into<String>) -> Self {
Self {
transaction_time,
request: request.into(),
requires_access_token: true,
output: Vec::new(),
error: Vec::new(),
message: None,
extension: None,
}
}
pub fn with_output(mut self, file: ExportOutputFile) -> Self {
self.output.push(file);
self
}
pub fn with_error(mut self, file: ExportOutputFile) -> Self {
self.error.push(file);
self
}
pub fn with_message(mut self, message: impl Into<String>) -> Self {
self.message = Some(message.into());
self
}
}
/// One page of newline-delimited JSON produced while exporting a type.
#[derive(Debug, Clone)]
pub struct NdjsonBatch {
    /// Serialized resources, one JSON document per entry.
    pub lines: Vec<String>,
    /// Opaque cursor for fetching the next page, if any.
    pub next_cursor: Option<String>,
    /// Whether this is the final page for the type.
    pub is_last: bool,
}

impl NdjsonBatch {
    /// Wraps a set of lines as a non-final batch with no cursor.
    pub fn new(lines: Vec<String>) -> Self {
        NdjsonBatch {
            lines,
            next_cursor: None,
            is_last: false,
        }
    }

    /// A batch with no lines that also marks the end of the stream.
    pub fn empty() -> Self {
        NdjsonBatch {
            lines: Vec::new(),
            next_cursor: None,
            is_last: true,
        }
    }

    /// Attaches the cursor for fetching the following page.
    pub fn with_cursor(mut self, cursor: impl Into<String>) -> Self {
        self.next_cursor = Some(cursor.into());
        self
    }

    /// Marks this batch as final, clearing any pending cursor.
    pub fn as_last(mut self) -> Self {
        self.is_last = true;
        self.next_cursor = None;
        self
    }

    /// Number of lines in this batch.
    pub fn len(&self) -> usize {
        self.lines.len()
    }

    /// True when the batch carries no lines.
    pub fn is_empty(&self) -> bool {
        self.lines.is_empty()
    }
}
/// Storage backend operations for managing bulk export jobs,
/// scoped per tenant.
#[async_trait]
pub trait BulkExportStorage: Send + Sync {
/// Kicks off a new export job and returns its id.
async fn start_export(
&self,
tenant: &TenantContext,
request: ExportRequest,
) -> StorageResult<ExportJobId>;
/// Fetches the current progress snapshot of a job.
async fn get_export_status(
&self,
tenant: &TenantContext,
job_id: &ExportJobId,
) -> StorageResult<ExportProgress>;
/// Requests cancellation of a job.
async fn cancel_export(
&self,
tenant: &TenantContext,
job_id: &ExportJobId,
) -> StorageResult<()>;
/// Deletes a job's stored state.
/// NOTE(review): whether generated output files are also removed is
/// not visible here — confirm in implementations.
async fn delete_export(
&self,
tenant: &TenantContext,
job_id: &ExportJobId,
) -> StorageResult<()>;
/// Returns the completion manifest for a job.
async fn get_export_manifest(
&self,
tenant: &TenantContext,
job_id: &ExportJobId,
) -> StorageResult<ExportManifest>;
/// Lists a tenant's export jobs, optionally including completed ones.
async fn list_exports(
&self,
tenant: &TenantContext,
include_completed: bool,
) -> StorageResult<Vec<ExportProgress>>;
}
/// Data-access hooks an export engine uses to pull resources out of
/// the underlying store.
#[async_trait]
pub trait ExportDataProvider: Send + Sync {
/// Resolves which resource types the request should export.
async fn list_export_types(
&self,
tenant: &TenantContext,
request: &ExportRequest,
) -> StorageResult<Vec<String>>;
/// Counts the resources of one type matching the request.
async fn count_export_resources(
&self,
tenant: &TenantContext,
request: &ExportRequest,
resource_type: &str,
) -> StorageResult<u64>;
/// Fetches one page of NDJSON lines for a type, resuming from
/// `cursor` (None for the first page), with at most `batch_size`
/// lines.
async fn fetch_export_batch(
&self,
tenant: &TenantContext,
request: &ExportRequest,
resource_type: &str,
cursor: Option<&str>,
batch_size: u32,
) -> StorageResult<NdjsonBatch>;
}
/// Extends [`ExportDataProvider`] with patient-compartment access for
/// patient-level exports.
#[async_trait]
pub trait PatientExportProvider: ExportDataProvider {
/// Pages through patient ids in scope for the request; returns the
/// ids plus a cursor for the next page (None when exhausted).
async fn list_patient_ids(
&self,
tenant: &TenantContext,
request: &ExportRequest,
cursor: Option<&str>,
batch_size: u32,
) -> StorageResult<(Vec<String>, Option<String>)>;
/// Fetches one page of NDJSON lines for one resource type, limited
/// to the compartments of the given patients.
async fn fetch_patient_compartment_batch(
&self,
tenant: &TenantContext,
request: &ExportRequest,
resource_type: &str,
patient_ids: &[String],
cursor: Option<&str>,
batch_size: u32,
) -> StorageResult<NdjsonBatch>;
}
/// Extends [`PatientExportProvider`] with group-membership resolution
/// for group-level exports.
#[async_trait]
pub trait GroupExportProvider: PatientExportProvider {
/// Returns the member ids of a group.
/// NOTE(review): the distinction from `resolve_group_patient_ids`
/// (raw member references vs resolved patient ids?) is not visible
/// here — confirm in implementations.
async fn get_group_members(
&self,
tenant: &TenantContext,
group_id: &str,
) -> StorageResult<Vec<String>>;
/// Returns the patient ids the group's membership resolves to.
async fn resolve_group_patient_ids(
&self,
tenant: &TenantContext,
group_id: &str,
) -> StorageResult<Vec<String>>;
}
// Unit tests for the plain data types in this module. The async
// storage/provider traits are not exercised here.
#[cfg(test)]
mod tests {
use super::*;
// Generated ids are non-empty; wrapped ids round-trip through
// `as_str` and `Display`.
#[test]
fn test_export_job_id() {
let id = ExportJobId::new();
assert!(!id.as_str().is_empty());
let id2 = ExportJobId::from_string("test-123");
assert_eq!(id2.as_str(), "test-123");
assert_eq!(id2.to_string(), "test-123");
}
// Terminal and active predicates are disjoint and cover all variants.
#[test]
fn test_export_status() {
assert!(ExportStatus::Complete.is_terminal());
assert!(ExportStatus::Error.is_terminal());
assert!(ExportStatus::Cancelled.is_terminal());
assert!(!ExportStatus::Accepted.is_terminal());
assert!(!ExportStatus::InProgress.is_terminal());
assert!(ExportStatus::Accepted.is_active());
assert!(ExportStatus::InProgress.is_active());
assert!(!ExportStatus::Complete.is_active());
}
// Display emits kebab-case; FromStr accepts kebab and snake forms.
#[test]
fn test_export_status_display_parse() {
let status = ExportStatus::InProgress;
assert_eq!(status.to_string(), "in-progress");
let parsed: ExportStatus = "in-progress".parse().unwrap();
assert_eq!(parsed, ExportStatus::InProgress);
let parsed: ExportStatus = "in_progress".parse().unwrap();
assert_eq!(parsed, ExportStatus::InProgress);
}
// Level constructors produce the matching variants.
#[test]
fn test_export_level() {
let system = ExportLevel::system();
assert!(matches!(system, ExportLevel::System));
let patient = ExportLevel::patient();
assert!(matches!(patient, ExportLevel::Patient));
let group = ExportLevel::group("grp-123");
assert!(matches!(group, ExportLevel::Group { group_id } if group_id == "grp-123"));
}
// Builder methods accumulate on top of the defaults from `new`.
#[test]
fn test_export_request_builder() {
let request = ExportRequest::system()
.with_types(vec!["Patient".to_string(), "Observation".to_string()])
.with_batch_size(500)
.with_type_filter(TypeFilter::new("Observation", "code=1234"));
assert!(matches!(request.level, ExportLevel::System));
assert_eq!(request.resource_types, vec!["Patient", "Observation"]);
assert_eq!(request.batch_size, 500);
assert_eq!(request.type_filters.len(), 1);
}
// group_id() is Some only for group-level requests.
#[test]
fn test_export_request_group_id() {
let request = ExportRequest::group("grp-123");
assert_eq!(request.group_id(), Some("grp-123"));
let system_request = ExportRequest::system();
assert_eq!(system_request.group_id(), None);
}
// With a known total, the fraction is exported/total.
#[test]
fn test_type_export_progress() {
let progress = TypeExportProgress::new("Patient").with_total(100);
assert_eq!(progress.total_count, Some(100));
assert_eq!(progress.progress_fraction(), Some(0.0));
let mut progress = progress;
progress.exported_count = 50;
assert_eq!(progress.progress_fraction(), Some(0.5));
}
// Manifest builders append output entries and set the message.
#[test]
fn test_export_manifest() {
let manifest = ExportManifest::new(Utc::now(), "https://example.com/$export")
.with_output(
ExportOutputFile::new("Patient", "/exports/Patient.ndjson").with_count(100),
)
.with_message("Export complete");
assert_eq!(manifest.output.len(), 1);
assert_eq!(manifest.output[0].resource_type, "Patient");
assert_eq!(manifest.output[0].count, Some(100));
assert!(manifest.message.is_some());
}
// Batches track cursor/last flags; `as_last` clears the cursor.
#[test]
fn test_ndjson_batch() {
let batch = NdjsonBatch::new(vec![
r#"{"resourceType":"Patient","id":"1"}"#.to_string(),
r#"{"resourceType":"Patient","id":"2"}"#.to_string(),
])
.with_cursor("next-page");
assert_eq!(batch.len(), 2);
assert!(!batch.is_empty());
assert!(!batch.is_last);
assert_eq!(batch.next_cursor, Some("next-page".to_string()));
let final_batch = batch.as_last();
assert!(final_batch.is_last);
assert!(final_batch.next_cursor.is_none());
}
// An empty batch is immediately terminal.
#[test]
fn test_ndjson_batch_empty() {
let batch = NdjsonBatch::empty();
assert!(batch.is_empty());
assert!(batch.is_last);
}
}