use std::{sync::Arc, time::Duration};
use chrono::Utc;
use thiserror::Error;
use crate::{
storage::StorageError,
supertable::{
handle::Supertable,
wal::{
lease::{self, LeaseError},
persistence::{Etag, WalStore, WalStoreError},
pipeline::{
self, AppendPhaseError, AppendPhaseOutcome, TombstonePhaseError,
TombstonePhaseOutcome,
},
state_doc::{OpKind, SupertableHandleId, WalId, WalState, WalStateDoc},
},
},
};
/// Counters summarising one pass of `scan_and_recover`.
///
/// Every WAL id visited by the sweep bumps `n_scanned` and exactly one of the
/// other `n_*` counters; the `total_*` fields accumulate per-target numbers
/// reported by the tombstone phase.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RecoveryReport {
/// Number of WAL ids returned by the listing and visited by the sweep.
pub n_scanned: usize,
/// WALs whose state doc was already `Complete` when read, or whose lease
/// acquisition was rejected with `LeaseError::InvalidPreState`.
pub n_already_complete: usize,
/// WALs skipped because lease acquisition reported `LeaseError::Conflict`.
pub n_held_by_peer: usize,
/// WALs for which the sweep ran the append phase followed by the tombstone phase.
pub n_full_pipeline_completed: usize,
/// WALs for which the sweep ran only the tombstone phase.
pub n_tombstone_only_completed: usize,
/// WALs whose state doc could not be read after being listed. Any read
/// error is counted here, not only a not-found error.
pub n_vanished_during_scan: usize,
/// WALs abandoned because a compare-and-swap was lost. This is also the
/// catch-all bucket for other lease errors, other append/tombstone phase
/// errors, and `(op_kind, state)` combinations the sweep does not handle.
pub n_cas_lost: usize,
/// Sum of the `n_tombstoned` counts reported by the tombstone phase.
pub total_targets_tombstoned: usize,
/// Sum of the `n_not_found` counts reported by the tombstone phase.
pub total_targets_not_found: usize,
}
/// Failures that abort a whole recovery sweep.
///
/// Per-WAL problems are not surfaced through this type; `scan_and_recover`
/// only counts them in the returned `RecoveryReport`.
#[derive(Debug, Error)]
pub enum RecoveryError {
/// The supertable's options carry no storage provider, so there is no WAL
/// store to scan.
#[error("recovery sweep requires storage; supertable has none attached")]
NoStorageAttached,
/// Wraps a `WalStoreError`; within `scan_and_recover` this is produced when
/// listing the WAL ids fails.
#[error("failed to list WAL state docs: {0}")]
ListFailed(#[from] WalStoreError),
}
/// Runs one recovery sweep over every WAL state document in the supertable's storage.
///
/// The WAL ids are listed once, then `recover_one` is attempted on each id in
/// the order the listing yields them. Per-WAL failures never abort the sweep:
/// they are only tallied in the returned [`RecoveryReport`].
///
/// `owner` and `lease_duration` are forwarded unchanged to each per-WAL lease
/// acquisition.
///
/// # Errors
///
/// * [`RecoveryError::NoStorageAttached`] when the supertable's options hold no storage.
/// * [`RecoveryError::ListFailed`] when listing the WAL ids fails.
pub async fn scan_and_recover(
    supertable: &Supertable,
    owner: SupertableHandleId,
    lease_duration: Duration,
) -> Result<RecoveryReport, RecoveryError> {
    let inner = supertable.inner();
    let wal_store = {
        let storage = inner
            .options
            .storage
            .as_ref()
            .ok_or(RecoveryError::NoStorageAttached)?;
        WalStore::new(Arc::clone(storage))
    };

    let wal_ids = wal_store.list_wal_ids().await?;
    let mut report = RecoveryReport::default();
    for wal_id in wal_ids {
        report.n_scanned += 1;
        // Successful outcomes fold themselves into the report; every skip
        // reason maps onto exactly one counter, bumped below.
        let skip_counter =
            match recover_one(supertable, &wal_store, wal_id, owner, lease_duration).await {
                Ok(outcome) => {
                    outcome.fold_into(&mut report);
                    continue;
                }
                Err(SweepStep::Vanished) => &mut report.n_vanished_during_scan,
                Err(SweepStep::HeldByPeer) => &mut report.n_held_by_peer,
                Err(SweepStep::CasLost) => &mut report.n_cas_lost,
            };
        *skip_counter += 1;
    }
    Ok(report)
}
/// Successful result of processing a single WAL during a sweep.
enum OneWalOutcome {
/// Nothing had to be done: the state doc was already `Complete`, or lease
/// acquisition reported `LeaseError::InvalidPreState`.
AlreadyComplete,
/// The append phase ran and was followed by the tombstone phase.
FullPipeline {
/// `n_tombstoned` count reported by the tombstone phase.
n_tombstoned: usize,
/// `n_not_found` count reported by the tombstone phase.
n_not_found: usize,
},
/// Only the tombstone phase ran.
TombstoneOnly {
/// `n_tombstoned` count reported by the tombstone phase.
n_tombstoned: usize,
/// `n_not_found` count reported by the tombstone phase.
n_not_found: usize,
},
}
impl OneWalOutcome {
fn fold_into(self, report: &mut RecoveryReport) {
match self {
OneWalOutcome::AlreadyComplete => {
report.n_already_complete += 1;
}
OneWalOutcome::FullPipeline {
n_tombstoned,
n_not_found,
} => {
report.n_full_pipeline_completed += 1;
report.total_targets_tombstoned += n_tombstoned;
report.total_targets_not_found += n_not_found;
}
OneWalOutcome::TombstoneOnly {
n_tombstoned,
n_not_found,
} => {
report.n_tombstone_only_completed += 1;
report.total_targets_tombstoned += n_tombstoned;
report.total_targets_not_found += n_not_found;
}
}
}
}
/// Reason a single WAL was skipped or abandoned during a sweep.
///
/// `scan_and_recover` maps each variant onto one `RecoveryReport` counter.
enum SweepStep {
/// Reading the WAL's state doc failed. `recover_one` uses this for every
/// read error, not only a not-found error.
Vanished,
/// Lease acquisition reported `LeaseError::Conflict`.
HeldByPeer,
/// A compare-and-swap was lost. Also used as the catch-all for other lease
/// errors, other phase errors, and unhandled `(op_kind, state)` pairs.
CasLost,
}
/// Attempts to bring a single WAL to `Complete`.
///
/// Steps, in order:
/// 1. Read the state doc; a doc that is already `Complete` is reported as
///    [`OneWalOutcome::AlreadyComplete`] without acquiring a lease.
/// 2. Try to acquire the lease for `owner`, stamped with the current UTC time
///    and `lease_duration`.
/// 3. Hand the leased doc and its etag to `drive_to_complete`.
///
/// Returns `Err(SweepStep)` when the WAL is skipped; see the match arms below
/// for how each failure is classified.
async fn recover_one(
    supertable: &Supertable,
    wal_store: &WalStore,
    wal_id: WalId,
    owner: SupertableHandleId,
    lease_duration: Duration,
) -> Result<OneWalOutcome, SweepStep> {
    let snapshot = match wal_store.read(wal_id).await {
        Ok((snapshot, _)) => snapshot,
        Err(read_err) => {
            // NOTE(review): every read failure is reported as `Vanished`; the
            // not-found case is spelled out only to document the expected one.
            return Err(match read_err {
                WalStoreError::Storage {
                    source: StorageError::NotFound { .. },
                    ..
                } => SweepStep::Vanished,
                _ => SweepStep::Vanished,
            });
        }
    };
    if snapshot.state == WalState::Complete {
        return Ok(OneWalOutcome::AlreadyComplete);
    }

    let acquired_at = Utc::now();
    let (leased_doc, leased_etag) =
        match lease::try_acquire(wal_store, wal_id, owner, acquired_at, lease_duration).await {
            Ok(leased) => leased,
            Err(LeaseError::Conflict { .. }) => return Err(SweepStep::HeldByPeer),
            Err(LeaseError::CasLost) => return Err(SweepStep::CasLost),
            // Treated as "nothing left to do" for this WAL.
            Err(LeaseError::InvalidPreState { .. }) => {
                return Ok(OneWalOutcome::AlreadyComplete);
            }
            // Any other lease failure is folded into the CAS-lost bucket.
            Err(_) => return Err(SweepStep::CasLost),
        };

    drive_to_complete(supertable, wal_store, leased_doc, leased_etag).await
}
/// Runs the remaining pipeline phases for a WAL whose lease was just acquired.
///
/// The phases to run are chosen from `(doc.op_kind, doc.state)`:
///
/// * `Update` in `Intent`: append phase, then tombstone phase, reported as
///   [`OneWalOutcome::FullPipeline`].
/// * `Update` in `Appended`, or `Delete` in `Intent`: tombstone phase only,
///   reported as [`OneWalOutcome::TombstoneOnly`].
/// * Anything else: `Err(SweepStep::CasLost)`.
///
/// Every failure of either phase is likewise reported as `SweepStep::CasLost`.
async fn drive_to_complete(
    supertable: &Supertable,
    wal_store: &WalStore,
    doc: WalStateDoc,
    etag: Etag,
) -> Result<OneWalOutcome, SweepStep> {
    // NOTE(review): this has no runtime effect; it appears to exist only so the
    // `AppendPhaseOutcome` import stays referenced — confirm before removing.
    let _ = AppendPhaseOutcome::Applied;

    let (tombstone_doc, tombstone_etag, append_ran) = match (doc.op_kind, doc.state) {
        (OpKind::Update, WalState::Intent) => {
            match pipeline::run_append_phase(supertable, wal_store, &doc, &etag).await {
                Ok((_, appended_doc, appended_etag)) => (appended_doc, appended_etag, true),
                Err(AppendPhaseError::WalStore(WalStoreError::CasFailed { .. })) => {
                    return Err(SweepStep::CasLost);
                }
                // Other append failures share the CAS-lost bucket.
                Err(_) => return Err(SweepStep::CasLost),
            }
        }
        // The append phase is not needed; continue with the doc as leased.
        (OpKind::Update, WalState::Appended) | (OpKind::Delete, WalState::Intent) => {
            (doc, etag, false)
        }
        _ => return Err(SweepStep::CasLost),
    };

    let (n_tombstoned, n_not_found) = match pipeline::run_tombstone_phase(
        supertable,
        wal_store,
        &tombstone_doc,
        &tombstone_etag,
    )
    .await
    {
        // Both success variants carry the same pair of counters.
        Ok((
            TombstonePhaseOutcome::Applied {
                n_tombstoned,
                n_not_found,
            }
            | TombstonePhaseOutcome::AlreadyComplete {
                n_tombstoned,
                n_not_found,
            },
            _,
            _,
        )) => (n_tombstoned, n_not_found),
        Err(TombstonePhaseError::WalStore(WalStoreError::CasFailed { .. })) => {
            return Err(SweepStep::CasLost);
        }
        // Other tombstone failures share the CAS-lost bucket.
        Err(_) => return Err(SweepStep::CasLost),
    };

    Ok(if append_ran {
        OneWalOutcome::FullPipeline {
            n_tombstoned,
            n_not_found,
        }
    } else {
        OneWalOutcome::TombstoneOnly {
            n_tombstoned,
            n_not_found,
        }
    })
}
#[cfg(test)]
mod tests {
use chrono::Utc;
use tempfile::TempDir;
use uuid::Uuid;
use super::*;
use crate::{
storage::{LocalFsStorageProvider, StorageProvider},
supertable::{
Supertable,
wal::state_doc::{
Lease, OpKind, RowId, SCHEMA_VERSION, SupertableHandleId, TombstoneEntry,
TombstoneOutcome,
},
},
test_helpers::default_supertable_options,
};
/// Sweeping a freshly created, storage-backed supertable yields an all-zero report.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn scan_empty_supertable_reports_zero_work() {
    let tmp = TempDir::new().expect("tempdir");
    let provider = LocalFsStorageProvider::new(tmp.path()).expect("provider");
    let storage: Arc<dyn StorageProvider> = Arc::new(provider);
    let options = default_supertable_options().with_storage(Arc::clone(&storage));
    let table = Supertable::create(options).expect("create");

