use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
use redb::{Database, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use crate::config::constants::DEDUP_STALE_SECS;
/// redb table `dedup_claims`: maps a claim key (see `DedupStore::key`) to a JSON-serialised
/// `ClaimRecord`.
const CLAIMS: TableDefinition<&str, &str> = TableDefinition::new("dedup_claims");
/// Errors produced by the dedup store.
///
/// Every variant carries the underlying failure rendered as a string.
#[derive(Debug, thiserror::Error)]
pub enum DedupError {
/// The database file could not be opened, created, or (for an incompatible file) moved aside.
#[error("dedup store open failed: {0}")]
Open(String),
/// Beginning a transaction, opening the table, a table read/write, or the commit failed.
#[error("dedup store transaction failed: {0}")]
Transaction(String),
/// A claim record could not be encoded to, or decoded from, JSON.
#[error("dedup store (de)serialisation failed: {0}")]
Serde(String),
}
/// Lifecycle state stored with each claim; serialised in `snake_case`
/// (`in_progress` / `completed`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ClaimState {
/// The key has been claimed and the work has not been marked complete yet.
InProgress,
/// The work for the key finished; `DedupStore::claim` skips such keys.
Completed,
}
/// Value persisted (as JSON) for each claim key.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ClaimRecord {
// Current lifecycle state of the claim.
state: ClaimState,
// Time of the last write to this record, in whole seconds since the Unix epoch (`now_secs`).
updated_at: u64,
}
/// Result of a `DedupStore::claim` attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClaimOutcome {
/// A fresh in-progress record was written for the key (no record existed, or the existing
/// in-progress record was stale).
Claimed,
/// Nothing was written: the key is already completed, or in progress and not yet stale.
Skipped,
}
/// Opens or creates the redb database at `path`, replacing it with an empty one when the
/// existing file is reported as being in an incompatible format.
///
/// In the incompatible-format case the old file is renamed to `<path>.v2-incompatible`, the
/// event is logged at error level, and a new database is created at `path`.
///
/// # Errors
/// Returns [`DedupError::Open`] when the database cannot be opened for any other reason, when
/// the incompatible file cannot be renamed, or when creating the replacement fails.
fn open_dedup_db_or_recreate(path: &Path) -> Result<Database, DedupError> {
    let first_err = match Database::create(path) {
        Ok(db) => return Ok(db),
        Err(err) => err,
    };

    // Only the incompatible-format failure is recoverable; anything else is reported as-is.
    if !super::redb_error_is_incompatible_format(&first_err) {
        return Err(DedupError::Open(first_err.to_string()));
    }

    // Append the suffix to the full path (not the extension) so the original name is preserved.
    let mut aside = path.as_os_str().to_os_string();
    aside.push(".v2-incompatible");
    let aside = std::path::PathBuf::from(aside);

    if let Err(io) = std::fs::rename(path, &aside) {
        return Err(DedupError::Open(format!(
            "incompatible-format dedup redb at {} could not be backed up: {io}",
            path.display()
        )));
    }

    tracing::error!(
        path = %path.display(),
        backup = %aside.display(),
        error = %first_err,
        "dedup redb is in an incompatible/old format (redb 2.x); moved it aside and \
         creating a fresh empty dedup store"
    );

    Database::create(path).map_err(|err| DedupError::Open(err.to_string()))
}
/// Persistent claim store keyed by `owner/repo/pr/head_sha`, backed by a redb database.
pub struct DedupStore {
// Handle to the underlying redb database that holds the `CLAIMS` table.
db: Database,
}
impl DedupStore {
    /// Opens the dedup store at `path`, creating the file and any missing parent directories.
    ///
    /// The database itself is opened through `open_dedup_db_or_recreate`, after which the
    /// claims table is opened once inside a committed write transaction.
    ///
    /// # Errors
    /// * [`DedupError::Open`] when a parent directory cannot be created or the database cannot
    ///   be opened.
    /// * [`DedupError::Transaction`] when the initial table-opening transaction fails.
    pub fn open(path: &Path) -> Result<Self, DedupError> {
        // A bare file name has an empty parent: there is no directory to create in that case.
        if let Some(parent) = path.parent().filter(|p| !p.as_os_str().is_empty()) {
            // Report the directory failure itself rather than letting it surface later as a
            // less specific database-open error.
            std::fs::create_dir_all(parent).map_err(|io| {
                DedupError::Open(format!(
                    "could not create dedup store directory {}: {io}",
                    parent.display()
                ))
            })?;
        }
        let db = open_dedup_db_or_recreate(path)?;
        {
            // Open the claims table up front inside a committed write transaction
            // (presumably so that the table exists before the first claim is made).
            let write = db.begin_write().map_err(Self::txn_err)?;
            {
                write.open_table(CLAIMS).map_err(Self::txn_err)?;
            }
            write.commit().map_err(Self::txn_err)?;
        }
        Ok(Self { db })
    }

