use aion_store::{PackageRecord, PackageRouteRecord, StoreError};
use chrono::{DateTime, SecondsFormat, Utc};
use libsql::TransactionBehavior;
const UPSERT_PACKAGE_SQL: &str = "
INSERT INTO packages (workflow_type, content_hash, archive, deployed_at)
VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(workflow_type, content_hash)
DO UPDATE SET archive = excluded.archive, deployed_at = excluded.deployed_at";
const UPSERT_ROUTE_SQL: &str = "
INSERT INTO package_routes (workflow_type, content_hash)
VALUES (?1, ?2)
ON CONFLICT(workflow_type) DO UPDATE SET content_hash = excluded.content_hash";
const LIST_PACKAGES_SQL: &str = "
SELECT workflow_type, content_hash, archive, deployed_at
FROM packages
ORDER BY deployed_at ASC, workflow_type ASC, content_hash ASC";
const DELETE_PACKAGE_SQL: &str = "
DELETE FROM packages WHERE workflow_type = ?1 AND content_hash = ?2";
const LIST_ROUTES_SQL: &str = "
SELECT workflow_type, content_hash FROM package_routes ORDER BY workflow_type ASC";
pub(crate) async fn put_package(
conn: &libsql::Connection,
record: PackageRecord,
) -> Result<(), StoreError> {
let tx = conn
.transaction_with_behavior(TransactionBehavior::Immediate)
.await
.map_err(|error| crate::error::libsql_error(&error))?;
let result = async {
tx.execute(
UPSERT_PACKAGE_SQL,
libsql::params![
record.workflow_type.clone(),
record.content_hash.clone(),
record.archive,
encode_deployed_at(record.deployed_at),
],
)
.await
.map_err(|error| crate::error::libsql_error(&error))?;
tx.execute(
UPSERT_ROUTE_SQL,
libsql::params![record.workflow_type, record.content_hash],
)
.await
.map_err(|error| crate::error::libsql_error(&error))?;
Ok(())
}
.await;
match result {
Ok(()) => tx
.commit()
.await
.map_err(|error| crate::error::libsql_error(&error)),
Err(error) => {
rollback(tx).await?;
Err(error)
}
}
}
pub(crate) async fn list_packages(
conn: &libsql::Connection,
) -> Result<Vec<PackageRecord>, StoreError> {
let mut rows = conn
.query(LIST_PACKAGES_SQL, ())
.await
.map_err(|error| crate::error::libsql_error(&error))?;
let mut records = Vec::new();
while let Some(row) = rows
.next()
.await
.map_err(|error| crate::error::libsql_error(&error))?
{
let workflow_type: String = row
.get(0)
.map_err(|error| crate::error::libsql_error(&error))?;
let content_hash: String = row
.get(1)
.map_err(|error| crate::error::libsql_error(&error))?;
let archive: Vec<u8> = row
.get(2)
.map_err(|error| crate::error::libsql_error(&error))?;
let deployed_at: String = row
.get(3)
.map_err(|error| crate::error::libsql_error(&error))?;
records.push(PackageRecord {
workflow_type,
content_hash,
archive,
deployed_at: decode_deployed_at(&deployed_at)?,
});
}
Ok(records)
}
pub(crate) async fn delete_package(
conn: &libsql::Connection,
workflow_type: &str,
content_hash: &str,
) -> Result<(), StoreError> {
conn.execute(
DELETE_PACKAGE_SQL,
libsql::params![workflow_type, content_hash],
)
.await
.map_err(|error| crate::error::libsql_error(&error))?;
Ok(())
}
pub(crate) async fn put_package_route(
conn: &libsql::Connection,
workflow_type: &str,
content_hash: &str,
) -> Result<(), StoreError> {
conn.execute(
UPSERT_ROUTE_SQL,
libsql::params![workflow_type, content_hash],
)
.await
.map_err(|error| crate::error::libsql_error(&error))?;
Ok(())
}
pub(crate) async fn list_package_routes(
conn: &libsql::Connection,
) -> Result<Vec<PackageRouteRecord>, StoreError> {
let mut rows = conn
.query(LIST_ROUTES_SQL, ())
.await
.map_err(|error| crate::error::libsql_error(&error))?;
let mut routes = Vec::new();
while let Some(row) = rows
.next()
.await
.map_err(|error| crate::error::libsql_error(&error))?
{
let workflow_type: String = row
.get(0)
.map_err(|error| crate::error::libsql_error(&error))?;
let content_hash: String = row
.get(1)
.map_err(|error| crate::error::libsql_error(&error))?;
routes.push(PackageRouteRecord {
workflow_type,
content_hash,
});
}
Ok(routes)
}
async fn rollback(tx: libsql::Transaction) -> Result<(), StoreError> {
tx.rollback()
.await
.map_err(|error| crate::error::libsql_error(&error))
}
fn encode_deployed_at(deployed_at: DateTime<Utc>) -> String {
deployed_at.to_rfc3339_opts(SecondsFormat::Nanos, true)
}
fn decode_deployed_at(value: &str) -> Result<DateTime<Utc>, StoreError> {
DateTime::parse_from_rfc3339(value)
.map(|date_time| date_time.with_timezone(&Utc))
.map_err(|error| StoreError::Serialization(error.to_string()))
}
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
use aion_store::{PackageRecord, StoreError};
use chrono::{TimeZone, Utc};
use crate::config::{LibSqlConfig, LibSqlMode};
#[tokio::test]
async fn deployed_at_round_trips_through_persisted_row() -> Result<(), StoreError> {
let conn = open_test_connection("round-trip").await?;
let deployed_at = Utc
.with_ymd_and_hms(2026, 6, 12, 9, 30, 0)
.single()
.ok_or_else(|| StoreError::Serialization(String::from("invalid test instant")))?;
let record = PackageRecord {
workflow_type: "checkout".to_owned(),
content_hash: "a".repeat(64),
archive: b"archive-bytes".to_vec(),
deployed_at,
};
super::put_package(&conn, record.clone()).await?;
let listed = super::list_packages(&conn).await?;
assert_eq!(listed, vec![record]);
Ok(())
}
async fn open_test_connection(name: &str) -> Result<libsql::Connection, StoreError> {
let config = LibSqlConfig {
mode: LibSqlMode::Embedded {
path: unique_temp_path(name),
},
journal_mode: None,
synchronous: None,
sync_interval_seconds: None,
};
let conn = crate::connection::open_connection(&config)
.await?
.connection;
crate::schema::ensure_schema(&conn).await?;
Ok(conn)
}
fn unique_temp_path(name: &str) -> PathBuf {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |duration| duration.as_nanos());
std::env::temp_dir().join(format!(
"aion-store-libsql-package-{name}-{}-{nanos}.db",
std::process::id()
))
}
}