    let sweeper = SupertableHandleId(0xCAFE);
    let report = scan_and_recover(&table, sweeper, Duration::from_secs(30))
        .await
        .expect("sweep");

    assert_eq!(report, RecoveryReport::default());
}
/// Seeds a lease-free `Delete` WAL in `Intent` with one pending target and checks
/// that a sweep completes it via the tombstone-only path, counting the target as
/// not found.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn sweep_drives_intent_delete_wal_to_complete() {
    let tmp = TempDir::new().expect("tempdir");
    let storage: Arc<dyn StorageProvider> =
        Arc::new(LocalFsStorageProvider::new(tmp.path()).expect("provider"));
    let options = default_supertable_options().with_storage(Arc::clone(&storage));
    let table = Supertable::create(options).expect("create");
    let wal_store = WalStore::new(Arc::clone(&storage));

    let wal_id = WalId(1234);
    let pending_target = TombstoneEntry {
        target_id: RowId(99),
        outcome: TombstoneOutcome::Pending,
        tombstoned_in_superfile: None,
    };
    let seeded = WalStateDoc {
        wal_id,
        schema_version: SCHEMA_VERSION,
        op_kind: OpKind::Delete,
        state: WalState::Intent,
        created_at: Utc::now(),
        lease: None,
        predicate_repr: "test".into(),
        target_ids: vec![RowId(99)],
        new_row_count: None,
        new_row_content_hash: None,
        preallocated_superfile_id: None,
        minted_id_spans: Vec::new(),
        tombstone_progress: vec![pending_target],
    };
    wal_store.create(&seeded).await.expect("seed");

