use crate::cluster::node;
use crate::cluster::partition::Partition;
use crate::cluster::Cluster;
use crate::errors::{Error, Result};
use crate::policy::Replica;
use crate::policy::StreamPolicy;
use crate::query::NodePartitions;
use crate::query::PartitionFilter;
use crate::query::PartitionStatus;
use crate::Key;
use crate::Node;
use aerospike_rt::{
time::{Duration, Instant},
Mutex,
};
use std::cmp::max;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// Tracks scan/query progress across cluster partitions and nodes:
/// per-iteration node assignments, record limits, retry state, and
/// timeout bookkeeping.
#[derive(Debug)]
pub struct PartitionTracker {
    // Number of partitions covered by this tracker (the filter's count).
    partitions_capacity: usize,
    // First partition id in the tracked range; statuses are indexed by
    // `partition_id - partition_begin`.
    partition_begin: usize,
    // Initial node count (at least 1), used to pre-size the assignment list.
    node_capacity: usize,
    // When set, only partitions owned by this node are assigned.
    node_filter: Option<Arc<Node>>,
    // Shared filter holding per-partition status; `None` only before `new`
    // finishes or after `extract_partition_filter` takes it back.
    partition_filter: Option<Arc<Mutex<PartitionFilter>>>,
    #[allow(dead_code)]
    replica: Replica,
    // Node assignments produced by `assign_partitions_to_nodes`.
    node_partitions_list: Vec<Arc<Mutex<NodePartitions>>>,
    // Global counter used by `allow_record` to enforce `max_records`.
    record_count: AtomicUsize,
    // 0 means "no record limit".
    max_records: u64,
    sleep_between_retries: Option<Duration>,
    // Timeouts in milliseconds; shrunk in `is_complete` to fit the deadline.
    socket_timeout: u32,
    total_timeout: u32,
    // Retry iteration, starting at 1.
    iteration: AtomicUsize,
    deadline: Option<Instant>,
}
impl PartitionTracker {
#[allow(clippy::significant_drop_tightening)]
pub(crate) async fn new(
policy: impl StreamPolicy,
partition_filter: Arc<Mutex<PartitionFilter>>,
nodes: Vec<Arc<Node>>,
) -> Result<Self> {
let mut pt = {
let mut partition_filter = partition_filter.lock().await;
if partition_filter.begin >= node::PARTITIONS {
return Err(Error::InvalidArgument(format!(
"Invalid partition begin {} . Valid range: 0-{}",
partition_filter.begin,
node::PARTITIONS - 1
)));
}
if partition_filter.count == 0 {
return Err(Error::InvalidArgument(format!(
"Invalid partition count {}",
partition_filter.count
)));
}
if (partition_filter.begin + partition_filter.count) > node::PARTITIONS {
return Err(Error::InvalidArgument(format!(
"Invalid partition range ({},{})",
partition_filter.begin,
partition_filter.begin + partition_filter.count
)));
}
let node_capacity = max(1, nodes.len());
let pt = PartitionTracker {
partitions_capacity: partition_filter.count,
partition_begin: partition_filter.begin,
node_capacity,
node_filter: None,
partition_filter: None,
replica: policy.replica(),
node_partitions_list: vec![],
record_count: AtomicUsize::new(0),
max_records: policy.max_records().unwrap_or(0),
sleep_between_retries: None,
socket_timeout: 0,
total_timeout: 0,
iteration: AtomicUsize::new(1),
deadline: None,
};
if partition_filter.partitions.is_none() {
let begin = partition_filter.begin;
let count = partition_filter.count;
let digest = partition_filter.digest;
partition_filter
.set_partitions(PartitionTracker::init_partitions(begin, count, digest));
partition_filter.retry.store(true, Ordering::Relaxed);
} else {
if policy.max_records().is_none() {
partition_filter.retry.store(true, Ordering::Relaxed);
}
partition_filter.reset_partition_status().await;
}
pt
};
pt.partition_filter = Some(partition_filter);
pt.init(policy);
Ok(pt)
}
/// Node assignments produced by the most recent call to
/// `assign_partitions_to_nodes`.
pub(crate) fn node_partitions_list(&self) -> &[Arc<Mutex<NodePartitions>>] {
    self.node_partitions_list.as_slice()
}
#[allow(clippy::significant_drop_tightening)]
/// Assign every partition that needs (re-)reading to the node currently
/// owning it, and distribute the remaining `max_records` budget across
/// the resulting node list.
///
/// # Errors
/// Returns `Error::ClientError` when no node could be assigned, and
/// propagates node-lookup failures from the cluster.
pub(crate) async fn assign_partitions_to_nodes(
    &mut self,
    cluster: Arc<Cluster>,
    namespace: &str,
) -> Result<()> {
    let mut list = Vec::<Arc<Mutex<NodePartitions>>>::with_capacity(self.node_capacity);
    // A full retry (re-assign every partition) only applies on the first
    // iteration, and only when the filter requests it or is absent.
    // Fix: the previous expression `a || b && c` parsed as `a || (b && c)`;
    // the intended grouping is `(a || b) && c`. The two differ only when
    // `partition_filter` is `None`, in which case the `unwrap()` below
    // panics regardless, so this cannot change any non-panicking run.
    let retry = (self.partition_filter.is_none()
        || self
            .partition_filter
            .as_ref()
            .unwrap()
            .lock()
            .await
            .retry
            .load(Ordering::Relaxed))
        && self.iteration() == 1;
    let partition_filter = self.partition_filter.as_ref().unwrap().lock().await;
    let partitions = partition_filter.partitions.as_ref().unwrap();
    for part in partitions {
        // Read all per-partition state under a single lock (previously the
        // status was locked twice, leaving a window between the reads).
        let (part_retry, part_id, last_tried) = {
            let part = part.lock().await;
            (part.retry, part.id, part.node.as_ref().map(Arc::clone))
        };
        if retry || part_retry {
            let partition = Partition::new(namespace, part_id as usize);
            let node = cluster.get_node(&partition, self.replica, last_tried)?;
            // Honor an explicit node filter, if one was configured.
            if let Some(node_filter) = self.node_filter.as_ref() {
                if node_filter.name() != node.name() {
                    continue;
                }
            }
            let np = Self::find_node(&list, node.clone()).await;
            if let Some(np) = np {
                let mut np = np.lock().await;
                np.add_partition(part.clone()).await;
            } else {
                let mut np = NodePartitions::new(node.clone(), self.partitions_capacity);
                np.add_partition(part.clone()).await;
                list.push(Arc::new(Mutex::new(np)));
            }
        }
    }
    let node_size = list.len();
    if node_size == 0 {
        return Err(Error::ClientError("No nodes were assigned".into()));
    }
    // NOTE(review): other Aerospike clients clear the filter-level retry
    // flag at this point; storing `true` here looks suspicious — confirm
    // against the reference implementation before changing it.
    partition_filter.retry.store(true, Ordering::Relaxed);
    self.record_count.store(0, Ordering::Relaxed);
    if self.max_records > 0 {
        if self.max_records >= node_size as u64 {
            // Spread max_records evenly; the first `rem` nodes get one extra.
            let max = self.max_records / node_size as u64;
            let rem = self.max_records - (max * node_size as u64);
            for (i, np) in list.iter().enumerate() {
                let mut np = np.lock().await;
                np.record_max = if (i as u64) < rem { max + 1 } else { max };
            }
        } else {
            // Fewer records than nodes: cap each node at one record; the
            // global counter in `allow_record` enforces the overall limit.
            for np in &list {
                let mut np = np.lock().await;
                np.record_max = 1;
            }
            self.record_count.store(0, Ordering::Relaxed);
        }
    }
    self.node_partitions_list = list;
    Ok(())
}
/// Capture retry/timeout configuration from the stream policy.
fn init(&mut self, policy: impl StreamPolicy) {
    let sleep = policy.sleep_between_retries();
    let socket = policy.socket_timeout();
    let total = policy.total_timeout();
    let deadline = policy.deadline();
    self.sleep_between_retries = sleep;
    self.socket_timeout = socket;
    self.total_timeout = total;
    self.deadline = deadline;
}
/// Find the `NodePartitions` entry in `list` belonging to `node`.
///
/// Returns a clone of the matching entry, or `None` when the node has no
/// assignment yet. The parameter is now an idiomatic slice (`&[_]` instead
/// of `&Vec<_>`); existing `&Vec` call sites keep working via deref
/// coercion.
pub(crate) async fn find_node(
    list: &[Arc<Mutex<NodePartitions>>],
    node: Arc<Node>,
) -> Option<Arc<Mutex<NodePartitions>>> {
    for node_partition in list {
        if node_partition.lock().await.node == node {
            return Some(node_partition.clone());
        }
    }
    None
}
#[allow(clippy::significant_drop_tightening)]
/// Mark `partition_id` for retry after a node reported it unavailable,
/// and bump the node's unavailable-partition counter.
pub(crate) async fn partition_unavailable(
    &self,
    node_partitions: &mut NodePartitions,
    partition_id: u16,
) {
    let pf = self.partition_filter.as_ref().unwrap().lock().await;
    let partitions = pf.partitions.as_ref().unwrap();
    // checked_sub avoids the integer-underflow panic the plain subtraction
    // had when partition_id < partition_begin; such ids are ignored, like
    // ids past the end of the range already were.
    if let Some(ps) = usize::from(partition_id)
        .checked_sub(self.partition_begin)
        .and_then(|idx| partitions.get(idx))
    {
        let mut ps = ps.lock().await;
        ps.retry = true;
        if let Some(ref mut seq) = ps.sequence {
            *seq += 1;
        }
    }
    node_partitions.parts_unavailable += 1;
}
#[allow(clippy::significant_drop_tightening)]
/// Record `key`'s digest on its partition's status (for resume support)
/// and count the record against the node's total.
///
/// # Errors
/// Returns `Error::ClientError` when the key's partition id falls outside
/// the tracked range.
pub(crate) async fn set_digest(
    &self,
    node_partitions: &mut NodePartitions,
    key: &Key,
) -> Result<()> {
    let pf = self.partition_filter.as_ref().unwrap().lock().await;
    let partitions = pf.partitions.as_ref();
    let partition_id = key.partition_id();
    if let Some(partitions) = partitions {
        // checked_sub guards against partition ids below partition_begin,
        // which previously caused an integer-underflow panic instead of
        // the mismatch error below.
        if let Some(ps) = partition_id
            .checked_sub(self.partition_begin)
            .and_then(|idx| partitions.get(idx))
        {
            let mut ps = ps.lock().await;
            ps.digest = Some(key.digest);
        } else {
            return Err(Error::ClientError(format!(
                "Partition mismatch: key.partition_id: {}, partition_begin: {}",
                partition_id, self.partition_begin
            )));
        }
    }
    node_partitions.record_count += 1;
    Ok(())
}
#[allow(clippy::significant_drop_tightening)]
/// Record the last-seen digest (and optional `bval`) for `key`'s partition
/// and count the record against the node's total.
///
/// # Errors
/// Returns `Error::ClientError` when the key's partition id is below the
/// tracked range's start.
pub(crate) async fn set_last(
    &self,
    node_partitions: &mut NodePartitions,
    key: &Key,
    bval: Option<u64>,
) -> Result<()> {
    let partition_id = key.partition_id();
    // checked_sub replaces the former lossy `as i64` comparison: it rejects
    // ids below partition_begin identically, without any narrowing cast.
    let index = match partition_id.checked_sub(self.partition_begin) {
        Some(index) => index,
        None => {
            return Err(Error::ClientError(format!(
                "Partition mismatch: key.partition_id: {}, partition_begin: {}",
                partition_id, self.partition_begin
            )))
        }
    };
    let pf = self.partition_filter.as_ref().unwrap().lock().await;
    if let Some(partitions) = pf.partitions.as_ref() {
        // Ids past the end of the range are silently ignored, matching
        // the original behavior.
        if let Some(ps) = partitions.get(index) {
            let mut ps = ps.lock().await;
            ps.digest = Some(key.digest);
            if bval.is_some() {
                ps.bval = bval;
            }
        }
    }
    node_partitions.record_count += 1;
    Ok(())
}
/// Decide whether another record may be consumed under `max_records`,
/// incrementing the shared counter and, on rejection, the node's
/// disallowed-record count.
pub(crate) fn allow_record(&self, np: &mut NodePartitions) -> bool {
    // No limit configured: everything is allowed.
    if self.max_records == 0 {
        return true;
    }
    // The counter is bumped even for disallowed records so concurrent
    // callers contend on a single source of truth.
    let record_count = self.record_count.fetch_add(1, Ordering::SeqCst) + 1;
    // Removed the redundant `self.max_records > 0` test: it is guaranteed
    // by the early return above.
    if record_count as u64 <= self.max_records {
        return true;
    }
    np.disallowed_count += 1;
    false
}
#[allow(clippy::significant_drop_tightening)]
/// Decide whether the scan/query has finished after one cluster pass.
///
/// Aggregates per-node progress, updates the shared filter's
/// `retry`/`done` flags, enforces the max-retries and deadline limits,
/// and bumps the iteration counter when another pass is needed.
///
/// Returns `Ok(true)` when the stream is complete, `Ok(false)` when the
/// caller should run another iteration, or an error once retries or the
/// deadline are exhausted.
pub(crate) async fn is_complete(
    &mut self,
    policy: impl StreamPolicy,
    timed_out: bool,
) -> Result<bool> {
    // Sum progress across all node assignments from this pass.
    let mut record_count: u64 = 0;
    let mut parts_unavailable = 0;
    for np in &self.node_partitions_list {
        let np = np.lock().await;
        record_count += np.record_count;
        parts_unavailable += np.parts_unavailable;
    }
    if !timed_out && parts_unavailable == 0 {
        if self.max_records == 0 {
            // Unlimited scan finished every partition: mark the filter done.
            if let Some(pf) = &self.partition_filter {
                let pf = pf.lock().await;
                pf.retry.store(false, Ordering::Relaxed);
                pf.done.store(true, Ordering::Relaxed);
            }
        } else if self.iteration() > 1 {
            // A limited scan that already retried: keep the filter
            // retryable so a later page can resume from the saved state.
            if let Some(pf) = &self.partition_filter {
                let pf = pf.lock().await;
                pf.retry.store(true, Ordering::Relaxed);
                pf.done.store(false, Ordering::Relaxed);
            }
        } else {
            // First pass of a limited scan: a node that hit its per-node
            // cap (record_count + disallowed_count >= record_max) is
            // presumed to still hold records, so its partitions are
            // flagged for retry and the scan is reported not done.
            let mut done = true;
            for np in &self.node_partitions_list {
                let np = np.lock().await;
                if np.record_count + np.disallowed_count >= np.record_max {
                    self.mark_retry(&np).await;
                    done = false;
                }
            }
            if let Some(pf) = &self.partition_filter {
                let pf = pf.lock().await;
                pf.retry.store(false, Ordering::Relaxed);
                pf.done.store(done, Ordering::Relaxed);
            }
        }
        return Ok(true);
    }
    // Record limit reached: stop even though some partitions were
    // unavailable or the pass timed out.
    if self.max_records > 0 && record_count >= self.max_records {
        return Ok(true);
    }
    if policy.max_retries() > 0 && self.iteration() > policy.max_retries() {
        return Err(Error::ClientError(format!(
            "Max retries exceeded: {}",
            policy.max_retries()
        )));
    }
    if let Some(deadline) = self.deadline {
        // Abort when even the retry sleep would overshoot the deadline.
        if Instant::now()
            + self
                .sleep_between_retries
                .unwrap_or(Duration::from_millis(0))
            > deadline
        {
            return Err(Error::Timeout("Scan/Query timed out".into()));
        }
        // Shrink the timeouts so the next pass fits within the deadline.
        // NOTE(review): `deadline - Instant::now()` can panic for a std
        // Instant if the deadline elapses between the two `now()` calls —
        // confirm aerospike_rt's Instant saturates here.
        let total_timeout = u64::from(policy.total_timeout());
        if deadline < Instant::now() + Duration::from_millis(total_timeout) {
            self.total_timeout = (deadline - Instant::now()).as_millis() as u32;
            if self.socket_timeout > self.total_timeout {
                self.socket_timeout = self.total_timeout;
            }
        }
    }
    // Carry the remaining record budget into the next iteration.
    if self.max_records > 0 {
        self.max_records -= record_count;
    }
    self.iteration.fetch_add(1, Ordering::Relaxed);
    Ok(false)
}
/// Flag every partition handled by `node_partitions` — fully and
/// partially scanned alike — to be retried on the next iteration.
pub(crate) async fn mark_retry(&self, node_partitions: &NodePartitions) {
    let all_parts = node_partitions
        .parts_full
        .iter()
        .chain(node_partitions.parts_partial.iter());
    for ps in all_parts {
        ps.lock().await.retry = true;
    }
}
/// Record a partition-level error by requesting a retry on the shared
/// partition filter (no-op when the filter has been extracted).
pub(crate) async fn partition_error(&self) {
    match self.partition_filter {
        Some(ref pf) => pf.lock().await.retry.store(true, Ordering::Relaxed),
        None => {}
    }
}
/// Build one `PartitionStatus` per partition in the range
/// `[partition_begin, partition_begin + partition_count)`.
///
/// A resume digest, when supplied, is attached to the first partition
/// only.
pub(crate) fn init_partitions(
    partition_begin: usize,
    partition_count: usize,
    digest: Option<[u8; 20]>,
) -> Vec<Arc<Mutex<PartitionStatus>>> {
    (0..partition_count)
        .map(|offset| {
            let mut status = PartitionStatus::new(partition_begin + offset);
            if offset == 0 && digest.is_some() {
                status.set_digest(digest);
            }
            Arc::new(Mutex::new(status))
        })
        .collect()
}
pub(crate) fn extract_partition_filter(&mut self) -> Option<PartitionFilter> {
self.partition_filter
.take()
.and_then(|pf| Arc::try_unwrap(pf).ok().map(Mutex::into_inner))
}
/// Socket timeout to send to the server, or 0 (no timeout) when no total
/// timeout is configured.
pub(crate) const fn server_timeout(&self) -> u32 {
    match self.total_timeout {
        0 => 0,
        _ => self.socket_timeout,
    }
}
/// Current retry iteration (starts at 1).
pub(crate) fn iteration(&self) -> usize {
    // `Ordering` is already imported at the top of the file.
    self.iteration.load(Ordering::Relaxed)
}
}