use sqlx::Row;
use crate::db::DbPool;
use crate::error::AppResult;
#[derive(Debug, Clone)]
pub struct ObjectRow {
pub digest: String,
pub media_type: String,
pub bytes: Vec<u8>,
}
pub async fn ensure_table(pool: &DbPool) -> AppResult<()> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS noetl.object_store (
object_key TEXT PRIMARY KEY,
digest TEXT NOT NULL,
media_type TEXT NOT NULL DEFAULT 'application/octet-stream',
bytes BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
)
"#,
)
.execute(pool)
.await?;
Ok(())
}
pub async fn put(
pool: &DbPool,
object_key: &str,
digest: &str,
media_type: &str,
bytes: &[u8],
) -> AppResult<()> {
sqlx::query(
r#"
INSERT INTO noetl.object_store (object_key, digest, media_type, bytes)
VALUES ($1, $2, $3, $4)
ON CONFLICT (object_key) DO UPDATE
SET digest = EXCLUDED.digest,
media_type = EXCLUDED.media_type,
bytes = EXCLUDED.bytes,
created_at = now()
"#,
)
.bind(object_key)
.bind(digest)
.bind(media_type)
.bind(bytes)
.execute(pool)
.await?;
Ok(())
}
pub async fn get(pool: &DbPool, object_key: &str) -> AppResult<Option<ObjectRow>> {
let rows = sqlx::query(
r#"
SELECT digest, media_type, bytes
FROM noetl.object_store
WHERE object_key = $1
LIMIT 1
"#,
)
.bind(object_key)
.fetch_all(pool)
.await?;
Ok(rows.into_iter().next().map(|r| ObjectRow {
digest: r.get::<String, _>("digest"),
media_type: r.get::<String, _>("media_type"),
bytes: r.get::<Vec<u8>, _>("bytes"),
}))
}