// tycho_block_util/queue/proto.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3
4use bytes::Bytes;
5use tl_proto::{TlError, TlRead, TlWrite};
6use tycho_types::models::*;
7use tycho_types::prelude::*;
8
9use crate::tl;
10
/// Representation of an internal messages queue diff.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueDiff {
    /// Computed hash of this diff.
    ///
    /// NOTE: This field is not serialized and can be [`HashBytes::ZERO`] for serialization.
    pub hash: HashBytes,

    /// Hash of the TL repr of the previous queue diff.
    pub prev_hash: HashBytes,
    /// Shard identifier of the corresponding block.
    pub shard_ident: ShardIdent,
    /// Seqno of the corresponding block.
    pub seqno: u32,
    /// Collator boundaries: a processed-to key per shard (sorted by shard on the wire).
    pub processed_to: BTreeMap<ShardIdent, QueueKey>,
    /// Min message queue key.
    pub min_message: QueueKey,
    /// Max message queue key.
    pub max_message: QueueKey,
    /// List of message hashes (sorted ASC).
    pub messages: Vec<HashBytes>,
    /// Inbound router partitions.
    pub router_partitions_src: RouterPartitions,
    /// Outbound router partitions.
    pub router_partitions_dst: RouterPartitions,
}
38
39impl QueueDiff {
40    pub const TL_ID: u32 = tl_proto::id!("block.queueDiff", scheme = "proto.tl");
41
42    /// Recomputes the hash of the diff.
43    ///
44    /// NOTE: Since the hash is not serialized, it is NOT mandatory to call this method
45    /// if it will not be used after this.
46    pub fn recompute_hash(&mut self) {
47        self.hash = Self::compute_hash(&tl_proto::serialize(&*self));
48    }
49
50    /// Computes the hash of the serialized diff.
51    pub fn compute_hash(data: &[u8]) -> HashBytes {
52        Boc::file_hash_blake(data)
53    }
54}
55
impl TlWrite for QueueDiff {
    type Repr = tl_proto::Boxed;

    fn max_size_hint(&self) -> usize {
        // TL id prefix + fixed-size fields + the three variable-length collections.
        4 + tl::hash_bytes::SIZE_HINT
            + tl::shard_ident::SIZE_HINT
            + 4
            + processed_to_map::size_hint(&self.processed_to)
            + 2 * QueueKey::SIZE_HINT
            + messages_list::size_hint(&self.messages)
            + router_partitions_map::size_hint(&self.router_partitions_src)
            + router_partitions_map::size_hint(&self.router_partitions_dst)
    }

    fn write_to<P>(&self, packet: &mut P)
    where
        P: tl_proto::TlPacket,
    {
        // NOTE: `hash` is intentionally not written; it is recomputed from
        // these exact bytes when the diff is read back.
        packet.write_u32(Self::TL_ID);
        tl::hash_bytes::write(&self.prev_hash, packet);
        tl::shard_ident::write(&self.shard_ident, packet);
        packet.write_u32(self.seqno);
        processed_to_map::write(&self.processed_to, packet);
        self.min_message.write_to(packet);
        self.max_message.write_to(packet);
        messages_list::write(&self.messages, packet);
        router_partitions_map::write(&self.router_partitions_src, packet);
        router_partitions_map::write(&self.router_partitions_dst, packet);
    }
}
86
impl<'tl> TlRead<'tl> for QueueDiff {
    type Repr = tl_proto::Boxed;

    fn read_from(data: &mut &'tl [u8]) -> tl_proto::TlResult<Self> {
        // Remember the original slice so the hash can be computed over the
        // exact bytes consumed by this diff.
        let data_before = *data;
        if u32::read_from(data)? != Self::TL_ID {
            return Err(tl_proto::TlError::UnknownConstructor);
        }

        let mut result = Self {
            // Filled in below once the consumed byte range is known.
            hash: HashBytes::ZERO,
            prev_hash: tl::hash_bytes::read(data)?,
            shard_ident: tl::shard_ident::read(data)?,
            seqno: u32::read_from(data)?,
            processed_to: processed_to_map::read(data)?,
            min_message: QueueKey::read_from(data)?,
            max_message: QueueKey::read_from(data)?,
            messages: messages_list::read(data)?,
            router_partitions_src: router_partitions_map::read(data)?,
            router_partitions_dst: router_partitions_map::read(data)?,
        };

        // Sanity check: the advertised message range must not be inverted.
        if result.max_message < result.min_message {
            return Err(tl_proto::TlError::InvalidData);
        }

        let result_bytes = data_before.len() - data.len();

        // Compute the hash of the diff
        result.hash = Self::compute_hash(&data_before[..result_bytes]);

        Ok(result)
    }
}
121
/// Persistent internal messages queue state.
#[derive(Clone, PartialEq, Eq, TlWrite, TlRead)]
#[tl(boxed, id = "block.queueState", scheme = "proto.tl")]
pub struct QueueState {
    /// State metadata (shard, seqno and the chain of queue diffs).
    pub header: QueueStateHeader,

