use std::{
collections::HashMap,
fmt::Debug,
ops::Range,
sync::{atomic::Ordering, Arc},
};
use clap::ValueEnum;
use foyer_common::{
code::{HashBuilder, StorageKey, StorageValue},
metrics::Metrics,
};
use futures::future::try_join_all;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use super::{
generic::GenericLargeStorageConfig,
indexer::{EntryAddress, Indexer},
};
use crate::{
device::RegionId,
error::{Error, Result},
large::{
indexer::HashedEntryAddress,
scanner::{EntryInfo, RegionScanner},
serde::{AtomicSequence, Sequence},
tombstone::Tombstone,
},
region::{Region, RegionManager},
runtime::Runtime,
};
/// Controls how regions are scanned when the large-object storage recovers its state from disk.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ValueEnum)]
pub enum RecoverMode {
    /// Do not scan regions at all: no entries are recovered and every region is treated as clean.
    None,
    /// Scan regions; if scanning a region fails, log a warning, stop scanning that region and keep
    /// the entries read from it before the failure.
    Quiet,
    /// Scan regions; if scanning any region fails, the whole recovery fails with the error(s).
    Strict,
}
/// Rebuilds the in-memory state of the large-object storage (index, sequence counter, region
/// states and reclaim permits) from the on-disk regions and the tombstone records.
#[derive(Debug)]
pub struct RecoverRunner;

impl RecoverRunner {
    /// Scans `regions` and restores the storage state from what is found.
    ///
    /// 1. Each region in `regions` is scanned on the user runtime, with at most
    ///    `config.recover_concurrency` scans running at once.
    /// 2. For every key hash, the version (entry or tombstone) with the highest sequence wins;
    ///    hashes whose winning version is a tombstone are not indexed.
    /// 3. The surviving addresses are inserted into `indexer`, `sequence` is set to one past the
    ///    highest sequence seen, regions without entries are marked clean and the others
    ///    evictable, and the reclaim semaphore / countdown are initialized from
    ///    `config.clean_region_threshold`.
    ///
    /// # Errors
    ///
    /// If any region scan returns an error, all scan errors are combined with
    /// `Error::multiple` and returned; no state is modified in that case.
    ///
    /// # Panics
    ///
    /// Panics if a spawned scan task panics or is cancelled before completing.
    #[expect(clippy::too_many_arguments)]
    pub async fn run<K, V, S>(
        config: &GenericLargeStorageConfig<K, V, S>,
        regions: Range<RegionId>,
        sequence: &AtomicSequence,
        indexer: &Indexer,
        region_manager: &RegionManager,
        tombstones: &[Tombstone],
        metrics: Arc<Metrics>,
        runtime: Runtime,
    ) -> Result<()>
    where
        K: StorageKey,
        V: StorageValue,
        S: HashBuilder + Debug,
    {
        let semaphore = Arc::new(Semaphore::new(config.recover_concurrency));
        let mode = config.recover_mode;
        let handles = regions.clone().map(|id| {
            let semaphore = semaphore.clone();
            let region = region_manager.region(id).clone();
            let metrics = metrics.clone();
            runtime.user().spawn(async move {
                // Bound the number of regions scanned concurrently.
                let permit = semaphore.acquire().await;
                let res = RegionRecoverRunner::run(mode, region, metrics).await;
                drop(permit);
                res
            })
        });
        // A join error means a scan task panicked or was cancelled; recovery cannot continue.
        let joined = try_join_all(handles)
            .await
            .expect("region recover task panicked or was cancelled");

        // `try_join_all` keeps input order, so results line up with `regions`. Pair them with the
        // actual region ids: `regions` need not start at 0, so a result's position in `joined`
        // is not its region id.
        let mut scanned = Vec::with_capacity(joined.len());
        let mut errs = vec![];
        for (id, res) in regions.zip(joined) {
            match res {
                Ok(infos) => scanned.push((id, infos)),
                Err(e) => errs.push(e),
            }
        }
        if !errs.is_empty() {
            return Err(Error::multiple(errs));
        }

        /// One version of a key hash observed during recovery.
        #[derive(Debug)]
        enum EntryAddressOrTombstone {
            EntryAddress(EntryAddress),
            Tombstone,
        }

        let mut latest_sequence = 0;
        let mut indices: HashMap<u64, Vec<(Sequence, EntryAddressOrTombstone)>> = HashMap::new();
        let mut clean_regions = vec![];
        let mut evictable_regions = vec![];
        for (region, infos) in scanned {
            if infos.is_empty() {
                clean_regions.push(region);
            } else {
                evictable_regions.push(region);
            }
            for EntryInfo { hash, sequence, addr } in infos {
                latest_sequence = latest_sequence.max(sequence);
                indices
                    .entry(hash)
                    .or_default()
                    .push((sequence, EntryAddressOrTombstone::EntryAddress(addr)));
            }
        }
        // Tombstones are pushed after all entries; combined with the stable sort below, a
        // tombstone wins over an entry carrying the same sequence.
        tombstones.iter().for_each(|tombstone| {
            latest_sequence = latest_sequence.max(tombstone.sequence);
            indices
                .entry(tombstone.hash)
                .or_default()
                .push((tombstone.sequence, EntryAddressOrTombstone::Tombstone))
        });
        // Keep only the newest version of each hash, and only if it is a live entry.
        let indices = indices
            .into_iter()
            .filter_map(|(hash, mut versions)| {
                versions.sort_by_key(|(sequence, _)| *sequence);
                tracing::trace!("[recover runner]: hash {hash} has versions: {versions:?}");
                match versions.pop() {
                    None => None,
                    Some((_, EntryAddressOrTombstone::Tombstone)) => None,
                    Some((_, EntryAddressOrTombstone::EntryAddress(address))) => {
                        Some(HashedEntryAddress { hash, address })
                    }
                }
            })
            .collect_vec();

        // Reclaim permits cover the shortfall of clean regions below the threshold; any clean
        // regions beyond the threshold become the countdown.
        let permits = config.clean_region_threshold.saturating_sub(clean_regions.len());
        let countdown = clean_regions.len().saturating_sub(config.clean_region_threshold);
        tracing::info!(
            "Recovers {e} regions with data, {c} clean regions, {t} total entries with max sequence as {s}, initial reclaim permits is {p}.",
            e = evictable_regions.len(),
            c = clean_regions.len(),
            t = indices.len(),
            s = latest_sequence,
            p = permits,
        );
        indexer.insert_batch(indices);
        sequence.store(latest_sequence + 1, Ordering::Release);
        for region in clean_regions {
            region_manager.mark_clean(region).await;
        }
        for region in evictable_regions {
            region_manager.mark_evictable(region);
        }
        region_manager.reclaim_semaphore().add_permits(permits);
        region_manager.reclaim_semaphore_countdown().reset(countdown);
        Ok(())
    }
}
/// Scans a single region and collects the entries stored in it.
#[derive(Debug)]
struct RegionRecoverRunner;

