use std::collections::BTreeMap;
use tl_proto::{TlError, TlPacket, TlRead, TlResult, TlWrite};
use tycho_block_util::queue::{
QueueKey, QueuePartitionIdx, RouterAddr, RouterPartitions, processed_to_map,
router_partitions_map,
};
use tycho_block_util::tl;
use tycho_storage::kv::{StoredValue, StoredValueBuffer};
use tycho_types::cell::HashBytes;
use tycho_types::models::ShardIdent;
use tycho_util::FastHashMap;
/// An internal queue message whose payload bytes are borrowed for lifetime `'a`.
///
/// NOTE(review): `workchain` and `prefix` presumably describe the account the
/// message is addressed to — confirm against the code that builds this struct.
pub struct InternalQueueMessage<'a> {
/// Key of the message (partition, shard and queue key).
pub key: ShardsInternalMessagesKey,
/// Workchain id.
pub workchain: i8,
/// 64-bit prefix.
pub prefix: u64,
/// Borrowed message bytes; the field name suggests a serialized BOC — TODO confirm.
pub message_boc: &'a [u8],
}
/// Composite key of an internal message: partition, shard and queue key.
///
/// The derived ordering compares fields in declaration order (`partition`,
/// then `shard_ident`, then `internal_message_key`), so the field order below
/// must not be changed casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct ShardsInternalMessagesKey {
/// Queue partition index.
pub partition: QueuePartitionIdx,
/// Shard identifier.
pub shard_ident: ShardIdent,
/// Queue key of the message.
pub internal_message_key: QueueKey,
}
impl ShardsInternalMessagesKey {
pub fn new(
partition: QueuePartitionIdx,
shard_ident: ShardIdent,
internal_message_key: QueueKey,
) -> Self {
Self {
partition,
shard_ident,
internal_message_key,
}
}
}
/// Decodes a key from raw bytes by delegating to `deserialize`.
///
/// # Panics
///
/// Panics whenever `deserialize` panics, e.g. when `bytes` is shorter than
/// `Self::SIZE_HINT`.
impl From<&[u8]> for ShardsInternalMessagesKey {
    fn from(bytes: &[u8]) -> Self {
        // Only this local copy of the slice reference is advanced while decoding.
        let mut cursor: &[u8] = bytes;
        Self::deserialize(&mut cursor)
    }
}
/// Storage encoding of [`ShardsInternalMessagesKey`].
///
/// Components are written and read in the order: partition, shard ident,
/// internal message key.
impl StoredValue for ShardsInternalMessagesKey {
    /// Sum of the size hints of every component written by `serialize`.
    ///
    /// The partition index is part of the encoding and therefore must be
    /// counted here; otherwise the on-stack buffer type is too small and the
    /// length guard in `deserialize` accepts inputs that are too short.
    const SIZE_HINT: usize =
        QueuePartitionIdx::SIZE_HINT + ShardIdent::SIZE_HINT + QueueKey::SIZE_HINT;

    type OnStackSlice = [u8; Self::SIZE_HINT];

    /// Appends the partition, shard ident and message key to `buffer`, in that order.
    fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
        self.partition.serialize(buffer);
        self.shard_ident.serialize(buffer);
        self.internal_message_key.serialize(buffer);
    }

    /// Reads a key from `reader`, delegating to the component deserializers.
    ///
    /// # Panics
    ///
    /// Panics if `reader` holds fewer than `Self::SIZE_HINT` bytes.
    fn deserialize(reader: &mut &[u8]) -> Self {
        if reader.len() < Self::SIZE_HINT {
            panic!("Insufficient data for deserialization")
        }
        // Field initializers run in the order written, which matches `serialize`.
        Self {
            partition: QueuePartitionIdx::deserialize(reader),
            shard_ident: ShardIdent::deserialize(reader),
            internal_message_key: QueueKey::deserialize(reader),
        }
    }
}
/// Composite key made of a shard, a partition, a queue key and a router address.
///
/// The derived ordering compares fields in declaration order: `shard_ident`,
/// `partition`, `max_message`, `dest`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StatKey {
/// Shard identifier.
pub shard_ident: ShardIdent,
/// Queue partition index.
pub partition: QueuePartitionIdx,
/// Queue key; presumably the max message key of the related diff — TODO confirm.
pub max_message: QueueKey,
/// Router address; presumably the destination the statistics refer to — TODO confirm.
pub dest: RouterAddr,
}
impl StatKey {
pub(crate) const PREFIX_SIZE: usize =
ShardIdent::SIZE_HINT + QueuePartitionIdx::SIZE_HINT + QueueKey::SIZE_HINT;
pub fn new(
shard_ident: ShardIdent,
partition: QueuePartitionIdx,
max_message: QueueKey,
dest: RouterAddr,
) -> Self {
Self {
shard_ident,
partition,
max_message,
dest,
}
}
}
/// Key made of a shard and a queue key.
///
/// The derived ordering compares `shard_ident` first, then `max_message`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct DiffTailKey {
/// Shard identifier.
pub shard_ident: ShardIdent,
/// Queue key; presumably the max message key of a diff — TODO confirm.
pub max_message: QueueKey,
}
impl DiffTailKey {
pub fn new(shard_ident: ShardIdent, max_message: QueueKey) -> Self {
Self {
shard_ident,
max_message,
}
}
}
/// Key made of a shard and a sequence number.
///
/// The derived ordering compares `shard_ident` first, then `seqno`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct DiffInfoKey {
/// Shard identifier.
pub shard_ident: ShardIdent,
/// Sequence number; presumably the block seqno of the diff — TODO confirm.
pub seqno: u32,
}
impl DiffInfoKey {
pub fn new(shard_ident: ShardIdent, seqno: u32) -> Self {
Self { shard_ident, seqno }
}
}
/// Storage encoding of [`StatKey`]: shard ident, partition, max message and
/// destination, written and read in that order.
impl StoredValue for StatKey {
    /// Sum of the size hints of all four components.
    const SIZE_HINT: usize = ShardIdent::SIZE_HINT
        + QueuePartitionIdx::SIZE_HINT
        + QueueKey::SIZE_HINT
        + RouterAddr::SIZE_HINT;