    /// Chunks of messages in the same order as messages in `header.queue_diffs.messages`.
    /// Only the order is guaranteed, but not the chunk sizes.
    #[tl(with = "state_messages_list")]
    pub messages: Vec<Bytes>,
}
133
/// Persistent internal messages queue state.
///
/// A non-owned version of [`QueueState`].
#[derive(Clone, PartialEq, Eq, TlWrite, TlRead)]
#[tl(boxed, id = "block.queueState", scheme = "proto.tl")]
pub struct QueueStateRef<'tl> {
    /// State metadata (shard, seqno and the chain of queue diffs).
    pub header: QueueStateHeader,

    /// Chunks of messages in the same order as messages in `header.queue_diffs.messages`.
    /// Only the order is guaranteed, but not the chunk sizes.
    #[tl(with = "state_messages_list_ref")]
    pub messages: Vec<&'tl [u8]>,
}
147
/// A header for a persistent internal messages queue state.
#[derive(Debug, Clone, PartialEq, Eq, TlWrite, TlRead)]
#[tl(boxed, id = "block.queueStateHeader", scheme = "proto.tl")]
pub struct QueueStateHeader {
    /// Shard this state belongs to.
    #[tl(with = "tl::shard_ident")]
    pub shard_ident: ShardIdent,
    /// Seqno of the corresponding block.
    pub seqno: u32,
    /// Queue diffs, newest first (validated by `queue_diffs_list`).
    #[tl(with = "queue_diffs_list")]
    pub queue_diffs: Vec<QueueDiff>,
}
158
/// Queue key.
///
/// Keys are ordered first by logical time, then by message hash
/// (derived lexicographic ordering over the fields below).
#[derive(Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, TlWrite, TlRead)]
pub struct QueueKey {
    /// Logical time.
    pub lt: u64,
    /// Message hash.
    #[tl(with = "tl::hash_bytes")]
    pub hash: HashBytes,
}
166
167impl QueueKey {
168    pub const SIZE_HINT: usize = 8 + 32;
169
170    pub const MIN: Self = Self {
171        lt: 0,
172        hash: HashBytes::ZERO,
173    };
174
175    pub const MAX: Self = Self {
176        lt: u64::MAX,
177        hash: HashBytes([0xff; 32]),
178    };
179
180    pub const fn min_for_lt(lt: u64) -> Self {
181        Self {
182            lt,
183            hash: HashBytes::ZERO,
184        }
185    }
186
187    pub const fn max_for_lt(lt: u64) -> Self {
188        Self {
189            lt,
190            hash: HashBytes([0xff; 32]),
191        }
192    }
193
194    // add min step to the key
195    pub fn next_value(&self) -> Self {
196        let mut new_lt = self.lt;
197        let mut new_hash = self.hash;
198
199        if new_hash.0 == [0xff; 32] {
200            // check if lt is already max then do nothing
201            if new_lt == u64::MAX {
202                return Self {
203                    lt: u64::MAX,
204                    hash: HashBytes([0xff; 32]),
205                };
206            } else {
207                new_lt += 1;
208                new_hash = HashBytes::ZERO;
209            }
210        } else {
211            let carry = 1u8;
212            for byte in new_hash.0.iter_mut().rev() {
213                let (res, overflow) = byte.overflowing_add(carry);
214                *byte = res;
215                if !overflow {
216                    break;
217                }
218            }
219        }
220
221        Self {
222            lt: new_lt,
223            hash: new_hash,
224        }
225    }
226
227    #[inline]
228    pub const fn split(self) -> (u64, HashBytes) {
229        (self.lt, self.hash)
230    }
231}
232
233impl From<(u64, HashBytes)> for QueueKey {
234    #[inline]
235    fn from((lt, hash): (u64, HashBytes)) -> Self {
236        Self { lt, hash }
237    }
238}
239
240impl From<QueueKey> for (u64, HashBytes) {
241    #[inline]
242    fn from(key: QueueKey) -> Self {
243        key.split()
244    }
245}
246
impl std::fmt::Debug for QueueKey {
    // Debug output mirrors `Display`; delegating via `Display::fmt`
    // preserves any width/fill flags on the formatter.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(self, f)
    }
}

