use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use chrono::Utc;
use sea_orm::{
ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder,
QuerySelect, RelationTrait,
};
use serde_json::json;
use uuid::Uuid;
use crate::conversions::{domain_status_to_entity, entity_status_to_domain};
use crate::entities::{dataset, pipeline_run, pipeline_run_payload_field};
use crate::types::{DatabaseError, PipelineRun, PipelineRunStatus};
use crate::uuid_hex;
use super::repository::{PipelineRunRepository, PipelineRunRow, PipelineRunWithAttributionRow};
/// SeaORM-backed implementation of [`PipelineRunRepository`].
///
/// Every method issues its queries through the shared connection held in `db`.
pub struct SeaOrmPipelineRunRepository {
// Shared, reference-counted handle to the database connection.
db: Arc<DatabaseConnection>,
}
impl SeaOrmPipelineRunRepository {
    /// Creates a repository that runs all of its queries through `db`.
    pub fn new(db: Arc<DatabaseConnection>) -> Self {
        Self { db }
    }
}
#[async_trait]
impl PipelineRunRepository for SeaOrmPipelineRunRepository {
/// Appends one row to the pipeline-run log and returns the id of that row.
///
/// The row gets a freshly generated random id and the current UTC time as
/// its creation timestamp; `pipeline_run_id` is stored alongside it. All
/// UUIDs are converted with `uuid_hex` before being written.
///
/// # Errors
///
/// Returns [`DatabaseError::QueryError`] when the insert fails.
async fn log_pipeline_run(
    &self,
    pipeline_run_id: Uuid,
    pipeline_id: Uuid,
    pipeline_name: &str,
    dataset_id: Option<Uuid>,
    status: PipelineRunStatus,
    run_info: Option<serde_json::Value>,
) -> Result<Uuid, DatabaseError> {
    use sea_orm::ActiveValue::Set;

    let row_id = Uuid::new_v4();
    let record = pipeline_run::ActiveModel {
        id: Set(uuid_hex::to_hex(row_id)),
        created_at: Set(Utc::now()),
        status: Set(domain_status_to_entity(status)),
        pipeline_run_id: Set(uuid_hex::to_hex(pipeline_run_id)),
        pipeline_name: Set(pipeline_name.to_string()),
        pipeline_id: Set(uuid_hex::to_hex(pipeline_id)),
        dataset_id: Set(uuid_hex::to_hex_opt(dataset_id)),
        run_info: Set(run_info),
    };

    match record.insert(self.db.as_ref()).await {
        Ok(_) => Ok(row_id),
        Err(e) => Err(DatabaseError::QueryError(format!(
            "log_pipeline_run insert failed: {e}"
        ))),
    }
}
async fn latest_status(
&self,
dataset_ids: &[Uuid],
pipeline_name: &str,
) -> Result<HashMap<Uuid, PipelineRunStatus>, DatabaseError> {
if dataset_ids.is_empty() {
return Ok(HashMap::new());
}
let hex_ids: Vec<String> = dataset_ids.iter().map(|id| uuid_hex::to_hex(*id)).collect();
let rows = pipeline_run::Entity::find()
.filter(pipeline_run::Column::PipelineName.eq(pipeline_name))
.filter(pipeline_run::Column::DatasetId.is_in(hex_ids))
.order_by_desc(pipeline_run::Column::CreatedAt)
.all(self.db.as_ref())
.await
.map_err(|e| DatabaseError::QueryError(format!("latest_status query failed: {e}")))?;
let mut result: HashMap<Uuid, PipelineRunStatus> = HashMap::new();
for row in rows {
let run: PipelineRun = row.into();
if let Some(did) = run.dataset_id {
result.entry(did).or_insert(run.status);
}
}
Ok(result)
}
/// Lists up to `limit` log rows, newest first, optionally restricted to a
/// single dataset.
///
/// # Errors
///
/// Returns [`DatabaseError::QueryError`] when the query fails.
async fn list_recent(
    &self,
    dataset_id: Option<Uuid>,
    limit: u32,
) -> Result<Vec<PipelineRunRow>, DatabaseError> {
    let newest_first = pipeline_run::Entity::find()
        .order_by_desc(pipeline_run::Column::CreatedAt)
        .limit(u64::from(limit));

    // Only narrow the query when the caller asked for a specific dataset.
    let select = match dataset_id {
        Some(did) => {
            newest_first.filter(pipeline_run::Column::DatasetId.eq(uuid_hex::to_hex(did)))
        }
        None => newest_first,
    };

    let models = select
        .all(self.db.as_ref())
        .await
        .map_err(|e| DatabaseError::QueryError(format!("list_recent query failed: {e}")))?;

    let runs: Vec<PipelineRunRow> = models.into_iter().map(PipelineRun::from).collect();
    Ok(runs)
}
/// Lists up to `limit` log rows, newest first, each paired with the name and
/// owner id of its dataset (left join, so rows without a dataset are kept).
///
/// `dataset_id` optionally restricts the result to one dataset. A dataset id
/// or owner id that is missing or fails hex parsing is returned as `None`;
/// `owner_email` is always `None`.
///
/// # Errors
///
/// Returns [`DatabaseError::QueryError`] when the query fails or when the
/// row id, pipeline run id or pipeline id is not valid hex.
async fn list_recent_with_attribution(
    &self,
    dataset_id: Option<Uuid>,
    limit: u32,
) -> Result<Vec<PipelineRunWithAttributionRow>, DatabaseError> {
    use sea_orm::JoinType;

    // Column order here must match the `.column(..)` calls below.
    type RawRow = (
        String,
        chrono::DateTime<Utc>,
        pipeline_run::PipelineRunStatus,
        String,
        String,
        String,
        Option<String>,
        Option<String>,
        Option<String>,
    );

    let joined = pipeline_run::Entity::find()
        .select_only()
        .column(pipeline_run::Column::Id)
        .column(pipeline_run::Column::CreatedAt)
        .column(pipeline_run::Column::Status)
        .column(pipeline_run::Column::PipelineRunId)
        .column(pipeline_run::Column::PipelineName)
        .column(pipeline_run::Column::PipelineId)
        .column(pipeline_run::Column::DatasetId)
        .column_as(dataset::Column::Name, "dataset_name")
        .column_as(dataset::Column::OwnerId, "dataset_owner_id")
        .join(JoinType::LeftJoin, pipeline_run::Relation::Dataset.def())
        .order_by_desc(pipeline_run::Column::CreatedAt)
        .limit(u64::from(limit));
    let select = match dataset_id {
        Some(did) => joined.filter(pipeline_run::Column::DatasetId.eq(uuid_hex::to_hex(did))),
        None => joined,
    };

    let raw = select
        .into_tuple::<RawRow>()
        .all(self.db.as_ref())
        .await
        .map_err(|e| {
            DatabaseError::QueryError(format!("list_recent_with_attribution query failed: {e}"))
        })?;

    // Mandatory ids: a malformed value aborts the whole call.
    let required = |hex: &str, field: &str| {
        uuid_hex::from_hex(hex)
            .map_err(|e| DatabaseError::QueryError(format!("invalid {field} hex: {e}")))
    };
    // Optional ids: a missing or malformed value degrades to `None`.
    let optional =
        |hex: Option<String>| hex.as_deref().and_then(|s| uuid_hex::from_hex(s).ok());

    let rows: Result<Vec<PipelineRunWithAttributionRow>, DatabaseError> = raw
        .into_iter()
        .map(
            |(
                id_hex,
                created_at,
                status,
                pipeline_run_hex,
                pipeline_name,
                pipeline_id_hex,
                dataset_id_hex,
                dataset_name,
                owner_id_hex,
            )|
             -> Result<PipelineRunWithAttributionRow, DatabaseError> {
                Ok(PipelineRunWithAttributionRow {
                    id: required(&id_hex, "id")?,
                    created_at,
                    status: entity_status_to_domain(status),
                    pipeline_run_id: required(&pipeline_run_hex, "pipeline_run_id")?,
                    pipeline_name,
                    pipeline_id: required(&pipeline_id_hex, "pipeline_id")?,
                    dataset_id: optional(dataset_id_hex),
                    dataset_name,
                    owner_id: optional(owner_id_hex),
                    owner_email: None,
                })
            },
        )
        .collect();
    rows
}
/// Appends an `Errored` row for every run whose most recent log row is still
/// `Initiated` or `Started`.
///
/// Each appended row copies the run id, pipeline name, pipeline id and
/// dataset id of the orphaned row, gets a fresh row id and the current UTC
/// timestamp, and stores `{"reason": reason}` as its `run_info`.
///
/// Returns the number of rows appended (0 when nothing is orphaned).
///
/// # Errors
///
/// Returns [`DatabaseError::QueryError`] when the initial fetch or any insert
/// fails. Inserts are issued one by one, so rows appended before a failure
/// remain in place.
async fn reset_orphans(&self, reason: &str) -> Result<u64, DatabaseError> {
    use sea_orm::ActiveValue::Set;

    let all_rows = pipeline_run::Entity::find()
        .order_by_desc(pipeline_run::Column::CreatedAt)
        .all(self.db.as_ref())
        .await
        .map_err(|e| DatabaseError::QueryError(format!("reset_orphans fetch failed: {e}")))?;

    // Rows arrive newest-first, so the first row seen for a run id is that
    // run's latest state; older rows for the same run are ignored.
    let mut latest_per_run: HashMap<String, pipeline_run::Model> = HashMap::new();
    for row in all_rows {
        latest_per_run
            .entry(row.pipeline_run_id.clone())
            .or_insert(row);
    }

    // Keep the full models rather than just their ids: everything needed for
    // the new row is already in memory, so no per-orphan re-fetch is needed.
    let orphans: Vec<pipeline_run::Model> = latest_per_run
        .into_values()
        .filter(|row| {
            matches!(
                row.status,
                pipeline_run::PipelineRunStatus::Initiated
                    | pipeline_run::PipelineRunStatus::Started
            )
        })
        .collect();
    if orphans.is_empty() {
        return Ok(0);
    }

    let reason_info = json!({"reason": reason});
    let mut count = 0u64;
    for orphan in orphans {
        let active = pipeline_run::ActiveModel {
            id: Set(uuid_hex::to_hex(Uuid::new_v4())),
            created_at: Set(Utc::now()),
            status: Set(pipeline_run::PipelineRunStatus::Errored),
            pipeline_run_id: Set(orphan.pipeline_run_id),
            pipeline_name: Set(orphan.pipeline_name),
            pipeline_id: Set(orphan.pipeline_id),
            dataset_id: Set(orphan.dataset_id),
            run_info: Set(Some(reason_info.clone())),
        };
        active.insert(self.db.as_ref()).await.map_err(|e| {
            DatabaseError::QueryError(format!("reset_orphans insert failed: {e}"))
        })?;
        count += 1;
    }
    Ok(count)
}
/// Stores `value` under `key` in the payload of run `run_id`, inserting the
/// field or overwriting an existing one.
///
/// On a `(pipeline_run_id, key)` conflict only `value` and `updated_at` are
/// replaced, so the original `created_at` is kept.
///
/// # Errors
///
/// Returns [`DatabaseError::QueryError`] when the upsert fails.
async fn set_payload_field(
    &self,
    run_id: Uuid,
    key: &str,
    value: serde_json::Value,
) -> Result<(), DatabaseError> {
    use sea_orm::sea_query::OnConflict;
    use sea_orm::ActiveValue::Set;

    let timestamp = Utc::now();
    let field = pipeline_run_payload_field::ActiveModel {
        pipeline_run_id: Set(uuid_hex::to_hex(run_id)),
        key: Set(key.to_owned()),
        value: Set(value),
        created_at: Set(timestamp),
        updated_at: Set(timestamp),
    };

    let overwrite_existing = OnConflict::columns([
        pipeline_run_payload_field::Column::PipelineRunId,
        pipeline_run_payload_field::Column::Key,
    ])
    .update_columns([
        pipeline_run_payload_field::Column::Value,
        pipeline_run_payload_field::Column::UpdatedAt,
    ])
    .to_owned();

    let outcome = pipeline_run_payload_field::Entity::insert(field)
        .on_conflict(overwrite_existing)
        .exec(self.db.as_ref())
        .await;
    match outcome {
        Ok(_) => Ok(()),
        Err(e) => Err(DatabaseError::QueryError(format!(
            "set_payload_field upsert failed: {e}"
        ))),
    }
}
/// Loads every payload field stored for run `run_id` and returns them as a
/// JSON object keyed by field name. A run with no fields gives an empty map.
///
/// # Errors
///
/// Returns [`DatabaseError::QueryError`] when the query fails.
async fn get_payload(
    &self,
    run_id: Uuid,
) -> Result<serde_json::Map<String, serde_json::Value>, DatabaseError> {
    let run_hex = uuid_hex::to_hex(run_id);
    let fields = pipeline_run_payload_field::Entity::find()
        .filter(pipeline_run_payload_field::Column::PipelineRunId.eq(run_hex))
        .all(self.db.as_ref())
        .await
        .map_err(|e| DatabaseError::QueryError(format!("get_payload query failed: {e}")))?;

    let mut payload = serde_json::Map::new();
    for field in fields {
        payload.insert(field.key, field.value);
    }
    Ok(payload)
}
/// Returns the most recently created log row for `pipeline_run_id`, or
/// `None` when no row exists for that run.
///
/// # Errors
///
/// Returns [`DatabaseError::QueryError`] when the query fails.
async fn get_pipeline_run(
    &self,
    pipeline_run_id: Uuid,
) -> Result<Option<PipelineRun>, DatabaseError> {
    let run_hex = uuid_hex::to_hex(pipeline_run_id);
    let newest = pipeline_run::Entity::find()
        .filter(pipeline_run::Column::PipelineRunId.eq(run_hex))
        .order_by_desc(pipeline_run::Column::CreatedAt)
        .one(self.db.as_ref())
        .await;

    match newest {
        Ok(model) => Ok(model.map(PipelineRun::from)),
        Err(e) => Err(DatabaseError::QueryError(format!(
            "get_pipeline_run query failed: {e}"
        ))),
    }
}
/// Returns the most recently created log row for the given dataset and
/// pipeline name, or `None` when there is no such row.
///
/// # Errors
///
/// Returns [`DatabaseError::QueryError`] when the query fails.
async fn get_pipeline_run_by_dataset(
    &self,
    dataset_id: Uuid,
    pipeline_name: &str,
) -> Result<Option<PipelineRun>, DatabaseError> {
    let dataset_hex = uuid_hex::to_hex(dataset_id);
    let newest = pipeline_run::Entity::find()
        .filter(pipeline_run::Column::DatasetId.eq(dataset_hex))
        .filter(pipeline_run::Column::PipelineName.eq(pipeline_name))
        .order_by_desc(pipeline_run::Column::CreatedAt)
        .one(self.db.as_ref())
        .await;

    match newest {
        Ok(model) => Ok(model.map(PipelineRun::from)),
        Err(e) => Err(DatabaseError::QueryError(format!(
            "get_pipeline_run_by_dataset query failed: {e}"
        ))),
    }
}
/// Returns, for the given dataset, the most recently created log row of each
/// distinct pipeline name. The result is ordered newest first.
///
/// # Errors
///
/// Returns [`DatabaseError::QueryError`] when the query fails.
async fn get_pipeline_runs_by_dataset(
    &self,
    dataset_id: Uuid,
) -> Result<Vec<PipelineRun>, DatabaseError> {
    let dataset_hex = uuid_hex::to_hex(dataset_id);
    let newest_first = pipeline_run::Entity::find()
        .filter(pipeline_run::Column::DatasetId.eq(dataset_hex))
        .order_by_desc(pipeline_run::Column::CreatedAt)
        .all(self.db.as_ref())
        .await
        .map_err(|e| {
            DatabaseError::QueryError(format!("get_pipeline_runs_by_dataset query failed: {e}"))
        })?;

    // `insert` returns true only the first time a name is seen; because rows
    // are newest-first, that first occurrence is the pipeline's latest row.
    let mut seen_names: std::collections::HashSet<String> = std::collections::HashSet::new();
    let latest_per_pipeline: Vec<PipelineRun> = newest_first
        .into_iter()
        .filter(|row| seen_names.insert(row.pipeline_name.clone()))
        .map(PipelineRun::from)
        .collect();
    Ok(latest_per_pipeline)
}
}