    /// Attempts to claim the work item identified by `owner`, `repo`, `pr` and `head_sha`.
    ///
    /// The lookup and the write happen inside a single write transaction. The outcome is:
    /// * [`ClaimOutcome::Claimed`] when no record exists, or when the existing record is
    ///   in progress and was last updated more than `DEDUP_STALE_SECS` seconds ago. A fresh
    ///   in-progress record stamped with the current time is written.
    /// * [`ClaimOutcome::Skipped`] when the record is completed, or in progress and not stale.
    ///
    /// # Errors
    /// * [`DedupError::Transaction`] on any database failure.
    /// * [`DedupError::Serde`] when the stored record is not valid JSON for a claim record, or
    ///   the new record cannot be serialised.
    pub fn claim(
        &self,
        owner: &str,
        repo: &str,
        pr: u64,
        head_sha: &str,
    ) -> Result<ClaimOutcome, DedupError> {
        let key = Self::key(owner, repo, pr, head_sha);
        let now = now_secs();
        let write = self.db.begin_write().map_err(Self::txn_err)?;
        let outcome = {
            let mut table = write.open_table(CLAIMS).map_err(Self::txn_err)?;
            // Decode straight from the access guard; the parsed record owns its data, so the
            // borrow of `table` ends with this statement and the insert below is allowed.
            let existing = table
                .get(key.as_str())
                .map_err(Self::txn_err)?
                .map(|guard| serde_json::from_str::<ClaimRecord>(guard.value()))
                .transpose()
                .map_err(Self::serde_err)?;
            let should_claim = match existing {
                None => true,
                Some(ClaimRecord {
                    state: ClaimState::Completed,
                    ..
                }) => false,
                // An in-progress claim may be taken over only once it has gone stale.
                Some(ClaimRecord {
                    state: ClaimState::InProgress,
                    updated_at,
                }) => now.saturating_sub(updated_at) > DEDUP_STALE_SECS,
            };
            if should_claim {
                let rec = ClaimRecord {
                    state: ClaimState::InProgress,
                    updated_at: now,
                };
                let json = serde_json::to_string(&rec).map_err(Self::serde_err)?;
                table
                    .insert(key.as_str(), json.as_str())
                    .map_err(Self::txn_err)?;
                ClaimOutcome::Claimed
            } else {
                ClaimOutcome::Skipped
            }
        };
        write.commit().map_err(Self::txn_err)?;
        Ok(outcome)
    }

    /// Marks the work item as completed, so that later [`Self::claim`] calls for the same key
    /// return [`ClaimOutcome::Skipped`].
    ///
    /// The record is written unconditionally, whether or not a claim existed beforehand.
    ///
    /// # Errors
    /// [`DedupError::Serde`] or [`DedupError::Transaction`] when the record cannot be written.
    pub fn complete(
        &self,
        owner: &str,
        repo: &str,
        pr: u64,
        head_sha: &str,
    ) -> Result<(), DedupError> {
        self.write_state(owner, repo, pr, head_sha, ClaimState::Completed)
    }

    /// Removes the record for the work item, so that the next [`Self::claim`] for the same key
    /// finds no record and claims it again.
    ///
    /// # Errors
    /// [`DedupError::Transaction`] on any database failure.
    pub fn release(
        &self,
        owner: &str,
        repo: &str,
        pr: u64,
        head_sha: &str,
    ) -> Result<(), DedupError> {
        let key = Self::key(owner, repo, pr, head_sha);
        let write = self.db.begin_write().map_err(Self::txn_err)?;
        {
            let mut table = write.open_table(CLAIMS).map_err(Self::txn_err)?;
            table.remove(key.as_str()).map_err(Self::txn_err)?;
        }
        write.commit().map_err(Self::txn_err)?;
        Ok(())
    }

    /// Overwrites the record for the key with `state`, stamped with the current time.
    fn write_state(
        &self,
        owner: &str,
        repo: &str,
        pr: u64,
        head_sha: &str,
        state: ClaimState,
    ) -> Result<(), DedupError> {
        let key = Self::key(owner, repo, pr, head_sha);
        let rec = ClaimRecord {
            state,
            updated_at: now_secs(),
        };
        // Serialise before opening the transaction so a serde failure never starts one.
        let json = serde_json::to_string(&rec).map_err(Self::serde_err)?;
        let write = self.db.begin_write().map_err(Self::txn_err)?;
        {
            let mut table = write.open_table(CLAIMS).map_err(Self::txn_err)?;
            table
                .insert(key.as_str(), json.as_str())
                .map_err(Self::txn_err)?;
        }
        write.commit().map_err(Self::txn_err)?;
        Ok(())
    }