    type OnStackSlice = [u8; Self::SIZE_HINT];

    /// Appends every component to `buffer` in declaration order.
    fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
        let Self {
            shard_ident,
            partition,
            max_message,
            dest,
        } = self;
        shard_ident.serialize(buffer);
        partition.serialize(buffer);
        max_message.serialize(buffer);
        dest.serialize(buffer);
    }

    /// Reads a key from `reader`, delegating to the component deserializers.
    ///
    /// # Panics
    ///
    /// Panics if `reader` holds fewer than `Self::SIZE_HINT` bytes.
    fn deserialize(reader: &mut &[u8]) -> Self {
        assert!(
            reader.len() >= Self::SIZE_HINT,
            "Insufficient data for deserialization"
        );
        // Field initializers run in the order written, which matches `serialize`.
        Self {
            shard_ident: ShardIdent::deserialize(reader),
            partition: QueuePartitionIdx::deserialize(reader),
            max_message: QueueKey::deserialize(reader),
            dest: RouterAddr::deserialize(reader),
        }
    }
}
/// Storage encoding of [`DiffTailKey`]: shard ident followed by max message.
impl StoredValue for DiffTailKey {
    /// Sum of the size hints of both components.
    const SIZE_HINT: usize = ShardIdent::SIZE_HINT + QueueKey::SIZE_HINT;

    type OnStackSlice = [u8; Self::SIZE_HINT];

    /// Appends the shard ident and then the max message key to `buffer`.
    fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
        let Self {
            shard_ident,
            max_message,
        } = self;
        shard_ident.serialize(buffer);
        max_message.serialize(buffer);
    }

    /// Reads a key from `reader`, delegating to the component deserializers.
    ///
    /// # Panics
    ///
    /// Panics if `reader` holds fewer than `Self::SIZE_HINT` bytes.
    fn deserialize(reader: &mut &[u8]) -> Self {
        assert!(
            reader.len() >= Self::SIZE_HINT,
            "Insufficient data for deserialization"
        );
        // Field initializers run in the order written, which matches `serialize`.
        Self {
            shard_ident: ShardIdent::deserialize(reader),
            max_message: QueueKey::deserialize(reader),
        }
    }
}
/// Storage encoding of [`DiffInfoKey`]: shard ident followed by the seqno as
/// four big-endian bytes.
impl StoredValue for DiffInfoKey {
    /// Shard ident size hint plus 4 bytes for the `u32` seqno.
    const SIZE_HINT: usize = ShardIdent::SIZE_HINT + 4;

    type OnStackSlice = [u8; Self::SIZE_HINT];