impl std::fmt::Display for QueueKey {
    // Format: `LT_HASH(<lt>_<first 8 hex chars of hash>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let short_hash = get_short_hash_string(&self.hash);

        write!(f, "LT_HASH({}_{short_hash})", self.lt)
    }
}
260
#[cfg(feature = "storage")]
impl tycho_storage_traits::StoredValue for QueueKey {
    const SIZE_HINT: usize = 8 + 32;

    type OnStackSlice = [u8; Self::SIZE_HINT];

    fn serialize<T: tycho_storage_traits::StoredValueBuffer>(&self, buffer: &mut T) {
        // Big-endian `lt` first, then the raw hash bytes.
        buffer.write_raw_slice(&self.lt.to_be_bytes());
        buffer.write_raw_slice(&self.hash.0);
    }

    fn deserialize(reader: &mut &[u8]) -> Self {
        if reader.len() < Self::SIZE_HINT {
            panic!("Insufficient data for deserialization")
        }

        let (lt_bytes, rest) = reader.split_at(8);
        let (hash_bytes, rest) = rest.split_at(32);

        // Infallible: both slices have the exact lengths checked above.
        let lt = u64::from_be_bytes(lt_bytes.try_into().unwrap());
        let mut hash = HashBytes::ZERO;
        hash.0.copy_from_slice(hash_bytes);

        *reader = rest;
        Self { lt, hash }
    }
}
290
291pub fn get_short_hash_string(hash: &HashBytes) -> String {
292    let mut short_hash = [0u8; 8];
293    hex::encode_to_slice(&hash.as_array()[..4], &mut short_hash).ok();
294
295    // SAFETY: output is guaranteed to contain only [0-9a-f]
296    let res = unsafe { std::str::from_utf8_unchecked(&short_hash) };
297
298    res.to_owned()
299}
300
301pub fn get_short_addr_string(addr: &IntAddr) -> String {
302    match addr {
303        IntAddr::Std(addr) => {
304            let addr_hash_short = get_short_hash_string(&addr.address);
305            format!("{}:{}", addr.workchain, addr_hash_short)
306        }
307        IntAddr::Var(_) => unreachable!(),
308    }
309}
310
/// Std dest addr (workchain + account pair of a std address).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RouterAddr {
    /// Workchain id.
    pub workchain: i8,
    /// Account id within the workchain.
    pub account: HashBytes,
}
317
318impl RouterAddr {
319    pub const MIN: Self = Self {
320        workchain: i8::MIN,
321        account: HashBytes::ZERO,
322    };
323
324    pub const MAX: Self = Self {
325        workchain: i8::MAX,
326        account: HashBytes([0xff; 32]),
327    };
328
329    const TL_SIZE_HINT: usize = 4 + 32;
330
331    pub fn to_int_addr(&self) -> IntAddr {
332        IntAddr::Std(StdAddr {
333            anycast: None,
334            workchain: self.workchain,
335            address: self.account,
336        })
337    }
338
339    pub fn from_int_addr(addr: &IntAddr) -> Option<Self> {
340        match addr {
341            IntAddr::Std(addr) => Some(Self::from(addr)),
342            IntAddr::Var(_) => None,
343        }
344    }
345}
346
347impl From<&StdAddr> for RouterAddr {
348    fn from(value: &StdAddr) -> Self {
349        Self {
350            workchain: value.workchain,
351            account: value.address,
352        }
353    }
354}
355
356impl From<StdAddr> for RouterAddr {
357    fn from(value: StdAddr) -> Self {
358        Self::from(&value)
359    }
360}
361
impl TlWrite for RouterAddr {
    type Repr = tl_proto::Boxed;

    fn max_size_hint(&self) -> usize {
        Self::TL_SIZE_HINT
    }

    fn write_to<P>(&self, packet: &mut P)
    where
        P: tl_proto::TlPacket,
    {
        // The workchain occupies 4 bytes on the wire (see `TL_SIZE_HINT`),
        // so it is widened to i32 here and narrowed back on read.
        packet.write_i32(self.workchain as i32);
        tl::hash_bytes::write(&self.account, packet);
    }
}
377
378impl<'tl> TlRead<'tl> for RouterAddr {
379    type Repr = tl_proto::Boxed;
380
381    fn read_from(data: &mut &'tl [u8]) -> tl_proto::TlResult<Self> {
382        let Ok(workchain) = i32::read_from(data)?.try_into() else {
383            return Err(TlError::InvalidData);
384        };
385        let account = tl::hash_bytes::read(data)?;
386
387        Ok(Self { workchain, account })
388    }
389}
390
#[cfg(feature = "storage")]
impl tycho_storage_traits::StoredValue for RouterAddr {
    const SIZE_HINT: usize = 1 + 32;
    type OnStackSlice = [u8; Self::SIZE_HINT];