    /// Builds the table key `{owner}/{repo}/{pr}/{head_sha}`.
    fn key(owner: &str, repo: &str, pr: u64, head_sha: &str) -> String {
        format!("{owner}/{repo}/{pr}/{head_sha}")
    }

    /// Wraps any database-side failure as [`DedupError::Transaction`].
    fn txn_err<E: std::fmt::Display>(err: E) -> DedupError {
        DedupError::Transaction(err.to_string())
    }

    /// Wraps a JSON encode/decode failure as [`DedupError::Serde`].
    fn serde_err<E: std::fmt::Display>(err: E) -> DedupError {
        DedupError::Serde(err.to_string())
    }
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a store in a brand-new temporary directory.
    ///
    /// The directory handle is returned next to the store so that it stays in scope for as
    /// long as the test uses the store.
    fn temp_store() -> (DedupStore, tempfile::TempDir) {
        let tmp = tempfile::tempdir().expect("tempdir");
        let db_file = tmp.path().join("dedup.redb");
        let store = DedupStore::open(&db_file).expect("open store");
        (store, tmp)
    }

    /// Claims the `acme/backend` PR 42 at `sha-abc` key that most tests share.
    fn claim_abc(store: &DedupStore) -> ClaimOutcome {
        store.claim("acme", "backend", 42, "sha-abc").unwrap()
    }

    // Opening under a not-yet-existing directory must create both directory and file.
    #[test]
    fn open_creates_file() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let db_file = tmp.path().join("nested").join("dedup.redb");
        let _store = DedupStore::open(&db_file).expect("open");
        assert!(db_file.exists(), "redb file must be created");
    }

    // A file that is not a valid database must be moved aside and replaced by a usable store.
    #[test]
    fn incompatible_dedup_db_is_recreated() {
        let tmp = tempfile::tempdir().unwrap();
        let db_file = tmp.path().join("dedup.redb");
        std::fs::write(&db_file, [0xABu8; 4096]).unwrap();

        let store =
            DedupStore::open(&db_file).expect("incompatible dedup db must recover, not error");

        let backup = tmp.path().join("dedup.redb.v2-incompatible");
        assert!(backup.exists(), "incompatible dedup file must be backed up");
        let outcome = store.claim("o", "r", 1, "sha").unwrap();
        assert_eq!(outcome, ClaimOutcome::Claimed);
    }

    #[test]
    fn first_claim_succeeds() {
        let (store, _d) = temp_store();
        assert_eq!(claim_abc(&store), ClaimOutcome::Claimed);
    }

    // A second claim while the first is still fresh and in progress must be skipped.
    #[test]
    fn concurrent_in_progress_skips() {
        let (store, _d) = temp_store();
        let first = claim_abc(&store);
        assert_eq!(first, ClaimOutcome::Claimed);
        let second = claim_abc(&store);
        assert_eq!(second, ClaimOutcome::Skipped);
    }

    #[test]
    fn claim_then_skip_after_complete() {
        let (store, _d) = temp_store();
        assert_eq!(claim_abc(&store), ClaimOutcome::Claimed);
        store.complete("acme", "backend", 42, "sha-abc").unwrap();
        assert_eq!(claim_abc(&store), ClaimOutcome::Skipped);
    }

    #[test]
    fn claim_allows_after_release() {
        let (store, _d) = temp_store();
        assert_eq!(claim_abc(&store), ClaimOutcome::Claimed);
        store.release("acme", "backend", 42, "sha-abc").unwrap();
        assert_eq!(claim_abc(&store), ClaimOutcome::Claimed);
    }

    // Completion of one head SHA must not block a different head SHA of the same PR.
    #[test]
    fn different_sha_not_skipped() {
        let (store, _d) = temp_store();
        let _ = claim_abc(&store);
        store.complete("acme", "backend", 42, "sha-abc").unwrap();
        let outcome = store.claim("acme", "backend", 42, "sha-def").unwrap();
        assert_eq!(outcome, ClaimOutcome::Claimed);
    }

    // Plants an in-progress record older than the staleness window directly in the table and
    // checks that `claim` takes it over.
    #[test]
    fn stale_in_progress_is_reclaimable() {
        let (store, _d) = temp_store();
        let key = DedupStore::key("acme", "backend", 42, "sha-stale");
        let expired = ClaimRecord {
            state: ClaimState::InProgress,
            updated_at: now_secs().saturating_sub(DEDUP_STALE_SECS + 10),
        };
        let payload = serde_json::to_string(&expired).unwrap();

        let txn = store.db.begin_write().unwrap();
        {
            let mut claims = txn.open_table(CLAIMS).unwrap();
            claims.insert(key.as_str(), payload.as_str()).unwrap();
        }
        txn.commit().unwrap();

        let outcome = store.claim("acme", "backend", 42, "sha-stale").unwrap();
        assert_eq!(
            outcome,
            ClaimOutcome::Claimed,
            "a stale in-progress claim must be reclaimable"
        );
    }
}