    /// Appends the shard ident and then the big-endian seqno to `buffer`.
    fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
        self.shard_ident.serialize(buffer);
        let seqno_be = self.seqno.to_be_bytes();
        buffer.write_raw_slice(&seqno_be);
    }

    /// Reads a key from `reader` and advances it past the four seqno bytes.
    ///
    /// # Panics
    ///
    /// Panics if `reader` holds fewer than `Self::SIZE_HINT` bytes.
    fn deserialize(reader: &mut &[u8]) -> Self {
        assert!(
            reader.len() >= Self::SIZE_HINT,
            "Insufficient data for deserialization"
        );
        let shard_ident = ShardIdent::deserialize(reader);

        // Copy out the seqno bytes, then move the cursor past them.
        let mut raw_seqno = [0u8; 4];
        raw_seqno.copy_from_slice(&reader[..4]);
        *reader = &reader[4..];

        Self {
            shard_ident,
            seqno: u32::from_be_bytes(raw_seqno),
        }
    }
}
/// A pair of queue keys (`from`, `to`) scoped to one shard and one partition.
///
/// NOTE(review): whether the bounds are inclusive or exclusive is not visible
/// in this file — confirm at the call sites.
#[derive(Debug)]
pub struct QueueRange {
/// Shard identifier.
pub shard_ident: ShardIdent,
/// Queue partition index.
pub partition: QueuePartitionIdx,
/// Start key of the range.
pub from: QueueKey,
/// End key of the range.
pub to: QueueKey,
}
/// Round-trips a `DiffInfoKey` through `serialize` / `deserialize`.
#[test]
fn diff_info_key_serialization() {
    let original = DiffInfoKey::new(ShardIdent::MASTERCHAIN, 10);

    let mut encoded = Vec::with_capacity(DiffInfoKey::SIZE_HINT);
    original.serialize(&mut encoded);

    let mut cursor = encoded.as_slice();
    let decoded = DiffInfoKey::deserialize(&mut cursor);

    assert_eq!(original, decoded);
}
/// Metadata record of a queue diff, (de)serialized via the `TlWrite` / `TlRead`
/// impls in this file.
///
/// NOTE(review): the field descriptions below are inferred from names and
/// types — confirm against the code that produces these values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DiffInfo {
/// Queue key; presumably the smallest message key in the diff.
pub min_message: QueueKey,
/// Queue key; presumably the largest message key in the diff.
pub max_message: QueueKey,
/// Message count per shard; shards missing from the map read as zero via
/// `get_messages_count_by_shard`.
pub shards_messages_count: FastHashMap<ShardIdent, u64>,
/// Hash value; presumably the diff hash.
pub hash: HashBytes,
/// Queue key per shard; presumably the processed-up-to position.
pub processed_to: BTreeMap<ShardIdent, QueueKey>,
/// Router partitions for the source side (per the field name).
pub router_partitions_src: RouterPartitions,
/// Router partitions for the destination side (per the field name).
pub router_partitions_dst: RouterPartitions,
/// Sequence number.
pub seqno: u32,
}
impl DiffInfo {
    /// Returns the message count recorded for `shard_ident`, or `0` when the
    /// shard has no entry in `shards_messages_count`.
    pub fn get_messages_count_by_shard(&self, shard_ident: &ShardIdent) -> u64 {
        match self.shards_messages_count.get(shard_ident) {
            Some(&count) => count,
            None => 0,
        }
    }
}
/// TL serialization of [`DiffInfo`].
///
/// Write order: min message, max message, shard-count entries (length-prefixed),
/// hash, processed-to map, source router partitions, destination router
/// partitions, seqno.
impl TlWrite for DiffInfo {
    type Repr = tl_proto::Boxed;

    /// Size estimate for the encoded value.
    ///
    /// NOTE(review): this estimate does not mirror `write_to` exactly — it
    /// counts three `QueueKey::SIZE_HINT` while two keys are written, and it
    /// does not account for `processed_to`, the two router partition maps or
    /// `seqno`. Confirm whether the hint is meant to be approximate.
    fn max_size_hint(&self) -> usize {
        let per_shard_entry = tl::shard_ident::SIZE_HINT + 8;
        let shard_entries = self.shards_messages_count.len() * per_shard_entry;

        QueueKey::SIZE_HINT
            + QueueKey::SIZE_HINT
            + QueueKey::SIZE_HINT
            + 4
            + shard_entries
            + tl::hash_bytes::SIZE_HINT
    }

    /// Writes all fields to `packet` in the order listed on the impl.
    fn write_to<P: TlPacket>(&self, packet: &mut P) {
        self.min_message.write_to(packet);
        self.max_message.write_to(packet);

        // Length prefix followed by (shard, count) pairs in map iteration order.
        let counts = &self.shards_messages_count;
        packet.write_u32(counts.len() as u32);
        for (shard, messages) in counts.iter() {
            tl::shard_ident::write(shard, packet);
            packet.write_u64(*messages);
        }

        tl::hash_bytes::write(&self.hash, packet);
        processed_to_map::write(&self.processed_to, packet);
        router_partitions_map::write(&self.router_partitions_src, packet);
        router_partitions_map::write(&self.router_partitions_dst, packet);
        packet.write_u32(self.seqno);
    }
}
/// TL deserialization of [`DiffInfo`]; fields are read in the same order in
/// which `TlWrite::write_to` emits them.
impl<'tl> TlRead<'tl> for DiffInfo {
    type Repr = tl_proto::Boxed;

