use std::collections::BTreeMap;
use ahash::HashMapExt;
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr};
use tycho_storage::kv::StoredValue;
use tycho_types::models::{BlockId, IntAddr, ShardIdent};
use tycho_util::{FastHashMap, FastHashSet};
use weedb::{OwnedRawIterator, OwnedSnapshot};
use super::INT_QUEUE_LAST_COMMITTED_MC_BLOCK_ID_KEY;
use super::db::InternalQueueDB;
use super::iterator::InternalQueueMessagesIter;
use super::models::{
CommitPointerKey, CommitPointerValue, DiffInfoKey, DiffTailKey, ShardsInternalMessagesKey,
StatKey,
};
/// Per-address counters: maps an [`IntAddr`] to a `u64` value.
///
/// Within this file the values are sums of the little-endian `u64` counts read
/// from the stats table, keyed by the destination address of the stat key.
pub type AccountStatistics = FastHashMap<IntAddr, u64>;
/// Statistics grouped first by partition, then by [`QueueKey`], then by address.
///
/// As populated in this file, the `QueueKey` level is the `max_message` field of
/// the stored stat key; the `BTreeMap` keeps those keys in sorted order.
pub type SeparatedStatisticsByPartitions =
    FastHashMap<QueuePartitionIdx, BTreeMap<QueueKey, AccountStatistics>>;
/// Read-only view over the internal queue storage: a database handle paired
/// with a snapshot of it.
///
/// Iterator-based read methods on this type pass `snapshot` to their read
/// options via `set_snapshot`.
pub struct InternalQueueSnapshot {
    /// Internal queue database whose tables are read by the methods of this type.
    pub db: InternalQueueDB,
    /// Snapshot handed to the read options of the readers.
    pub snapshot: OwnedSnapshot,
}
impl InternalQueueSnapshot {
/// Returns the seqno of the last diff info entry stored for `shard_ident`, as
/// seen through this snapshot.
///
/// The lookup is restricted to keys between `(shard_ident, 0)` and
/// `(shard_ident, u32::MAX)` and positions the iterator with `seek_for_prev`
/// on the upper key, so the result is the seqno of the last key in that range.
/// NOTE(review): this is the highest seqno only if `DiffInfoKey` serializes so
/// that keys sort by seqno within a shard — confirm against the key encoding.
///
/// Returns `Ok(None)` when no entry is found in the range.
///
/// # Errors
///
/// Returns an error if the RocksDB iterator reports a failed status.
pub fn get_last_applied_diff_seqno(
    &self,
    shard_ident: &ShardIdent,
) -> anyhow::Result<Option<u32>> {
    let table = &self.db.internal_message_diff_info;
    let mut read_config = table.new_read_config();
    read_config.set_snapshot(&self.snapshot);

    let lower = DiffInfoKey::new(*shard_ident, 0);
    let upper = DiffInfoKey::new(*shard_ident, u32::MAX);
    let upper_bytes = upper.to_vec();
    read_config.set_iterate_lower_bound(lower.to_vec().to_vec());
    read_config.set_iterate_upper_bound(upper_bytes.to_vec());

    let cf = table.cf();
    let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, read_config);
    iter.seek_for_prev(upper_bytes.as_slice());

