use sha2::{Digest, Sha256};
use sqlx::Row;
use crate::db::DbPool;
use crate::engine::state::WorkflowState;
use crate::error::{AppError, AppResult};
const AGGREGATE_TYPE: &str = "orchestrator_workflow_state";
pub struct LoadedSnapshot {
pub state: WorkflowState,
pub version: i64,
pub applied_count: i64,
pub updated_at: chrono::DateTime<chrono::Utc>,
pub routing_meta: Option<serde_json::Value>,
}
pub async fn save(
pool: &DbPool,
execution_id: i64,
version: i64,
applied_count: i64,
routing_meta: Option<&serde_json::Value>,
state: &WorkflowState,
) -> AppResult<()> {
let snapshot = serde_json::to_value(state)
.map_err(|e| AppError::Internal(format!("orch_snapshot.save: serialise: {e}")))?;
let checksum = {
let bytes = serde_json::to_vec(&snapshot).unwrap_or_default();
hex::encode(Sha256::digest(&bytes))
};
let meta = serde_json::json!({
"applied_count": applied_count,
"routing_meta": routing_meta,
});
sqlx::query(
r#"
INSERT INTO noetl.projection_snapshot
(aggregate_id, aggregate_type, version, snapshot, checksum, meta, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, now())
ON CONFLICT (tenant_id, organization_id, aggregate_type, aggregate_id)
DO UPDATE SET
version = EXCLUDED.version,
snapshot = EXCLUDED.snapshot,
checksum = EXCLUDED.checksum,
meta = EXCLUDED.meta,
updated_at = now()
"#,
)
.bind(execution_id.to_string())
.bind(AGGREGATE_TYPE)
.bind(version)
.bind(&snapshot)
.bind(&checksum)
.bind(&meta)
.execute(pool)
.await
.map_err(|e| AppError::Internal(format!("orch_snapshot.save: upsert: {e}")))?;
Ok(())
}
pub async fn load_latest(pool: &DbPool, execution_id: i64) -> AppResult<Option<LoadedSnapshot>> {
let row = sqlx::query(
r#"
SELECT version, snapshot, meta, updated_at
FROM noetl.projection_snapshot
WHERE aggregate_type = $1 AND aggregate_id = $2
AND tenant_id = 'default' AND organization_id = 'default'
"#,
)
.bind(AGGREGATE_TYPE)
.bind(execution_id.to_string())
.fetch_optional(pool)
.await
.map_err(|e| AppError::Internal(format!("orch_snapshot.load_latest: query: {e}")))?;
let Some(row) = row else {
return Ok(None);
};
let version: i64 = row.try_get("version").unwrap_or(0);
let updated_at: chrono::DateTime<chrono::Utc> =
row.try_get("updated_at").unwrap_or_else(|_| chrono::Utc::now());
let snapshot: serde_json::Value = row
.try_get("snapshot")
.map_err(|e| AppError::Internal(format!("orch_snapshot.load_latest: snapshot col: {e}")))?;
let meta: serde_json::Value = row.try_get("meta").unwrap_or(serde_json::Value::Null);
let applied_count = meta
.get("applied_count")
.and_then(|v| v.as_i64())
.unwrap_or(0);
let routing_meta = meta
.get("routing_meta")
.filter(|v| !v.is_null())
.cloned();
let state: WorkflowState = match serde_json::from_value(snapshot) {
Ok(s) => s,
Err(e) => {
tracing::warn!(
execution_id,
version,
%e,
"orch_snapshot.load_latest: snapshot deserialise failed; ignoring (full rebuild)"
);
return Ok(None);
}
};
Ok(Some(LoadedSnapshot {
state,
version,
applied_count,
updated_at,
routing_meta,
}))
}