    /// Reads a `DiffInfo` from `data`.
    ///
    /// # Errors
    ///
    /// Returns `TlError::InvalidData` when the declared number of shard-count
    /// entries exceeds 10_000_000, and propagates any error from the nested
    /// readers.
    fn read_from(data: &mut &'tl [u8]) -> TlResult<Self> {
        let min_message = QueueKey::read_from(data)?;
        let max_message = QueueKey::read_from(data)?;

        // Reject implausible entry counts before the map is pre-allocated.
        let entry_count = u32::read_from(data)? as usize;
        if entry_count > 10_000_000 {
            return Err(TlError::InvalidData);
        }

        let mut shards_messages_count =
            FastHashMap::with_capacity_and_hasher(entry_count, Default::default());
        for _ in 0..entry_count {
            let shard = tl::shard_ident::read(data)?;
            let messages = u64::read_from(data)?;
            shards_messages_count.insert(shard, messages);
        }

        // Field initializers run in the order written, so the remaining reads
        // consume `data` in wire order.
        Ok(Self {
            min_message,
            max_message,
            shards_messages_count,
            hash: tl::hash_bytes::read(data)?,
            processed_to: processed_to_map::read(data)?,
            router_partitions_src: router_partitions_map::read(data)?,
            router_partitions_dst: router_partitions_map::read(data)?,
            seqno: u32::read_from(data)?,
        })
    }
}
/// Key consisting of a single shard identifier.
#[derive(Debug, Clone)]
pub struct CommitPointerKey {
/// Shard identifier.
pub shard_ident: ShardIdent,
}
/// Value holding a queue key and a sequence number.
///
/// NOTE(review): presumably the position up to which a shard's queue has been
/// committed — confirm against the code that writes this value.
#[derive(Debug, Clone, Default)]
pub struct CommitPointerValue {
/// Queue key.
pub queue_key: QueueKey,
/// Sequence number.
pub seqno: u32,
}
/// Storage encoding of [`CommitPointerKey`]: just the shard ident.
impl StoredValue for CommitPointerKey {
    /// Same size hint as the wrapped shard ident.
    const SIZE_HINT: usize = ShardIdent::SIZE_HINT;

    type OnStackSlice = [u8; Self::SIZE_HINT];

    /// Appends the shard ident to `buffer`.
    fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
        let Self { shard_ident } = self;
        shard_ident.serialize(buffer);
    }

    /// Reads a key from `reader` by delegating to `ShardIdent::deserialize`.
    ///
    /// # Panics
    ///
    /// Panics if `reader` holds fewer than `Self::SIZE_HINT` bytes.
    fn deserialize(reader: &mut &[u8]) -> Self {
        assert!(
            reader.len() >= Self::SIZE_HINT,
            "Insufficient data for deserialization"
        );
        Self {
            shard_ident: ShardIdent::deserialize(reader),
        }
    }
}
/// Storage encoding of [`CommitPointerValue`]: queue key followed by the seqno
/// as four big-endian bytes.
impl StoredValue for CommitPointerValue {
    /// Queue key size hint plus 4 bytes for the `u32` seqno.
    const SIZE_HINT: usize = QueueKey::SIZE_HINT + 4;

    type OnStackSlice = [u8; Self::SIZE_HINT];

    /// Appends the queue key and then the big-endian seqno to `buffer`.
    fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
        self.queue_key.serialize(buffer);
        buffer.write_raw_slice(&self.seqno.to_be_bytes());
    }

    /// Reads a value from `reader` and advances it past the four seqno bytes.
    ///
    /// # Panics
    ///
    /// Panics if `reader` holds fewer than `Self::SIZE_HINT` bytes.
    fn deserialize(reader: &mut &[u8]) -> Self {
        if reader.len() < Self::SIZE_HINT {
            panic!("Insufficient data for deserialization");
        }
        let queue_key = QueueKey::deserialize(reader);

        let mut seqno_bytes = [0u8; 4];
        seqno_bytes.copy_from_slice(&reader[..4]);
        let seqno = u32::from_be_bytes(seqno_bytes);
        // Consume the seqno bytes so the reader ends up just past this value,
        // as `DiffInfoKey::deserialize` does; otherwise a caller decoding
        // several values back to back would re-read the seqno bytes.
        *reader = &reader[4..];

        Self { queue_key, seqno }
    }
}