    match iter.key() {
        Some(raw_key) => Ok(Some(DiffInfoKey::from_slice(raw_key).seqno)),
        None => {
            // An invalid iterator means either "nothing in range" or "the read
            // failed"; only the former may be reported as `None`.
            iter.status()?;
            Ok(None)
        }
    }
}
/// Creates an iterator over the `shard_internal_messages` table, bound to this
/// snapshot and limited by `from` (lower bound) and `to` (upper bound).
///
/// The returned iterator starts with `first` set to `true`; no seek is
/// performed here.
pub fn iter_messages(
    &self,
    from: ShardsInternalMessagesKey,
    to: ShardsInternalMessagesKey,
) -> InternalQueueMessagesIter {
    let messages = &self.db.shard_internal_messages;

    let mut opts = messages.new_read_config();
    opts.set_snapshot(&self.snapshot);
    let lower_bound = from.to_vec().to_vec();
    opts.set_iterate_lower_bound(lower_bound);
    let upper_bound = to.to_vec().to_vec();
    opts.set_iterate_upper_bound(upper_bound);

    let rocksdb = self.db.rocksdb();
    let raw_iter = rocksdb.raw_iterator_cf_opt(&messages.cf(), opts);

    // SAFETY: `raw_iter` was created from the same `rocksdb` handle whose clone
    // is passed alongside it.
    // NOTE(review): presumably that is the whole contract of
    // `OwnedRawIterator::new` — confirm against the weedb documentation.
    let inner = unsafe { OwnedRawIterator::new(rocksdb.clone(), raw_iter) };

    InternalQueueMessagesIter { inner, first: true }
}
/// Counts the entries of the `internal_message_diffs_tail` table visible in
/// this snapshot, starting at `from` and bounded above by the key
/// `(from.shard_ident, QueueKey::MAX)`.
///
/// The iterator status is not inspected: the `u32` return type has no way to
/// report a read error, so iteration simply stops when the iterator becomes
/// invalid.
pub fn calc_diffs_tail(&self, from: &DiffTailKey) -> u32 {
    let tails = &self.db.internal_message_diffs_tail;

    let mut opts = tails.new_read_config();
    opts.set_snapshot(&self.snapshot);

    let start = from.to_vec();
    opts.set_iterate_lower_bound(start.as_slice());

    let end = DiffTailKey {
        shard_ident: from.shard_ident,
        max_message: QueueKey::MAX,
    };
    opts.set_iterate_upper_bound(end.to_vec().to_vec());

    let cf = tails.cf();
    let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, opts);
    iter.seek(&start);

    let mut total = 0;
    while iter.valid() {
        total += 1;
        iter.next();
    }
    total
}
/// Reads the raw diff info bytes stored under `key` in the
/// `internal_message_diff_info` table, as seen through this snapshot.
///
/// Returns `Ok(None)` when the key is absent.
///
/// # Errors
///
/// Returns an error if the RocksDB read fails.
pub fn get_diff_info(&self, key: &DiffInfoKey) -> anyhow::Result<Option<Vec<u8>>> {
    let table = &self.db.internal_message_diff_info;
    let mut read_config = table.new_read_config();
    read_config.set_snapshot(&self.snapshot);

    let cf = table.cf();
    // The read options must be passed explicitly: a plain `get_cf` does not
    // use them and would read the live database state instead of the snapshot.
    let data = self
        .db
        .rocksdb()
        .get_cf_opt(&cf, key.to_vec().as_slice(), &read_config)?;
    Ok(data)
}
/// Accumulates per-destination counters from the `internal_message_stats`
/// table into `result`, reading through this snapshot.
///
/// The scan is bounded by the first `StatKey::PREFIX_SIZE` bytes of the keys
/// built from (`shard_ident`, `partition`, `from`) and
/// (`shard_ident`, `partition`, `to`). Each stored value is decoded as a
/// little-endian `u64` and added to the entry for the key's destination
/// address; entries already present in `result` are added to, not replaced.
///
/// # Errors
///
/// Returns an error if the RocksDB iterator reports a failed status or if a
/// stored value is not exactly 8 bytes long. In either case `result` may
/// already contain the counts accumulated before the failure.
pub fn collect_stats_in_range(
    &self,
    shard_ident: &ShardIdent,
    partition: QueuePartitionIdx,
    from: &QueueKey,
    to: &QueueKey,
    result: &mut FastHashMap<IntAddr, u64>,
) -> anyhow::Result<()> {
    let table = &self.db.internal_message_stats;
    let mut read_config = table.new_read_config();
    read_config.set_snapshot(&self.snapshot);

    let from_key = StatKey {
        shard_ident: *shard_ident,
        partition,
        max_message: *from,
        dest: RouterAddr::MIN,
    };
    let to_key = StatKey {
        shard_ident: *shard_ident,
        partition,
        max_message: *to,
        dest: RouterAddr::MAX,
    };
    let from_bytes = from_key.to_vec();
    let to_bytes = to_key.to_vec();
    read_config.set_iterate_lower_bound(&from_bytes[..StatKey::PREFIX_SIZE]);
    read_config.set_iterate_upper_bound(&to_bytes[..StatKey::PREFIX_SIZE]);

    let cf = table.cf();
    let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, read_config);
    iter.seek_to_first();

    while let Some((key, value)) = iter.item() {
        let current_key = StatKey::from_slice(key);
        // Report a malformed value as an error instead of panicking.
        let raw = <[u8; 8]>::try_from(value).map_err(|_| {
            anyhow::anyhow!(
                "invalid stat value length: expected 8 bytes, got {}",
                value.len()
            )
        })?;
        let count = u64::from_le_bytes(raw);
        *result.entry(current_key.dest.to_int_addr()).or_insert(0) += count;
        iter.next();
    }

    // The loop ends when the iterator becomes invalid; distinguish a clean end
    // of range from a read failure.
    iter.status()?;
    Ok(())
}
/// Collects counters from the `internal_message_stats` table for each of the
/// given `partitions`, grouped by partition, then by the stat key's
/// `max_message`, then by destination address.
///
/// For every partition the scan is bounded by the first
/// `StatKey::PREFIX_SIZE` bytes of the keys built from
/// (`shard_ident`, partition, `from`) and (`shard_ident`, partition, `to`).
/// Each stored value is decoded as a little-endian `u64`. Every requested
/// partition gets an entry in the result, even when no stats were found for it.
///
/// # Errors
///
/// Returns an error if the RocksDB iterator reports a failed status or if a
/// stored value is not exactly 8 bytes long.
pub fn collect_separated_stats_in_range_for_partitions(
    &self,
    shard_ident: &ShardIdent,
    partitions: &FastHashSet<QueuePartitionIdx>,
    from: &QueueKey,
    to: &QueueKey,
) -> anyhow::Result<SeparatedStatisticsByPartitions> {
    let table = &self.db.internal_message_stats;
    let cf = table.cf();

    let mut result = SeparatedStatisticsByPartitions::new();
    for &partition in partitions {
        let mut read_config = table.new_read_config();
        read_config.set_snapshot(&self.snapshot);

        let from_key = StatKey {
            shard_ident: *shard_ident,
            partition,
            max_message: *from,
            dest: RouterAddr::MIN,
        };
        let to_key = StatKey {
            shard_ident: *shard_ident,
            partition,
            max_message: *to,
            dest: RouterAddr::MAX,
        };
        let from_bytes = from_key.to_vec();
        let to_bytes = to_key.to_vec();
        read_config.set_iterate_lower_bound(&from_bytes[..StatKey::PREFIX_SIZE]);
        read_config.set_iterate_upper_bound(&to_bytes[..StatKey::PREFIX_SIZE]);

        let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, read_config);
        iter.seek_to_first();

        // Created up front so that a partition without stats still appears in
        // the result with an empty map.
        let partition_stats = result.entry(partition).or_default();

        while let Some((key_bytes, value_bytes)) = iter.item() {
            let current_key = StatKey::from_slice(key_bytes);
            // Report a malformed value as an error instead of panicking.
            let raw = <[u8; 8]>::try_from(value_bytes).map_err(|_| {
                anyhow::anyhow!(
                    "invalid stat value length: expected 8 bytes, got {}",
                    value_bytes.len()
                )
            })?;
            let count = u64::from_le_bytes(raw);

            *partition_stats
                .entry(current_key.max_message)
                .or_default()
                .entry(current_key.dest.to_int_addr())
                .or_default() += count;

            iter.next();
        }

        // Distinguish a clean end of range from a read failure.
        iter.status()?;
    }

    Ok(result)
}
/// Reads every entry of the `internal_message_commit_pointer` table through
/// this snapshot and returns the stored values keyed by shard.
///
/// If several entries decode to the same shard, the one read last wins.
///
/// # Errors
///
/// Returns an error if the RocksDB iterator reports a failed status once
/// iteration stops.
pub fn read_commit_pointers(
    &self,
) -> anyhow::Result<FastHashMap<ShardIdent, CommitPointerValue>> {
    let table = &self.db.internal_message_commit_pointer;
    let cf = table.cf();

    let mut opts = table.new_read_config();
    opts.set_snapshot(&self.snapshot);

    let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, opts);
    iter.seek_to_first();

    let mut pointers = FastHashMap::default();
    while let Some((raw_key, raw_value)) = iter.item() {
        let pointer_key = CommitPointerKey::from_slice(raw_key);
        let pointer_value = CommitPointerValue::from_slice(raw_value);
        pointers.insert(pointer_key.shard_ident, pointer_value);
        iter.next();
    }

    // An invalid iterator ends the loop; surface a read failure if that is
    // what invalidated it.
    iter.status()?;
    Ok(pointers)
}
/// Reads the block id stored under
/// `INT_QUEUE_LAST_COMMITTED_MC_BLOCK_ID_KEY` in the `internal_message_var`
/// table, as seen through this snapshot.
///
/// Returns `Ok(None)` when the key is absent.
///
/// # Errors
///
/// Returns an error if the RocksDB read fails.
pub fn get_last_committed_mc_block_id(&self) -> anyhow::Result<Option<BlockId>> {
    let table = &self.db.internal_message_var;
    // Bind the lookup to the snapshot so it is consistent with the other
    // readers of this type; a plain `get_cf` reads the live database state.
    let mut read_config = table.new_read_config();
    read_config.set_snapshot(&self.snapshot);

    let cf = table.cf();
    let block_id = self
        .db
        .rocksdb()
        .get_cf_opt(&cf, INT_QUEUE_LAST_COMMITTED_MC_BLOCK_ID_KEY, &read_config)?
        .map(|bytes| BlockId::from_slice(&bytes));
    Ok(block_id)
}
}