use crate::adapters::database::records::BackupRecord;
use crate::domain::backup::Backup;
use crate::error::Result;
use sqlx::PgConnection;
use time::OffsetDateTime;
use uuid::Uuid;
/// Data-access handle for the `backups` table.
///
/// The type carries no fields; every query method receives the database
/// connection as an explicit argument.
#[derive(Debug, Default, Clone)]
pub struct BackupRepository {}
impl BackupRepository {
#[must_use]
pub const fn new() -> Self {
Self {}
}
/// Looks up the `backups` row whose `user_id` column equals `user_id`.
///
/// Returns `Ok(None)` when the query yields no row; otherwise the fetched
/// record is converted into a [`Backup`].
///
/// # Errors
///
/// Propagates any failure reported by the query through `?`.
#[tracing::instrument(level = "debug", skip(self, conn), err)]
pub(crate) async fn find_by_user_id(&self, conn: &mut PgConnection, user_id: Uuid) -> Result<Option<Backup>> {
    const LOOKUP_SQL: &str = "SELECT * FROM backups WHERE user_id = $1";

    let lookup = sqlx::query_as::<_, BackupRecord>(LOOKUP_SQL).bind(user_id);
    let maybe_row = lookup.fetch_optional(conn).await?;

    let backup: Option<Backup> = maybe_row.map(Into::into);
    Ok(backup)
}
/// Returns the `backups` row for `user_id`, inserting it first if it is absent.
///
/// On a `user_id` conflict the statement assigns the column to itself
/// (`user_id = EXCLUDED.user_id`). That keeps the conflict path an `UPDATE`,
/// so `RETURNING *` also yields the already-existing row (PostgreSQL returns
/// no row for a conflicting insert under `DO NOTHING`).
///
/// # Errors
///
/// Propagates any failure reported by the query through `?`.
#[tracing::instrument(level = "debug", skip(self, conn), err)]
pub(crate) async fn create_if_not_exists(&self, conn: &mut PgConnection, user_id: Uuid) -> Result<Backup> {
    const UPSERT_SQL: &str = r#"
INSERT INTO backups (user_id)
VALUES ($1)
ON CONFLICT (user_id) DO UPDATE SET user_id = EXCLUDED.user_id
RETURNING *
"#;

    let row = sqlx::query_as::<_, BackupRecord>(UPSERT_SQL)
        .bind(user_id)
        .fetch_one(conn)
        .await?;

    let backup: Backup = row.into();
    Ok(backup)
}
/// Tries to reserve an upload slot for `user_id` with a version check.
///
/// The row is updated only when its `current_version` equals
/// `expected_version` and its `state` is `ACTIVE`. A matching row gets
/// `state = 'UPLOADING'`, `pending_version = current_version + 1` and
/// `pending_at = NOW()`, and is returned in its updated form.
///
/// Returns `Ok(None)` when no row satisfied the `WHERE` clause.
///
/// # Errors
///
/// Propagates any failure reported by the query through `?`.
#[tracing::instrument(level = "debug", skip(self, conn), err)]
pub(crate) async fn reserve_slot(
    &self,
    conn: &mut PgConnection,
    user_id: Uuid,
    expected_version: i32,
) -> Result<Option<Backup>> {
    const RESERVE_SQL: &str = r#"
UPDATE backups
SET
state = 'UPLOADING',
pending_version = current_version + 1,
pending_at = NOW()
WHERE user_id = $1 AND current_version = $2 AND state = 'ACTIVE'
RETURNING *
"#;

    let reservation = sqlx::query_as::<_, BackupRecord>(RESERVE_SQL)
        .bind(user_id)
        .bind(expected_version)
        .fetch_optional(conn)
        .await?;

    let backup: Option<Backup> = reservation.map(Into::into);
    Ok(backup)
}
/// Reserves an upload slot for `user_id` without any version or state check.
///
/// The `WHERE` clause filters on `user_id` only, so the row is moved to
/// `state = 'UPLOADING'` with `pending_version = current_version + 1` and
/// `pending_at = NOW()` whatever its previous state was.
///
/// # Errors
///
/// Propagates any failure reported by the query through `?`. Because the row
/// is read with `fetch_one`, a user without a `backups` row surfaces as an
/// error rather than as an empty result.
#[tracing::instrument(level = "debug", skip(self, conn), err)]
pub(crate) async fn reserve_slot_force(&self, conn: &mut PgConnection, user_id: Uuid) -> Result<Backup> {
    const FORCE_RESERVE_SQL: &str = r#"
UPDATE backups
SET
state = 'UPLOADING',
pending_version = current_version + 1,
pending_at = NOW()
WHERE user_id = $1
RETURNING *
"#;

    let row = sqlx::query_as::<_, BackupRecord>(FORCE_RESERVE_SQL)
        .bind(user_id)
        .fetch_one(conn)
        .await?;

    let backup: Backup = row.into();
    Ok(backup)
}
/// Promotes the pending upload of `user_id` to the current version.
///
/// The row is updated only while it is in state `UPLOADING` and its
/// `pending_version` equals the `pending_version` argument. A matching row
/// gets `current_version` set to that value, `state = 'ACTIVE'`,
/// `updated_at = NOW()`, and both `pending_version` and `pending_at` cleared.
///
/// When no row matches, nothing is written. The return value stays `Ok(())`
/// in that case so existing callers keep working, but a warning is logged so
/// the lost commit does not go unnoticed.
///
/// # Errors
///
/// Propagates any failure reported by the query through `?`.
#[tracing::instrument(level = "debug", skip(self, conn), err)]
pub(crate) async fn commit_version(
    &self,
    conn: &mut PgConnection,
    user_id: Uuid,
    pending_version: i32,
) -> Result<()> {
    let outcome = sqlx::query(
        r#"
UPDATE backups
SET
current_version = $2,
pending_version = NULL,
state = 'ACTIVE',
updated_at = NOW(),
pending_at = NULL
WHERE user_id = $1 AND pending_version = $2 AND state = 'UPLOADING'
"#,
    )
    .bind(user_id)
    .bind(pending_version)
    .execute(conn)
    .await?;

    // Zero affected rows means the guarded UPDATE did not apply: the row is
    // missing, is no longer UPLOADING, or carries a different pending version.
    if outcome.rows_affected() == 0 {
        tracing::warn!(
            %user_id,
            pending_version,
            "commit_version matched no row; nothing was committed"
        );
    }

    Ok(())
}
/// Fetches backups whose upload appears stuck.
///
/// Selects rows in state `UPLOADING` whose `pending_at` is earlier than
/// `threshold`, returning at most `limit` of them. Rows are ordered by
/// `pending_at` ascending so that, when more than `limit` rows qualify, the
/// oldest pending uploads are returned first instead of an arbitrary subset.
///
/// # Errors
///
/// Propagates any failure reported by the query through `?`.
#[tracing::instrument(level = "debug", skip(self, conn), err)]
pub(crate) async fn fetch_stale_uploads(
    &self,
    conn: &mut PgConnection,
    threshold: OffsetDateTime,
    limit: i64,
) -> Result<Vec<Backup>> {
    let records = sqlx::query_as::<_, BackupRecord>(
        "SELECT * FROM backups WHERE state = 'UPLOADING' AND pending_at < $1 ORDER BY pending_at ASC LIMIT $2",
    )
    .bind(threshold)
    .bind(limit)
    .fetch_all(conn)
    .await?;

    Ok(records.into_iter().map(Into::into).collect())
}
/// Puts the backup of `user_id` back into state `ACTIVE` and clears
/// `pending_version` and `pending_at`.
///
/// The call succeeds whether or not a row was updated; the query result is
/// discarded.
///
/// NOTE(review): the `WHERE` clause filters on `user_id` only, so the row is
/// reset regardless of its current state or pending version. Confirm that
/// callers cannot race this against a freshly reserved upload.
///
/// # Errors
///
/// Propagates any failure reported by the query through `?`.
#[tracing::instrument(level = "debug", skip(self, conn), err)]
pub(crate) async fn reset_stale(&self, conn: &mut PgConnection, user_id: Uuid) -> Result<()> {
    const RESET_SQL: &str =
        "UPDATE backups SET state = 'ACTIVE', pending_version = NULL, pending_at = NULL WHERE user_id = $1";

    let statement = sqlx::query(RESET_SQL).bind(user_id);
    let _outcome = statement.execute(conn).await?;

    Ok(())
}
}