    fn serialize<T: tycho_storage_traits::StoredValueBuffer>(&self, buffer: &mut T) {
        // Single sign-preserving byte for the workchain, then the raw account bytes.
        buffer.write_raw_slice(&[self.workchain as u8]);
        buffer.write_raw_slice(&self.account.0);
    }

    fn deserialize(reader: &mut &[u8]) -> Self
    where
        Self: Sized,
    {
        if reader.len() < Self::SIZE_HINT {
            panic!("Insufficient data for deserialization");
        }

        let workchain = reader[0] as i8;

        let mut account = HashBytes::ZERO;
        account.0.copy_from_slice(&reader[1..33]);

        // Consume workchain byte + account bytes in one step.
        *reader = &reader[33..];

        Self { workchain, account }
    }
}
420
/// Index of an internal messages queue partition (see [`RouterPartitions`]).
#[derive(Default, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[repr(transparent)]
pub struct QueuePartitionIdx(pub u16);
424
impl QueuePartitionIdx {
    /// Partition index 0.
    pub const ZERO: Self = Self(0);
    /// Smallest possible partition index.
    pub const MIN: Self = Self(u16::MIN);
    /// Largest possible partition index.
    pub const MAX: Self = Self(u16::MAX);

    /// Returns `true` if this is partition 0.
    pub const fn is_zero(self) -> bool {
        self.0 == 0
    }
}
434
435impl PartialEq<QueuePartitionIdx> for u16 {
436    #[inline]
437    fn eq(&self, other: &QueuePartitionIdx) -> bool {
438        *self == other.0
439    }
440}
441
442impl PartialEq<&QueuePartitionIdx> for u16 {
443    #[inline]
444    fn eq(&self, other: &&QueuePartitionIdx) -> bool {
445        *self == other.0
446    }
447}
448
449impl PartialEq<u16> for QueuePartitionIdx {
450    #[inline]
451    fn eq(&self, other: &u16) -> bool {
452        self.0 == *other
453    }
454}
455
456impl PartialEq<&u16> for QueuePartitionIdx {
457    #[inline]
458    fn eq(&self, other: &&u16) -> bool {
459        self.0 == **other
460    }
461}
462
impl std::fmt::Display for QueuePartitionIdx {
    // Delegates to the inner `u16`; `Display::fmt` keeps formatter flags intact.
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}

impl std::fmt::Debug for QueuePartitionIdx {
    // Debug output is identical to `Display`.
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(self, f)
    }
}
476
477impl From<u16> for QueuePartitionIdx {
478    #[inline]
479    fn from(value: u16) -> Self {
480        Self(value)
481    }
482}
483
484impl From<QueuePartitionIdx> for u16 {
485    #[inline]
486    fn from(value: QueuePartitionIdx) -> Self {
487        value.0
488    }
489}
490
#[cfg(feature = "storage")]
impl tycho_storage_traits::StoredValue for QueuePartitionIdx {
    const SIZE_HINT: usize = std::mem::size_of::<QueuePartitionIdx>();

    type OnStackSlice = [u8; Self::SIZE_HINT];

    fn serialize<T: tycho_storage_traits::StoredValueBuffer>(&self, buffer: &mut T) {
        // Big-endian so byte order matches numeric order.
        buffer.write_raw_slice(&self.0.to_be_bytes());
    }