impl RegionRecoverRunner {
    /// Scans `region` and returns the entries found, in scan order.
    ///
    /// - [`RecoverMode::None`]: nothing is scanned and an empty list is returned.
    /// - [`RecoverMode::Quiet`]: a scan error is logged (including the error itself), scanning of
    ///   this region stops, and the entries read before the error are returned.
    /// - [`RecoverMode::Strict`]: a scan error is returned to the caller.
    ///
    /// Scanning also stops without error when the scanner reports no more entries, or when an
    /// entry's sequence is lower than the previously accepted entry's. In the latter case the
    /// lower entry is discarded; presumably it is stale data left from an earlier use of the
    /// region — TODO confirm against the writer.
    async fn run(mode: RecoverMode, region: Region, metrics: Arc<Metrics>) -> Result<Vec<EntryInfo>> {
        if mode == RecoverMode::None {
            return Ok(vec![]);
        }

        let id = region.id();
        let mut infos: Vec<EntryInfo> = vec![];
        let mut scanner = RegionScanner::new(region, metrics);
        loop {
            match scanner.next().await {
                Ok(Some(info)) => {
                    // Sequences must not decrease within a region; a regression ends the scan.
                    let previous = infos.last().map_or(0, |last| last.sequence);
                    if info.sequence < previous {
                        break;
                    }
                    infos.push(info);
                }
                Ok(None) => break,
                Err(e) if mode == RecoverMode::Strict => return Err(e),
                Err(e) => {
                    tracing::warn!(
                        "error raised when recovering region {id}, skip further recovery for {id}: {e}"
                    );
                    break;
                }
            }
        }
        Ok(infos)
    }
}