use crate::{
backoff::{Backoff, DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF},
pd::PdClient,
region::RegionVerId,
request::Plan,
timestamp::TimestampExt,
transaction::requests,
Error, Result,
};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};

const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;
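
/// _Resolves_ the given locks. Returns `Ok(true)` if all the given locks were expired and
/// resolved, or `Ok(false)` if any lock was still live and therefore skipped.
///
/// While a key is locked, its latest status is unknown. Resolving the lock settles it: a
/// `CleanupRequest` sent to the transaction's primary lock forces the transaction's status
/// (committed or rolled back) to converge, and that status then determines the outcome of
/// every other key the same transaction locked.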
pub async fn resolve_locks(
locks: Vec<kvrpcpb::LockInfo>,
pd_client: Arc<impl PdClient>,
) -> Result<bool> {
let ts = pd_client.clone().get_timestamp().await?;
let mut has_live_locks = false;
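    // Keep only expired locks: a lock whose TTL has not yet elapsed (measured against a
    // fresh PD timestamp) may belong to a live transaction, so it is skipped and reported
    // via the return value instead.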
let expired_locks = locks.into_iter().filter(|lock| {
let expired = ts.physical - Timestamp::from_version(lock.lock_version).physical
>= lock.lock_ttl as i64;
if !expired {
has_live_locks = true;
}
expired
});
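    // Per transaction (keyed by its start/lock version): the commit version learned from
    // cleanup, and the regions already resolved, so each transaction's status is fetched
    // once and each region is resolved at most once per transaction.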
let mut commit_versions: HashMap<u64, u64> = HashMap::new();
let mut clean_regions: HashMap<u64, HashSet<RegionVerId>> = HashMap::new();
for lock in expired_locks {
let region_ver_id = pd_client
.region_for_key(&lock.primary_lock.clone().into())
.await?
.ver_id();
if clean_regions
.get(&lock.lock_version)
            .map(|regions| regions.contains(&region_ver_id))
.unwrap_or(false)
{
continue;
}
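        // Learn the transaction's final status, issuing a cleanup request to its primary
        // lock the first time the transaction is seen. A commit version of 0 means the
        // transaction was rolled back.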
let commit_version = match commit_versions.get(&lock.lock_version) {
Some(&commit_version) => commit_version,
None => {
let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version);
let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
.single_region()
.await?
.resolve_lock(OPTIMISTIC_BACKOFF)
.retry_region(DEFAULT_REGION_BACKOFF)
.post_process_default()
.plan();
let commit_version = plan.execute().await?;
commit_versions.insert(lock.lock_version, commit_version);
commit_version
}
};
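        // Resolve the lock in whichever region currently holds the key, then mark that
        // region as clean for this transaction.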
let cleaned_region = resolve_lock_with_retry(
&lock.key,
lock.lock_version,
commit_version,
pd_client.clone(),
)
.await?;
clean_regions
.entry(lock.lock_version)
.or_insert_with(HashSet::new)
.insert(cleaned_region);
}
Ok(!has_live_locks)
}
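
/// Sends a `ResolveLockRequest` for the region that currently holds `key`. Region errors
/// trigger a retry with a fresh region lookup, up to `RESOLVE_LOCK_RETRY_LIMIT` attempts;
/// any other error is returned immediately. On success, returns the `RegionVerId` of the
/// region that was cleaned.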
async fn resolve_lock_with_retry(
#[allow(clippy::ptr_arg)] key: &Vec<u8>,
start_version: u64,
commit_version: u64,
pd_client: Arc<impl PdClient>,
) -> Result<RegionVerId> {
let mut error = None;
for _ in 0..RESOLVE_LOCK_RETRY_LIMIT {
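        // Look up the key's store on every attempt: a region error usually means our
        // region information is stale and must be refreshed before retrying.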
let store = pd_client.clone().store_for_key(key.into()).await?;
let ver_id = store.region.ver_id();
let request = requests::new_resolve_lock_request(start_version, commit_version);
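        // The inner plan uses no backoff because this loop already bounds the retries.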
let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
.single_region_with_store(store)
.await?
.resolve_lock(Backoff::no_backoff())
.retry_region(Backoff::no_backoff())
.extract_error()
.plan();
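        // Retry only on region errors; any other failure is fatal.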
match plan.execute().await {
Ok(_) => {
return Ok(ver_id);
}
Err(e @ Error::RegionError(_)) => {
error = Some(e);
continue;
}
Err(e) => return Err(e),
}
}
    Err(error.expect("the retry loop always records an error before falling through"))
}
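
/// Implemented by response types that can carry lock information, so that a lock-resolving
/// plan layer can take the locks out of a response and resolve them before retrying the
/// request.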
pub trait HasLocks {
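    /// Takes any locks out of this response, leaving none behind. The default
    /// implementation reports no locks.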
fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
Vec::new()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::mock::{MockKvClient, MockPdClient};
use futures::executor;
use std::any::Any;
use tikv_client_proto::errorpb;

    #[test]
fn test_resolve_lock_with_retry() {
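        // Within the retry limit: the fail point injects a region error on the first 9
        // attempts, so the tenth (final) attempt succeeds.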
fail::cfg("region-error", "9*return").unwrap();
let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
|_: &dyn Any| {
fail::fail_point!("region-error", |_| {
let mut resp = kvrpcpb::ResolveLockResponse::default();
resp.region_error = Some(errorpb::Error::default());
Ok(Box::new(resp) as Box<dyn Any>)
});
Ok(Box::new(kvrpcpb::ResolveLockResponse::default()) as Box<dyn Any>)
},
)));
let key = vec![1];
let region1 = MockPdClient::region1();
let resolved_region =
executor::block_on(resolve_lock_with_retry(&key, 1, 2, client.clone())).unwrap();
assert_eq!(region1.ver_id(), resolved_region);
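
        // Over the retry limit: 10 injected region errors exhaust all 10 attempts.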
fail::cfg("region-error", "10*return").unwrap();
let key = vec![100];
executor::block_on(resolve_lock_with_retry(&key, 3, 4, client))
.expect_err("should return error");
}
}