    fn deserialize(reader: &mut &[u8]) -> Self {
        if reader.len() < Self::SIZE_HINT {
            panic!("Insufficient data for deserialization");
        }

        let (index_bytes, rest) = reader.split_at(2);
        // Infallible: `index_bytes` is exactly 2 bytes long.
        let index = u16::from_be_bytes(index_bytes.try_into().unwrap());
        *reader = rest;

        Self(index)
    }
}
514
/// Mapping from a queue partition index to the set of router addresses in it.
pub type RouterPartitions = BTreeMap<QueuePartitionIdx, BTreeSet<RouterAddr>>;
516
517pub mod processed_to_map {
518    use tl_proto::{TlPacket, TlResult};
519
520    use super::*;
521
522    /// We assume that the number of processed shards is limited.
523    const MAX_SIZE: usize = 1000;
524
525    pub fn size_hint(items: &BTreeMap<ShardIdent, QueueKey>) -> usize {
526        const PER_ITEM: usize = tl::shard_ident::SIZE_HINT + QueueKey::SIZE_HINT;
527
528        4 + items.len() * PER_ITEM
529    }
530
531    pub fn write<P: TlPacket>(items: &BTreeMap<ShardIdent, QueueKey>, packet: &mut P) {
532        packet.write_u32(items.len() as u32);
533
534        for (shard_ident, processed_to) in items {
535            tl::shard_ident::write(shard_ident, packet);
536            processed_to.write_to(packet);
537        }
538    }
539
540    pub fn read(data: &mut &[u8]) -> TlResult<BTreeMap<ShardIdent, QueueKey>> {
541        let len = u32::read_from(data)? as usize;
542        if len > MAX_SIZE {
543            return Err(tl_proto::TlError::InvalidData);
544        }
545
546        let mut items = BTreeMap::new();
547        let mut prev_shard = None;
548        for _ in 0..len {
549            let shard_ident = tl::shard_ident::read(data)?;
550
551            // Require that shards are sorted in ascending order.
552            if let Some(prev_shard) = prev_shard
553                && shard_ident <= prev_shard
554            {
555                return Err(tl_proto::TlError::InvalidData);
556            }
557            prev_shard = Some(shard_ident);
558
559            // Read the rest of the entry.
560            items.insert(shard_ident, QueueKey::read_from(data)?);
561        }
562
563        Ok(items)
564    }
565}
566
pub mod router_partitions_map {
    use tl_proto::{TlPacket, TlResult};

    use super::*;

    /// Upper bound on the number of addresses in a single partition.
    const MAX_ADDRS_PER_PARTITION: u32 = 10_000_000;

    pub fn size_hint(items: &BTreeMap<QueuePartitionIdx, BTreeSet<RouterAddr>>) -> usize {
        // 4-byte partition count, then per partition:
        // 4-byte index + 4-byte address count + fixed-size addresses.
        let mut size = 4;
        for accounts in items.values() {
            size += 4 + 4 + accounts.len() * RouterAddr::TL_SIZE_HINT;
        }
        size
    }

    pub fn write<P>(items: &BTreeMap<QueuePartitionIdx, BTreeSet<RouterAddr>>, packet: &mut P)
    where
        P: TlPacket,
    {
        // `BTreeMap`/`BTreeSet` iteration is already sorted, which `read` relies on.
        packet.write_u32(items.len() as u32);
        for (partition, accounts) in items {
            packet.write_u32(partition.0 as u32);
            packet.write_u32(accounts.len() as u32);
            for account in accounts {
                account.write_to(packet);
            }
        }
    }

    pub fn read(data: &mut &[u8]) -> TlResult<BTreeMap<QueuePartitionIdx, BTreeSet<RouterAddr>>> {
        let partition_count = u32::read_from(data)?;
        // At most one partition per possible `u16` index value.
        if partition_count > u16::MAX as u32 + 1 {
            return Err(TlError::InvalidData);
        }

        let mut partitions = BTreeMap::new();

        let mut prev_index = None;
        for _ in 0..partition_count {
            // The index is stored as u32 on the wire but must fit into u16.
            let Ok::<u16, _>(partition_index) = u32::read_from(data)?.try_into() else {
                return Err(TlError::InvalidData);
            };

            // Require that partitions are sorted by ascending index.
            if let Some(prev_index) = prev_index
                && partition_index <= prev_index
            {
                return Err(TlError::InvalidData);
            }
            prev_index = Some(partition_index);

            let account_count = u32::read_from(data)?;
            if account_count > MAX_ADDRS_PER_PARTITION {
                return Err(TlError::InvalidData);
            }

            let mut accounts = BTreeSet::new();
            let mut prev_account = None;
            for _ in 0..account_count {
                let account = RouterAddr::read_from(data)?;

                // Require that addresses are sorted in strictly ascending order.
                if let Some(prev_account) = &prev_account
                    && &account <= prev_account
                {
                    return Err(TlError::InvalidData);
                }
                prev_account = Some(account);

                // Strict ordering above already guarantees uniqueness.
                let is_unique = accounts.insert(account);
                debug_assert!(is_unique);
            }

            partitions.insert(partition_index.into(), accounts);
        }

        Ok(partitions)
    }
}
644
mod queue_diffs_list {
    use tl_proto::{TlPacket, TlResult};

