use crate::progress::{Job, JobStatus, JobType};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
pub use crate::progress::{create_tracker as create_progress_tracker, ProgressTracker, ProgressTracker as IngestionProgressStore};
/// Stage of an ingestion run, stored under the `"step"` key of the job
/// metadata and reported back through [`IngestionProgress::current_step`].
///
/// The percentages mentioned on each variant are the defaults assigned by
/// `ProgressService::update_progress`; callers may override them through
/// `ProgressService::update_progress_with_percentage`.
///
/// `Eq` is derived alongside `PartialEq`: the enum has no fields, so equality
/// is total and the type can be used wherever `Eq` is required.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
pub enum IngestionStep {
    /// Initial step of every run (default 5%); also used as the fallback
    /// step for a job that is neither completed nor failed.
    ValidatingConfig,
    /// Schema preparation stage (default 15%).
    PreparingSchemas,
    /// Data flattening stage (default 25%).
    FlatteningData,
    /// AI recommendation stage (default 40%).
    GettingAIRecommendation,
    /// Schema setup stage (default 55%).
    SettingUpSchema,
    /// Mutation generation stage (default 75%).
    GeneratingMutations,
    /// Mutation execution stage (default 90%).
    ExecutingMutations,
    /// Terminal step of a successful run (100%).
    Completed,
    /// Terminal step of a failed run (also reported as 100%).
    Failed,
}
/// Summary of a finished ingestion run.
///
/// `ProgressService::complete_progress` serializes this value to JSON and
/// stores it as the job result; `IngestionProgress::results` is decoded back
/// from that stored JSON.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct IngestionResults {
/// Name of the schema involved in the ingestion.
/// NOTE(review): presumably the target schema the data was written to; confirm with the caller.
pub schema_name: String,
/// Whether a new schema was created during this run (as reported by the caller).
pub new_schema_created: bool,
/// Number of mutations generated (as reported by the caller).
pub mutations_generated: usize,
/// Number of mutations executed (as reported by the caller).
pub mutations_executed: usize,
}
/// Ingestion-specific view of a tracked `Job`.
///
/// Instances are built from a `Job` through the `From<Job>` conversion in this
/// file; every field below is derived from the corresponding job field.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct IngestionProgress {
/// Identifier of the underlying job.
pub id: String,
/// Current step, read from the `"step"` entry of the job metadata or, when
/// that is unavailable, derived from the job status.
pub current_step: IngestionStep,
/// Progress percentage copied from the job.
pub progress_percentage: u8,
/// Status message copied from the job.
pub status_message: String,
/// `true` once the job has reached a terminal status. Note that this covers
/// BOTH completed and failed jobs; check `is_failed` to tell them apart.
pub is_complete: bool,
/// `true` when the job status is failed.
pub is_failed: bool,
/// Error recorded on the job, if any.
pub error_message: Option<String>,
/// Results decoded from the job's stored result JSON; `None` when the job
/// has no result or the stored value cannot be decoded as `IngestionResults`.
pub results: Option<IngestionResults>,
/// Creation timestamp of the job.
/// NOTE(review): the unit (seconds vs. milliseconds) is not visible in this file; confirm against `Job`.
pub started_at: u64,
/// Completion timestamp of the job, if it has one (same unit as `started_at`).
pub completed_at: Option<u64>,
}
/// Converts a generic tracked [`Job`] into the ingestion-specific progress view.
impl From<Job> for IngestionProgress {
    /// Builds the progress view from `job`.
    ///
    /// The current step is taken from the `"step"` entry of the job metadata.
    /// When that entry is missing **or cannot be decoded**, the step is derived
    /// from the job status instead (`Completed` / `Failed`, otherwise
    /// `ValidatingConfig`). Deriving it from the status in the undecodable case
    /// avoids reporting a finished or failed job as still validating its
    /// configuration.
    ///
    /// `results` is `None` when the job has no result or the stored JSON does
    /// not decode as [`IngestionResults`].
    fn from(job: Job) -> Self {
        // Step explicitly recorded by the service, if present and well-formed.
        let recorded_step = job
            .metadata
            .get("step")
            .and_then(|raw| serde_json::from_value::<IngestionStep>(raw.clone()).ok());

        // Otherwise infer the step from the terminal status of the job.
        let current_step = recorded_step.unwrap_or_else(|| match job.status {
            JobStatus::Completed => IngestionStep::Completed,
            JobStatus::Failed => IngestionStep::Failed,
            _ => IngestionStep::ValidatingConfig,
        });

        let is_failed = matches!(job.status, JobStatus::Failed);
        // "Complete" means "reached a terminal state", so failures count too.
        let is_complete = matches!(job.status, JobStatus::Completed | JobStatus::Failed);

        IngestionProgress {
            id: job.id,
            current_step,
            progress_percentage: job.progress_percentage,
            status_message: job.message,
            is_complete,
            is_failed,
            error_message: job.error,
            results: job.result.and_then(|r| serde_json::from_value(r).ok()),
            started_at: job.created_at,
            completed_at: job.completed_at,
        }
    }
}
/// Ingestion-facing facade over the shared job tracker.
///
/// All state lives in the wrapped tracker; the service only translates
/// between tracked jobs and `IngestionProgress` values.
#[derive(Clone)]
pub struct ProgressService {
// Backing store used to save, load, list and delete jobs.
tracker: ProgressTracker,
}
impl ProgressService {
pub fn new(tracker: ProgressTracker) -> Self {
Self { tracker }
}
pub async fn start_progress(&self, id: String) -> IngestionProgress {
let mut job = Job::new(id, JobType::Ingestion);
let user_id = crate::logging::core::get_current_user_id().unwrap_or_else(|| "default".to_string());
job = job.with_user(user_id);
job.metadata = serde_json::json!({
"step": IngestionStep::ValidatingConfig
});
job.progress_percentage = 5;
job.message = "Starting ingestion process...".to_string();
let _ = self.tracker.save(&job).await;
job.into()
}
pub async fn update_progress(
&self,
id: &str,
step: IngestionStep,
message: String,
) -> Option<IngestionProgress> {
if let Ok(Some(mut job)) = self.tracker.load(id).await {
job.update_progress(Self::step_to_percentage(&step), message);
if let Ok(step_json) = serde_json::to_value(&step) {
if let serde_json::Value::Object(ref mut map) = job.metadata {
map.insert("step".to_string(), step_json);
} else {
job.metadata = serde_json::json!({ "step": step_json });
}
}
let _ = self.tracker.save(&job).await;
Some(job.into())
} else {
None
}
}
pub async fn update_progress_with_percentage(
&self,
id: &str,
step: IngestionStep,
message: String,
percentage: u8,
) -> Option<IngestionProgress> {
if let Ok(Some(mut job)) = self.tracker.load(id).await {
job.update_progress(percentage, message);
if let Ok(step_json) = serde_json::to_value(&step) {
if let serde_json::Value::Object(ref mut map) = job.metadata {
map.insert("step".to_string(), step_json);
} else {
job.metadata = serde_json::json!({ "step": step_json });
}
}
let _ = self.tracker.save(&job).await;
Some(job.into())
} else {
None
}
}
pub async fn complete_progress(
&self,
id: &str,
results: IngestionResults,
) -> Option<IngestionProgress> {
if let Ok(Some(mut job)) = self.tracker.load(id).await {
let result_json = serde_json::to_value(results).ok();
job.complete(result_json);
let step = IngestionStep::Completed;
if let Ok(step_json) = serde_json::to_value(&step) {
if let serde_json::Value::Object(ref mut map) = job.metadata {
map.insert("step".to_string(), step_json);
} else {
job.metadata = serde_json::json!({ "step": step_json });
}
}
let _ = self.tracker.save(&job).await;
Some(job.into())
} else {
None
}
}
pub async fn fail_progress(
&self,
id: &str,
error_message: String,
) -> Option<IngestionProgress> {
if let Ok(Some(mut job)) = self.tracker.load(id).await {
job.fail(error_message);
let step = IngestionStep::Failed;
if let Ok(step_json) = serde_json::to_value(&step) {
if let serde_json::Value::Object(ref mut map) = job.metadata {
map.insert("step".to_string(), step_json);
} else {
job.metadata = serde_json::json!({ "step": step_json });
}
}
let _ = self.tracker.save(&job).await;
Some(job.into())
} else {
None
}
}
pub async fn get_progress(&self, id: &str) -> Option<IngestionProgress> {
self.tracker.load(id).await.unwrap_or(None).map(|j| j.into())
}
pub async fn remove_progress(&self, id: &str) -> Option<IngestionProgress> {
if let Ok(Some(job)) = self.tracker.load(id).await {
let _ = self.tracker.delete(id).await;
Some(job.into())
} else {
None
}
}
pub async fn get_all_progress(&self) -> Vec<IngestionProgress> {
let user_id = crate::logging::core::get_current_user_id().unwrap_or_else(|| "default".to_string());
self.tracker.list_by_user(&user_id).await.unwrap_or_default()
.into_iter()
.filter(|j| matches!(j.job_type, JobType::Ingestion))
.map(|j| j.into())
.collect()
}
fn step_to_percentage(step: &IngestionStep) -> u8 {
match step {
IngestionStep::ValidatingConfig => 5,
IngestionStep::PreparingSchemas => 15,
IngestionStep::FlatteningData => 25,
IngestionStep::GettingAIRecommendation => 40,
IngestionStep::SettingUpSchema => 55,
IngestionStep::GeneratingMutations => 75,
IngestionStep::ExecutingMutations => 90,
IngestionStep::Completed => 100,
IngestionStep::Failed => 100,
}
}
}