    let sweeper = SupertableHandleId(0xC0DE);
    let report = scan_and_recover(&table, sweeper, Duration::from_secs(30))
        .await
        .expect("sweep");

    assert_eq!(report.n_scanned, 1);
    assert_eq!(report.n_tombstone_only_completed, 1);
    assert_eq!(report.total_targets_not_found, 1);
    assert_eq!(report.total_targets_tombstoned, 0);

    // The persisted state doc must have reached `Complete`.
    let (persisted, _) = wal_store.read(wal_id).await.expect("read");
    assert_eq!(persisted.state, WalState::Complete);
}
/// A WAL that is already `Complete` is counted as such and its stored state doc
/// is left untouched, which is verified by comparing the etag before and after
/// the sweep.
///
/// NOTE(review): `walls` in the test name looks like a typo for `wals`; the name
/// is kept so existing test filters keep matching.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn sweep_skips_complete_walls_without_touching_state() {
    let dir = TempDir::new().expect("tempdir");
    let storage: Arc<dyn StorageProvider> =
        Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
    let st = Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
        .expect("create");
    let ws = WalStore::new(Arc::clone(&storage));

    // The doc is built once with its final field values; no mutation is needed
    // before it is seeded.
    let wal_doc = WalStateDoc {
        wal_id: WalId(5678),
        schema_version: SCHEMA_VERSION,
        op_kind: OpKind::Delete,
        state: WalState::Complete,
        created_at: Utc::now(),
        lease: None,
        predicate_repr: "noop".into(),
        target_ids: vec![RowId(1)],
        new_row_count: None,
        new_row_content_hash: None,
        preallocated_superfile_id: None,
        minted_id_spans: Vec::new(),
        tombstone_progress: vec![TombstoneEntry {
            target_id: RowId(1),
            outcome: TombstoneOutcome::Tombstoned,
            tombstoned_in_superfile: Some(Uuid::from_u128(0xCAFE)),
        }],
    };
    let etag_before = ws.create(&wal_doc).await.expect("seed");

    let report = scan_and_recover(&st, SupertableHandleId(0xABCD), Duration::from_secs(30))
        .await
        .expect("sweep");

    assert_eq!(report.n_scanned, 1);
    assert_eq!(report.n_already_complete, 1);
    assert_eq!(report.n_tombstone_only_completed, 0);

    // An unchanged etag shows the sweep did not rewrite the state doc.
    let (_, etag_after) = ws.read(WalId(5678)).await.expect("read");
    assert_eq!(etag_after, etag_before);
}
/// A WAL whose lease belongs to another owner and expires two minutes in the
/// future must be reported as held by a peer and not driven to completion.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn sweep_skips_wal_held_by_live_peer() {
    let tmp = TempDir::new().expect("tempdir");
    let storage: Arc<dyn StorageProvider> =
        Arc::new(LocalFsStorageProvider::new(tmp.path()).expect("provider"));
    let options = default_supertable_options().with_storage(Arc::clone(&storage));
    let table = Supertable::create(options).expect("create");
    let wal_store = WalStore::new(Arc::clone(&storage));

    let seeded_at = Utc::now();
    let peer_lease = Lease {
        owner: SupertableHandleId(0xDEAD_DEAD),
        acquired_at: seeded_at,
        expires_at: seeded_at + chrono::Duration::seconds(120),
    };
    let pending_target = TombstoneEntry {
        target_id: RowId(1),
        outcome: TombstoneOutcome::Pending,
        tombstoned_in_superfile: None,
    };
    let seeded = WalStateDoc {
        wal_id: WalId(9999),
        schema_version: SCHEMA_VERSION,
        op_kind: OpKind::Delete,
        state: WalState::Intent,
        created_at: seeded_at,
        lease: Some(peer_lease),
        predicate_repr: "held".into(),
        target_ids: vec![RowId(1)],
        new_row_count: None,
        new_row_content_hash: None,
        preallocated_superfile_id: None,
        minted_id_spans: Vec::new(),
        tombstone_progress: vec![pending_target],
    };
    wal_store.create(&seeded).await.expect("seed");

    // The sweeper uses a different owner id than the lease holder.
    let sweeper = SupertableHandleId(0xBEEF);
    let report = scan_and_recover(&table, sweeper, Duration::from_secs(30))
        .await
        .expect("sweep");

    assert_eq!(report.n_scanned, 1);
    assert_eq!(report.n_held_by_peer, 1);
    assert_eq!(report.n_tombstone_only_completed, 0);
}
/// Sweeping a supertable created without storage fails with `NoStorageAttached`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn sweep_against_in_memory_supertable_errors_cleanly() {
    let table = Supertable::create(default_supertable_options()).expect("create");
    let sweeper = SupertableHandleId(0xC0DE);

    let failure = scan_and_recover(&table, sweeper, Duration::from_secs(30))
        .await
        .expect_err("must error");

    assert!(matches!(failure, RecoveryError::NoStorageAttached));
}
/// `Supertable::run_recovery_sweep_once` on a freshly created, storage-backed
/// supertable scans nothing and reports nothing as already complete.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn run_recovery_sweep_once_hatch_reports_zero_work_on_empty_storage() {
    let tmp = TempDir::new().expect("tempdir");
    let provider = LocalFsStorageProvider::new(tmp.path()).expect("provider");
    let storage: Arc<dyn StorageProvider> = Arc::new(provider);
    let options = default_supertable_options().with_storage(Arc::clone(&storage));
    let table = Supertable::create(options).expect("create");

    let report = table.run_recovery_sweep_once().await.expect("sweep");

    assert_eq!(report.n_scanned, 0);
    assert_eq!(report.n_already_complete, 0);
}
/// `Supertable::run_recovery_sweep_once` on a supertable created without storage
/// fails with `NoStorageAttached`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn run_recovery_sweep_once_hatch_errors_on_in_memory_supertable() {
    let table = Supertable::create(default_supertable_options()).expect("create");

    let failure = table
        .run_recovery_sweep_once()
        .await
        .expect_err("must error");

    assert!(matches!(failure, RecoveryError::NoStorageAttached));
}
}