    use super::*;

    /// We assume that the number of queue diffs is limited.
    const MAX_SIZE: usize = 1000;

    pub fn size_hint(items: &[QueueDiff]) -> usize {
        4 + items.iter().map(TlWrite::max_size_hint).sum::<usize>()
    }

    // NOTE: The list must be non-empty and ordered from the newest diff to the
    // oldest, each diff's `prev_hash` matching the hash of the following diff.
    pub fn write<P: TlPacket>(items: &[QueueDiff], packet: &mut P) {
        packet.write_u32(items.len() as u32);

        let mut iter = items.iter();
        let mut latest_diff = iter.next().expect("diffs list must not be empty");
        latest_diff.write_to(packet);

        for diff in iter {
            // Require that diffs are sorted by descending seqno.
            debug_assert!(latest_diff.seqno > diff.seqno);
            debug_assert_eq!(latest_diff.prev_hash, diff.hash);
            latest_diff = diff;

            diff.write_to(packet);
        }
    }

    pub fn read(data: &mut &[u8]) -> TlResult<Vec<QueueDiff>> {
        let len = u32::read_from(data)? as usize;
        // An empty list is invalid as well (see `write`).
        if len > MAX_SIZE || len == 0 {
            return Err(tl_proto::TlError::InvalidData);
        }

        let mut items = Vec::with_capacity(len);
        items.push(QueueDiff::read_from(data)?);

        let (mut latest_seqno, mut prev_hash) = {
            let latest = items.first().expect("always not empty");
            (latest.seqno, latest.prev_hash)
        };

        for _ in 1..len {
            let diff = QueueDiff::read_from(data)?;

            // Require that diffs are sorted by descending seqno.
            // Also verify the hash chain: each diff must be the parent of the
            // previously read (newer) one.
            if latest_seqno <= diff.seqno || prev_hash != diff.hash {
                return Err(tl_proto::TlError::InvalidData);
            }
            latest_seqno = diff.seqno;
            prev_hash = diff.prev_hash;

            items.push(diff);
        }

        Ok(items)
    }
}
704
705mod messages_list {
706    use super::*;
707
708    /// We assume that the number of messages is limited.
709    const MAX_SIZE: usize = 10_000_000;
710
711    pub fn size_hint(items: &[HashBytes]) -> usize {
712        4 + items.len() * tl::hash_bytes::SIZE_HINT
713    }
714
715    pub fn write<P: tl_proto::TlPacket>(items: &[HashBytes], packet: &mut P) {
716        static ZERO_HASH: HashBytes = HashBytes::ZERO;
717
718        packet.write_u32(items.len() as u32);
719
720        let mut prev_hash = &ZERO_HASH;
721        for item in items {
722            // Require that messages are sorted by ascending hash.
723            debug_assert!(prev_hash < item);
724            prev_hash = item;
725
726            tl::hash_bytes::write(item, packet);
727        }
728    }
729
730    pub fn read(data: &mut &[u8]) -> tl_proto::TlResult<Vec<HashBytes>> {
731        let len = u32::read_from(data)? as usize;
732        if len > MAX_SIZE {
733            return Err(tl_proto::TlError::InvalidData);
734        }
735
736        let mut items = Vec::with_capacity(len);
737
738        let mut prev_hash = HashBytes::ZERO;
739        for _ in 0..len {
740            // NOTE: Assume that there are no messages with zero hash.
741            let hash = tl::hash_bytes::read(data)?;
742
743            // Require that messages are sorted by ascending hash.
744            if hash <= prev_hash {
745                return Err(tl_proto::TlError::InvalidData);
746            }
747            prev_hash = hash;
748
749            items.push(hash);
750        }
751
752        Ok(items)
753    }
754}
755
756mod state_messages_list {
757    use super::*;
758
759    /// We assume that the number of chunks is limited.
760    pub const MAX_CHUNKS: usize = 10_000_000;
761
762    pub const MAX_CHUNK_SIZE: usize = 100 << 20; // 100 MB
763
764    pub type BigBytes = tycho_util::tl::BigBytes<MAX_CHUNK_SIZE>;
765
766    pub fn size_hint(items: &[Bytes]) -> usize {
767        4 + items.iter().map(BigBytes::size_hint).sum::<usize>()
768    }
769
770    pub fn write<P: tl_proto::TlPacket>(items: &[Bytes], packet: &mut P) {
771        packet.write_u32(items.len() as u32);
772        for item in items {
773            BigBytes::write(item, packet);
774        }
775    }
776
777    pub fn read(data: &mut &[u8]) -> tl_proto::TlResult<Vec<Bytes>> {
778        let len = u32::read_from(data)? as usize;
779        if len > MAX_CHUNKS {
780            return Err(tl_proto::TlError::InvalidData);
781        }
782
783        let mut items = Vec::with_capacity(len);
784        for _ in 0..len {
785            items.push(BigBytes::read(data)?);
786        }
787
788        Ok(items)
789    }
790}
791
792mod state_messages_list_ref {
793    use super::state_messages_list::{MAX_CHUNK_SIZE, MAX_CHUNKS};
794    use super::*;
795
796    type BigBytesRef = tycho_util::tl::BigBytesRef<MAX_CHUNK_SIZE>;
797
798    pub fn size_hint(items: &[&[u8]]) -> usize {
799        4 + items.iter().map(BigBytesRef::size_hint).sum::<usize>()
800    }
801
802    pub fn write<P: tl_proto::TlPacket>(items: &[&[u8]], packet: &mut P) {
803        packet.write_u32(items.len() as u32);
804        for item in items {
805            BigBytesRef::write(item, packet);
806        }
807    }
808
809    pub fn read<'tl>(data: &mut &'tl [u8]) -> tl_proto::TlResult<Vec<&'tl [u8]>> {
810        let len = u32::read_from(data)? as usize;
811        if len > MAX_CHUNKS {
812            return Err(tl_proto::TlError::InvalidData);
813        }
814
815        let mut items = Vec::with_capacity(len);
816        for _ in 0..len {
817            items.push(BigBytesRef::read(data)?);
818        }
819
820        Ok(items)
821    }
822}
823
#[cfg(test)]
mod tests {
    use super::*;

