use sqlx::Row;
use crate::db::DbPool;
use crate::error::AppResult;
#[derive(Debug, Clone)]
pub struct PluginModuleRow {
pub digest: String,
pub media_type: String,
pub bytes: Vec<u8>,
}
pub async fn ensure_table(pool: &DbPool) -> AppResult<()> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS noetl.plugin_module (
path TEXT NOT NULL,
version INTEGER NOT NULL,
digest TEXT NOT NULL,
media_type TEXT NOT NULL DEFAULT 'application/wasm',
bytes BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (path, version)
)
"#,
)
.execute(pool)
.await?;
Ok(())
}
pub async fn upsert(
pool: &DbPool,
path: &str,
version: i32,
digest: &str,
media_type: &str,
bytes: &[u8],
) -> AppResult<()> {
sqlx::query(
r#"
INSERT INTO noetl.plugin_module (path, version, digest, media_type, bytes)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (path, version) DO UPDATE
SET digest = EXCLUDED.digest,
media_type = EXCLUDED.media_type,
bytes = EXCLUDED.bytes,
created_at = now()
"#,
)
.bind(path)
.bind(version)
.bind(digest)
.bind(media_type)
.bind(bytes)
.execute(pool)
.await?;
Ok(())
}
pub async fn get(pool: &DbPool, path: &str, version: i32) -> AppResult<Option<PluginModuleRow>> {
let rows = sqlx::query(
r#"
SELECT digest, media_type, bytes
FROM noetl.plugin_module
WHERE path = $1 AND version = $2
LIMIT 1
"#,
)
.bind(path)
.bind(version)
.fetch_all(pool)
.await?;
Ok(rows.into_iter().next().map(|r| PluginModuleRow {
digest: r.get::<String, _>("digest"),
media_type: r.get::<String, _>("media_type"),
bytes: r.get::<Vec<u8>, _>("bytes"),
}))
}