use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use utoipa::ToSchema;
#[cfg(feature = "aws-backend")]
use aws_sdk_dynamodb::types::AttributeValue;
#[cfg(feature = "aws-backend")]
use std::time::SystemTime;
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq)]
pub enum JobType {
Ingestion,
Backfill,
Indexing,
Other(String),
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq)]
pub enum JobStatus {
Queued,
Running,
Completed,
Failed,
Cancelled,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct Job {
pub id: String,
pub job_type: JobType,
pub status: JobStatus,
pub progress_percentage: u8,
pub message: String,
#[serde(default)]
pub user_id: Option<String>,
pub metadata: serde_json::Value,
pub result: Option<serde_json::Value>,
pub created_at: u64,
pub updated_at: u64,
pub completed_at: Option<u64>,
pub error: Option<String>,
}
impl Job {
pub fn new(id: String, job_type: JobType) -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
Self {
id,
job_type,
status: JobStatus::Queued,
progress_percentage: 0,
message: "Initialized".to_string(),
user_id: None,
metadata: serde_json::Value::Null,
result: None,
created_at: now,
updated_at: now,
completed_at: None,
error: None,
}
}
pub fn with_user(mut self, user_id: String) -> Self {
self.user_id = Some(user_id);
self
}
pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
self.metadata = metadata;
self
}
pub fn update_progress(&mut self, percentage: u8, message: String) {
self.progress_percentage = percentage.min(100);
self.message = message;
self.status = JobStatus::Running;
self.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
}
pub fn complete(&mut self, result: Option<serde_json::Value>) {
self.status = JobStatus::Completed;
self.progress_percentage = 100;
self.message = "Completed".to_string();
self.result = result;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.updated_at = now;
self.completed_at = Some(now);
}
pub fn fail(&mut self, error: String) {
self.status = JobStatus::Failed;
self.error = Some(error.clone());
self.message = format!("Failed: {}", error);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.updated_at = now;
self.completed_at = Some(now);
}
}
#[async_trait]
pub trait ProgressStore: Send + Sync {
async fn save(&self, job: &Job) -> Result<(), String>;
async fn load(&self, id: &str) -> Result<Option<Job>, String>;
async fn list_by_user(&self, user_id: &str) -> Result<Vec<Job>, String>;
async fn delete(&self, id: &str) -> Result<(), String>;
}
pub struct InMemoryProgressStore {
store: Mutex<HashMap<String, Job>>,
}
impl Default for InMemoryProgressStore {
fn default() -> Self {
Self::new()
}
}
impl InMemoryProgressStore {
pub fn new() -> Self {
Self {
store: Mutex::new(HashMap::new()),
}
}
}
#[async_trait]
impl ProgressStore for InMemoryProgressStore {
async fn save(&self, job: &Job) -> Result<(), String> {
let mut store = self.store.lock().unwrap();
store.insert(job.id.clone(), job.clone());
Ok(())
}
async fn load(&self, id: &str) -> Result<Option<Job>, String> {
let store = self.store.lock().unwrap();
Ok(store.get(id).cloned())
}
async fn list_by_user(&self, user_id: &str) -> Result<Vec<Job>, String> {
let store = self.store.lock().unwrap();
Ok(store.values()
.filter(|j| j.user_id.as_deref() == Some(user_id) || j.user_id.is_none())
.cloned()
.collect())
}
async fn delete(&self, id: &str) -> Result<(), String> {
let mut store = self.store.lock().unwrap();
store.remove(id);
Ok(())
}
}
#[cfg(feature = "aws-backend")]
pub struct DynamoDbProgressStore {
client: aws_sdk_dynamodb::Client,
table_name: String,
}
#[cfg(feature = "aws-backend")]
impl DynamoDbProgressStore {
pub fn new(client: aws_sdk_dynamodb::Client, table_name: String) -> Self {
Self { client, table_name }
}
pub async fn from_config(table_name: String, region: String) -> Self {
let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(aws_sdk_dynamodb::config::Region::new(region))
.load()
.await;
let client = aws_sdk_dynamodb::Client::new(&config);
Self { client, table_name }
}
fn item_to_job(&self, item: &HashMap<String, AttributeValue>) -> Option<Job> {
let json = item.get("data")?.as_s().ok()?;
serde_json::from_str(json).ok()
}
}
#[cfg(feature = "aws-backend")]
#[async_trait]
impl ProgressStore for DynamoDbProgressStore {
async fn save(&self, job: &Job) -> Result<(), String> {
let pk = job.user_id.clone().unwrap_or_else(|| "global".to_string());
let json = serde_json::to_string(job).map_err(|e| e.to_string())?;
let ttl = (std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
+ (24 * 60 * 60)) as i64;
self.client
.put_item()
.table_name(&self.table_name)
.item("PK", AttributeValue::S(pk))
.item("SK", AttributeValue::S(job.id.clone()))
.item("data", AttributeValue::S(json))
.item("ttl", AttributeValue::N(ttl.to_string()))
.send()
.await
.map(|_| ())
.map_err(|e| e.to_string())
}
async fn load(&self, id: &str) -> Result<Option<Job>, String> {
let user_id = crate::logging::core::get_current_user_id().unwrap_or_else(|| "default".to_string());
let result = self.client
.get_item()
.table_name(&self.table_name)
.key("PK", AttributeValue::S(user_id))
.key("SK", AttributeValue::S(id.to_string()))
.send()
.await
.map_err(|e| e.to_string())?;
if let Some(item) = result.item {
Ok(self.item_to_job(&item))
} else {
Ok(None)
}
}
async fn list_by_user(&self, user_id: &str) -> Result<Vec<Job>, String> {
let result = self.client
.query()
.table_name(&self.table_name)
.key_condition_expression("PK = :uid")
.expression_attribute_values(":uid", AttributeValue::S(user_id.to_string()))
.send()
.await
.map_err(|e| e.to_string())?;
let items = result.items.unwrap_or_default();
Ok(items
.iter()
.filter_map(|i| self.item_to_job(i))
.collect())
}
async fn delete(&self, id: &str) -> Result<(), String> {
let user_id = crate::logging::core::get_current_user_id().unwrap_or_else(|| "default".to_string());
self.client
.delete_item()
.table_name(&self.table_name)
.key("PK", AttributeValue::S(user_id))
.key("SK", AttributeValue::S(id.to_string()))
.send()
.await
.map(|_| ())
.map_err(|e| e.to_string())
}
}
pub type ProgressTracker = Arc<dyn ProgressStore>;
pub async fn create_tracker(dynamo_config: Option<(String, String)>) -> ProgressTracker {
if let Some((table_name, region)) = dynamo_config {
#[cfg(feature = "aws-backend")]
{
let store = DynamoDbProgressStore::from_config(table_name.clone(), region).await;
Arc::new(store)
}
#[cfg(not(feature = "aws-backend"))]
{
let _ = (table_name, region);
Arc::new(InMemoryProgressStore::new())
}
} else {
Arc::new(InMemoryProgressStore::new())
}
}