    // Serializes a `QueueDiff`, checks the size hint is exact, and verifies
    // that the hash computed on deserialization matches the serialized bytes.
    #[test]
    fn queue_diff_binary_repr() {
        let addr1 = RouterAddr {
            workchain: 0,
            account: HashBytes::from([0x01; 32]),
        };

        let addr2 = RouterAddr {
            workchain: 1,
            account: HashBytes::from([0x02; 32]),
        };

        let mut diff = QueueDiff {
            hash: HashBytes::ZERO, // NOTE: Uninitialized
            prev_hash: HashBytes::from([0x33; 32]),
            shard_ident: ShardIdent::MASTERCHAIN,
            seqno: 123,
            processed_to: BTreeMap::from([
                (ShardIdent::MASTERCHAIN, QueueKey {
                    lt: 1,
                    hash: HashBytes::from([0x11; 32]),
                }),
                (ShardIdent::BASECHAIN, QueueKey {
                    lt: 123123,
                    hash: HashBytes::from([0x22; 32]),
                }),
            ]),
            min_message: QueueKey {
                lt: 1,
                hash: HashBytes::from([0x11; 32]),
            },
            max_message: QueueKey {
                lt: 123,
                hash: HashBytes::from([0x22; 32]),
            },
            messages: vec![
                HashBytes::from([0x01; 32]),
                HashBytes::from([0x02; 32]),
                HashBytes::from([0x03; 32]),
            ],
            router_partitions_src: Default::default(),
            router_partitions_dst: BTreeMap::from([(1.into(), BTreeSet::from([addr1, addr2]))]),
        };

        let bytes = tl_proto::serialize(&diff);
        assert_eq!(bytes.len(), diff.max_size_hint());

        let parsed = tl_proto::deserialize::<QueueDiff>(&bytes).unwrap();
        assert_eq!(parsed.hash, QueueDiff::compute_hash(&bytes));

        diff.hash = parsed.hash;
        assert_eq!(diff, parsed);
    }

