use chrono::{DateTime, Utc};
use sqlx::Row;
use crate::db::DbPool;
use crate::error::AppResult;
#[derive(Debug, Clone)]
pub struct ResultStoreRow {
pub result_id: i64,
pub execution_id: i64,
pub name: String,
pub scope: String,
pub source_step: Option<String>,
pub data: serde_json::Value,
pub bytes: i64,
pub sha256: String,
pub media_type: String,
pub created_at: DateTime<Utc>,
pub expires_at: Option<DateTime<Utc>>,
}
pub async fn ensure_table(pool: &DbPool) -> AppResult<()> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS noetl.result_store (
result_id BIGINT PRIMARY KEY,
execution_id BIGINT NOT NULL,
name TEXT NOT NULL,
scope TEXT NOT NULL,
source_step TEXT,
data JSONB NOT NULL,
bytes BIGINT NOT NULL,
sha256 TEXT NOT NULL,
media_type TEXT NOT NULL DEFAULT 'application/json',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
expires_at TIMESTAMPTZ
)
"#,
)
.execute(pool)
.await?;
sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS result_store_eid_name
ON noetl.result_store (execution_id, name)
"#,
)
.execute(pool)
.await?;
sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS result_store_created_at
ON noetl.result_store (created_at)
"#,
)
.execute(pool)
.await?;
Ok(())
}
pub async fn insert(pool: &DbPool, row: &ResultStoreRow) -> AppResult<()> {
sqlx::query(
r#"
INSERT INTO noetl.result_store (
result_id, execution_id, name, scope, source_step,
data, bytes, sha256, media_type, created_at, expires_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (result_id) DO NOTHING
"#,
)
.bind(row.result_id)
.bind(row.execution_id)
.bind(&row.name)
.bind(&row.scope)
.bind(row.source_step.as_deref())
.bind(&row.data)
.bind(row.bytes)
.bind(&row.sha256)
.bind(&row.media_type)
.bind(row.created_at)
.bind(row.expires_at)
.execute(pool)
.await?;
Ok(())
}
pub async fn get_by_ref(
pool: &DbPool,
execution_id: i64,
name: &str,
result_id: i64,
) -> AppResult<Option<ResultStoreRow>> {
let rows = sqlx::query(
r#"
SELECT
result_id, execution_id, name, scope, source_step,
data, bytes, sha256, media_type, created_at, expires_at
FROM noetl.result_store
WHERE execution_id = $1
AND name = $2
AND result_id = $3
LIMIT 1
"#,
)
.bind(execution_id)
.bind(name)
.bind(result_id)
.fetch_all(pool)
.await?;
Ok(rows.into_iter().next().map(|r| ResultStoreRow {
result_id: r.get::<i64, _>("result_id"),
execution_id: r.get::<i64, _>("execution_id"),
name: r.get::<String, _>("name"),
scope: r.get::<String, _>("scope"),
source_step: r.get::<Option<String>, _>("source_step"),
data: r.get::<serde_json::Value, _>("data"),
bytes: r.get::<i64, _>("bytes"),
sha256: r.get::<String, _>("sha256"),
media_type: r.get::<String, _>("media_type"),
created_at: r.get::<DateTime<Utc>, _>("created_at"),
expires_at: r.get::<Option<DateTime<Utc>>, _>("expires_at"),
}))
}