    // Builds a hash-linked chain of diffs and round-trips the state header.
    #[test]
    fn queue_state_binary_repr() {
        let addr1 = RouterAddr {
            workchain: 0,
            account: HashBytes::from([0x01; 32]),
        };

        let addr2 = RouterAddr {
            workchain: 1,
            account: HashBytes::from([0x02; 32]),
        };

        let mut queue_diffs = Vec::<QueueDiff>::new();
        for seqno in 1..=10 {
            let prev_hash = queue_diffs.last().map(|diff| diff.hash).unwrap_or_default();

            let mut diff = QueueDiff {
                hash: HashBytes::ZERO, // NOTE: Uninitialized
                prev_hash,
                shard_ident: ShardIdent::MASTERCHAIN,
                seqno,
                processed_to: BTreeMap::from([
                    (ShardIdent::MASTERCHAIN, QueueKey {
                        lt: 10 * seqno as u64,
                        hash: HashBytes::from([seqno as u8; 32]),
                    }),
                    (ShardIdent::BASECHAIN, QueueKey {
                        lt: 123123 * seqno as u64,
                        hash: HashBytes::from([seqno as u8 * 2; 32]),
                    }),
                ]),
                min_message: QueueKey {
                    lt: 1,
                    hash: HashBytes::from([0x11; 32]),
                },
                max_message: QueueKey {
                    lt: 123,
                    hash: HashBytes::from([0x22; 32]),
                },
                messages: vec![
                    HashBytes::from([0x01; 32]),
                    HashBytes::from([0x02; 32]),
                    HashBytes::from([0x03; 32]),
                ],
                router_partitions_src: Default::default(),
                router_partitions_dst: BTreeMap::from([(1.into(), BTreeSet::from([addr1, addr2]))]),
            };

            // NOTE: We need this for the hash computation.
            diff.recompute_hash();

            queue_diffs.push(diff);
        }

        // We store diffs in descending order.
        queue_diffs.reverse();

        let state = QueueStateHeader {
            shard_ident: ShardIdent::MASTERCHAIN,
            seqno: 10,
            queue_diffs,
        };

        let bytes = tl_proto::serialize(&state);
        assert_eq!(bytes.len(), state.max_size_hint());

        let parsed = tl_proto::deserialize::<QueueStateHeader>(&bytes).unwrap();
        assert_eq!(state, parsed);
    }

    // Exercises all branches of `QueueKey::next_value`.
    #[test]
    fn test_next_value() {
        // 1) Check increment when hash is all zeros
        let key_zero = QueueKey {
            lt: 5,
            hash: HashBytes::ZERO,
        };
        let next_zero = key_zero.next_value();
        // Expect that lt remains unchanged and hash is [0..0, 1]
        assert_eq!(
            next_zero.lt, 5,
            "LT must remain the same if hash is not full 0xFF"
        );
        let mut expected_hash_zero = [0u8; 32];
        expected_hash_zero[31] = 1;
        assert_eq!(
            next_zero.hash.0, expected_hash_zero,
            "Hash should increment by 1"
        );

        // 2) Check increment when hash has partial 0xFF at the end
        //    e.g., last two bytes are 0xFF, but not the whole array
        let mut partial_ff = [0u8; 32];
        partial_ff[30] = 0xFF;
        partial_ff[31] = 0xFF;
        let key_partial_ff = QueueKey {
            lt: 10,
            hash: HashBytes(partial_ff),
        };
        let next_partial_ff = key_partial_ff.next_value();
        // Expected result: carry rolls over the last two 0xFF bytes
        // and increments the next byte
        let mut expected_hash_partial = [0u8; 32];
        expected_hash_partial[29] = 0x01; // incremented by carry
        // bytes 30, 31 become 0x00
        assert_eq!(
            next_partial_ff.lt, 10,
            "LT must remain the same with partial 0xFF"
        );
        assert_eq!(
            next_partial_ff.hash.0, expected_hash_partial,
            "Hash should be incremented correctly with carry"
        );

        // 3) Check increment when hash is fully 0xFF
        let key_full_ff = QueueKey {
            lt: 999,
            hash: HashBytes([0xFF; 32]),
        };
        let next_full_ff = key_full_ff.next_value();
        // Expect that hash resets to zero and LT increments by 1
        assert_eq!(
            next_full_ff.lt, 1000,
            "LT must increment if hash was all 0xFF"
        );
        assert_eq!(next_full_ff.hash.0, [0u8; 32], "Hash should reset to zero");

        // 4) A quick check of mid-range increment with carry:
        //    Example: [.., 0x01, 0xFF, 0xFF]
        let mut mid_hash = [0u8; 32];
        mid_hash[29] = 0x01;
        mid_hash[30] = 0xFF;
        mid_hash[31] = 0xFF;
        let key_mid = QueueKey {
            lt: 50,
            hash: HashBytes(mid_hash),
        };
        let next_mid = key_mid.next_value();
        // We expect that byte 29 increments to 0x02 and the last two bytes become 0x00
        let mut expected_mid_hash = [0u8; 32];
        expected_mid_hash[29] = 0x02;
        assert_eq!(
            next_mid.lt, 50,
            "LT should remain the same for a mid-range carry"
        );
        assert_eq!(
            next_mid.hash.0, expected_mid_hash,
            "Hash should increment the correct byte after partial 0xFF"
        );
    }
}