// tycho_block_util/queue/proto.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3
4use bytes::Bytes;
5use tl_proto::{TlError, TlRead, TlWrite};
6use tycho_types::models::*;
7use tycho_types::prelude::*;
8
9use crate::tl;
10
/// Representation of an internal messages queue diff.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueDiff {
    /// Computed hash of this diff.
    ///
    /// NOTE: This field is not serialized and can be [`HashBytes::ZERO`] for serialization.
    pub hash: HashBytes,

    /// Hash of the TL repr of the previous queue diff.
    pub prev_hash: HashBytes,
    /// Shard identifier of the corresponding block.
    pub shard_ident: ShardIdent,
    /// Seqno of the corresponding block.
    pub seqno: u32,
    /// Collator boundaries: for each shard, the queue key it was processed up to.
    pub processed_to: BTreeMap<ShardIdent, QueueKey>,
    /// Min message queue key.
    pub min_message: QueueKey,
    /// Max message queue key.
    pub max_message: QueueKey,
    /// List of message hashes (sorted ASC).
    pub messages: Vec<HashBytes>,
    /// Inbound router partitions.
    pub router_partitions_src: RouterPartitions,
    /// Outbound router partitions.
    pub router_partitions_dst: RouterPartitions,
}
38
impl QueueDiff {
    /// TL constructor id of the boxed `block.queueDiff` repr.
    pub const TL_ID: u32 = tl_proto::id!("block.queueDiff", scheme = "proto.tl");

    /// Recomputes the hash of the diff.
    ///
    /// NOTE: Since the hash is not serialized, it is NOT mandatory to call this method
    /// if it will not be used after this.
    pub fn recompute_hash(&mut self) {
        // `hash` itself is skipped during serialization, so serializing `self`
        // here is fine even while `hash` is stale.
        self.hash = Self::compute_hash(&tl_proto::serialize(&*self));
    }

    /// Computes the hash of the serialized diff.
    pub fn compute_hash(data: &[u8]) -> HashBytes {
        Boc::file_hash_blake(data)
    }
}
55
impl TlWrite for QueueDiff {
    type Repr = tl_proto::Boxed;

    /// Exact serialized size: constructor id + fixed-size fields + sized lists.
    fn max_size_hint(&self) -> usize {
        4 + tl::hash_bytes::SIZE_HINT
            + tl::shard_ident::SIZE_HINT
            + 4
            + processed_to_map::size_hint(&self.processed_to)
            + 2 * QueueKey::SIZE_HINT
            + messages_list::size_hint(&self.messages)
            + router_partitions_map::size_hint(&self.router_partitions_src)
            + router_partitions_map::size_hint(&self.router_partitions_dst)
    }

    /// Serializes the diff. The `hash` field is intentionally NOT written.
    ///
    /// NOTE: Field order must stay in sync with `TlRead::read_from` and
    /// `max_size_hint` above.
    fn write_to<P>(&self, packet: &mut P)
    where
        P: tl_proto::TlPacket,
    {
        packet.write_u32(Self::TL_ID);
        tl::hash_bytes::write(&self.prev_hash, packet);
        tl::shard_ident::write(&self.shard_ident, packet);
        packet.write_u32(self.seqno);
        processed_to_map::write(&self.processed_to, packet);
        self.min_message.write_to(packet);
        self.max_message.write_to(packet);
        messages_list::write(&self.messages, packet);
        router_partitions_map::write(&self.router_partitions_src, packet);
        router_partitions_map::write(&self.router_partitions_dst, packet);
    }
}
86
impl<'tl> TlRead<'tl> for QueueDiff {
    type Repr = tl_proto::Boxed;

    fn read_from(data: &mut &'tl [u8]) -> tl_proto::TlResult<Self> {
        // Remember the slice start so we can hash exactly the bytes consumed
        // by this diff (constructor id included).
        let data_before = *data;
        if u32::read_from(data)? != Self::TL_ID {
            return Err(tl_proto::TlError::UnknownConstructor);
        }

        // NOTE: Field order must match `TlWrite::write_to`.
        let mut result = Self {
            hash: HashBytes::ZERO,
            prev_hash: tl::hash_bytes::read(data)?,
            shard_ident: tl::shard_ident::read(data)?,
            seqno: u32::read_from(data)?,
            processed_to: processed_to_map::read(data)?,
            min_message: QueueKey::read_from(data)?,
            max_message: QueueKey::read_from(data)?,
            messages: messages_list::read(data)?,
            router_partitions_src: router_partitions_map::read(data)?,
            router_partitions_dst: router_partitions_map::read(data)?,
        };

        // Sanity check: the message range must be ordered (min <= max).
        if result.max_message < result.min_message {
            return Err(tl_proto::TlError::InvalidData);
        }

        // Number of bytes consumed by this diff.
        let result_bytes = data_before.len() - data.len();

        // Compute the hash of the diff
        result.hash = Self::compute_hash(&data_before[..result_bytes]);

        Ok(result)
    }
}
121
/// Persistent internal messages queue state.
#[derive(Clone, PartialEq, Eq, TlWrite, TlRead)]
#[tl(boxed, id = "block.queueState", scheme = "proto.tl")]
pub struct QueueState {
    /// State header: shard, seqno and the list of queue diffs.
    pub header: QueueStateHeader,

    /// Chunks of messages in the same order as messages in `header.queue_diffs.messages`.
    /// Only the order is guaranteed, but not the chunk sizes.
    #[tl(with = "state_messages_list")]
    pub messages: Vec<Bytes>,
}
133
/// Persistent internal messages queue state.
///
/// A non-owned version of [`QueueState`].
#[derive(Clone, PartialEq, Eq, TlWrite, TlRead)]
#[tl(boxed, id = "block.queueState", scheme = "proto.tl")]
pub struct QueueStateRef<'tl> {
    /// State header: shard, seqno and the list of queue diffs.
    pub header: QueueStateHeader,

    /// Chunks of messages in the same order as messages in `header.queue_diffs.messages`.
    /// Only the order is guaranteed, but not the chunk sizes.
    #[tl(with = "state_messages_list_ref")]
    pub messages: Vec<&'tl [u8]>,
}
147
/// A header for a persistent internal messages queue state.
#[derive(Debug, Clone, PartialEq, Eq, TlWrite, TlRead)]
#[tl(boxed, id = "block.queueStateHeader", scheme = "proto.tl")]
pub struct QueueStateHeader {
    /// Shard identifier of the corresponding block.
    #[tl(with = "tl::shard_ident")]
    pub shard_ident: ShardIdent,
    /// Seqno of the corresponding block.
    pub seqno: u32,
    /// Queue diffs, stored in descending seqno order (see `queue_diffs_list`).
    #[tl(with = "queue_diffs_list")]
    pub queue_diffs: Vec<QueueDiff>,
}
158
/// Queue key.
///
/// Ordered first by `lt`, then by `hash` (derived `Ord` follows field order).
#[derive(Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, TlWrite, TlRead)]
pub struct QueueKey {
    /// Logical time of the message.
    pub lt: u64,
    /// Message hash.
    #[tl(with = "tl::hash_bytes")]
    pub hash: HashBytes,
}
166
167impl QueueKey {
168    pub const SIZE_HINT: usize = 8 + 32;
169
170    pub const MIN: Self = Self {
171        lt: 0,
172        hash: HashBytes::ZERO,
173    };
174
175    pub const MAX: Self = Self {
176        lt: u64::MAX,
177        hash: HashBytes([0xff; 32]),
178    };
179
180    pub const fn min_for_lt(lt: u64) -> Self {
181        Self {
182            lt,
183            hash: HashBytes::ZERO,
184        }
185    }
186
187    pub const fn max_for_lt(lt: u64) -> Self {
188        Self {
189            lt,
190            hash: HashBytes([0xff; 32]),
191        }
192    }
193
194    // add min step to the key
195    pub fn next_value(&self) -> Self {
196        let mut new_lt = self.lt;
197        let mut new_hash = self.hash;
198
199        if new_hash.0 == [0xff; 32] {
200            // check if lt is already max then do nothing
201            if new_lt == u64::MAX {
202                return Self {
203                    lt: u64::MAX,
204                    hash: HashBytes([0xff; 32]),
205                };
206            } else {
207                new_lt += 1;
208                new_hash = HashBytes::ZERO;
209            }
210        } else {
211            let carry = 1u8;
212            for byte in new_hash.0.iter_mut().rev() {
213                let (res, overflow) = byte.overflowing_add(carry);
214                *byte = res;
215                if !overflow {
216                    break;
217                }
218            }
219        }
220
221        Self {
222            lt: new_lt,
223            hash: new_hash,
224        }
225    }
226
227    #[inline]
228    pub const fn split(self) -> (u64, HashBytes) {
229        (self.lt, self.hash)
230    }
231}
232
// Allows building a key directly from an `(lt, hash)` pair.
impl From<(u64, HashBytes)> for QueueKey {
    #[inline]
    fn from((lt, hash): (u64, HashBytes)) -> Self {
        Self { lt, hash }
    }
}
239
// Inverse of the tuple conversion above.
impl From<QueueKey> for (u64, HashBytes) {
    #[inline]
    fn from(key: QueueKey) -> Self {
        key.split()
    }
}
246
// Debug output intentionally mirrors `Display` for readable logs.
impl std::fmt::Debug for QueueKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(self, f)
    }
}
252
253impl std::fmt::Display for QueueKey {
254    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255        let short_hash = get_short_hash_string(&self.hash);
256
257        write!(f, "LT_HASH({}_{short_hash})", self.lt)
258    }
259}
260
#[cfg(feature = "storage")]
impl tycho_storage_traits::StoredValue for QueueKey {
    /// Fixed storage size: 8-byte `lt` plus 32-byte `hash`.
    const SIZE_HINT: usize = 8 + 32;

    type OnStackSlice = [u8; Self::SIZE_HINT];

    fn serialize<T: tycho_storage_traits::StoredValueBuffer>(&self, buffer: &mut T) {
        // Big-endian `lt` so byte-wise comparison of serialized keys matches
        // the numeric order of `lt`.
        buffer.write_raw_slice(&self.lt.to_be_bytes());
        buffer.write_raw_slice(&self.hash.0);
    }

    /// Deserializes a key, advancing `reader` past the consumed bytes.
    ///
    /// Panics if `reader` is shorter than [`Self::SIZE_HINT`].
    fn deserialize(reader: &mut &[u8]) -> Self {
        if reader.len() < Self::SIZE_HINT {
            panic!("Insufficient data for deserialization")
        }

        let mut lt_bytes = [0u8; 8];
        lt_bytes.copy_from_slice(&reader[..8]);
        let lt = u64::from_be_bytes(lt_bytes);
        *reader = &reader[8..];

        let mut hash_bytes = [0u8; 32];
        hash_bytes.copy_from_slice(&reader[..32]);
        let hash = HashBytes(hash_bytes);
        *reader = &reader[32..];

        Self { lt, hash }
    }
}
290
291pub fn get_short_hash_string(hash: &HashBytes) -> String {
292    let mut short_hash = [0u8; 8];
293    hex::encode_to_slice(&hash.as_array()[..4], &mut short_hash).ok();
294
295    // SAFETY: output is guaranteed to contain only [0-9a-f]
296    let res = unsafe { std::str::from_utf8_unchecked(&short_hash) };
297
298    res.to_owned()
299}
300
/// Returns a short representation of a std internal address:
/// `<workchain>:<first 4 bytes of the account as hex>`.
///
/// # Panics
///
/// Panics on [`IntAddr::Var`] addresses — callers are expected to never
/// pass one.
pub fn get_short_addr_string(addr: &IntAddr) -> String {
    match addr {
        IntAddr::Std(addr) => {
            let addr_hash_short = get_short_hash_string(&addr.address);
            format!("{}:{}", addr.workchain, addr_hash_short)
        }
        IntAddr::Var(_) => unreachable!(),
    }
}
310
/// Std dest addr.
///
/// A compact `(workchain, account)` pair used as a router key.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RouterAddr {
    /// Workchain id (fits into `i8`).
    pub workchain: i8,
    /// Account id within the workchain.
    pub account: HashBytes,
}
317
318impl RouterAddr {
319    pub const MIN: Self = Self {
320        workchain: i8::MIN,
321        account: HashBytes::ZERO,
322    };
323
324    pub const MAX: Self = Self {
325        workchain: i8::MAX,
326        account: HashBytes([0xff; 32]),
327    };
328
329    const TL_SIZE_HINT: usize = 4 + 32;
330
331    pub fn to_int_addr(&self) -> IntAddr {
332        IntAddr::Std(StdAddr {
333            anycast: None,
334            workchain: self.workchain,
335            address: self.account,
336        })
337    }
338
339    pub fn from_int_addr(addr: &IntAddr) -> Option<Self> {
340        match addr {
341            IntAddr::Std(addr) => Some(Self::from(addr)),
342            IntAddr::Var(_) => None,
343        }
344    }
345}
346
// NOTE: Any anycast info of the source address is dropped.
impl From<&StdAddr> for RouterAddr {
    fn from(value: &StdAddr) -> Self {
        Self {
            workchain: value.workchain,
            account: value.address,
        }
    }
}
355
// Owned variant; delegates to the by-reference conversion.
impl From<StdAddr> for RouterAddr {
    fn from(value: StdAddr) -> Self {
        Self::from(&value)
    }
}
361
impl TlWrite for RouterAddr {
    type Repr = tl_proto::Boxed;

    fn max_size_hint(&self) -> usize {
        Self::TL_SIZE_HINT
    }

    fn write_to<P>(&self, packet: &mut P)
    where
        P: tl_proto::TlPacket,
    {
        // The i8 workchain is sign-extended to a 4-byte TL int.
        packet.write_i32(self.workchain as i32);
        tl::hash_bytes::write(&self.account, packet);
    }
}
377
impl<'tl> TlRead<'tl> for RouterAddr {
    type Repr = tl_proto::Boxed;

    fn read_from(data: &mut &'tl [u8]) -> tl_proto::TlResult<Self> {
        // Reject workchain values that do not fit back into `i8`.
        let Ok(workchain) = i32::read_from(data)?.try_into() else {
            return Err(TlError::InvalidData);
        };
        let account = tl::hash_bytes::read(data)?;

        Ok(Self { workchain, account })
    }
}
390
#[cfg(feature = "storage")]
impl tycho_storage_traits::StoredValue for RouterAddr {
    /// Fixed storage size: 1-byte workchain plus 32-byte account.
    const SIZE_HINT: usize = 1 + 32;
    type OnStackSlice = [u8; Self::SIZE_HINT];

    fn serialize<T: tycho_storage_traits::StoredValueBuffer>(&self, buffer: &mut T) {
        // Stored as the two's-complement byte; round-trips via the `as i8`
        // cast in `deserialize`.
        buffer.write_raw_slice(&[self.workchain as u8]);
        buffer.write_raw_slice(&self.account.0);
    }

    /// Deserializes an address, advancing `reader` past the consumed bytes.
    ///
    /// Panics if `reader` is shorter than [`Self::SIZE_HINT`].
    fn deserialize(reader: &mut &[u8]) -> Self
    where
        Self: Sized,
    {
        if reader.len() < Self::SIZE_HINT {
            panic!("Insufficient data for deserialization");
        }

        let workchain = reader[0] as i8;
        *reader = &reader[1..];

        let mut account_bytes = [0u8; 32];
        account_bytes.copy_from_slice(&reader[..32]);
        let account = HashBytes(account_bytes);
        *reader = &reader[32..];

        Self { workchain, account }
    }
}
420
/// Index of a queue partition (a thin wrapper around `u16`).
#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[repr(transparent)]
pub struct QueuePartitionIdx(pub u16);
424
impl QueuePartitionIdx {
    /// The zero (default) partition.
    pub const ZERO: Self = Self(0);
    /// The smallest partition index.
    pub const MIN: Self = Self(u16::MIN);
    /// The largest partition index.
    pub const MAX: Self = Self(u16::MAX);

    /// Returns `true` for the zero partition.
    pub const fn is_zero(self) -> bool {
        self.0 == 0
    }
}
434
// Symmetric comparison: `u16 == QueuePartitionIdx`.
impl PartialEq<QueuePartitionIdx> for u16 {
    #[inline]
    fn eq(&self, other: &QueuePartitionIdx) -> bool {
        *self == other.0
    }
}
441
// Symmetric comparison: `u16 == &QueuePartitionIdx`.
impl PartialEq<&QueuePartitionIdx> for u16 {
    #[inline]
    fn eq(&self, other: &&QueuePartitionIdx) -> bool {
        *self == other.0
    }
}
448
// Symmetric comparison: `QueuePartitionIdx == u16`.
impl PartialEq<u16> for QueuePartitionIdx {
    #[inline]
    fn eq(&self, other: &u16) -> bool {
        self.0 == *other
    }
}
455
// Symmetric comparison: `QueuePartitionIdx == &u16`.
impl PartialEq<&u16> for QueuePartitionIdx {
    #[inline]
    fn eq(&self, other: &&u16) -> bool {
        self.0 == **other
    }
}
462
// Displays as the plain inner number.
impl std::fmt::Display for QueuePartitionIdx {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
469
// Infallible wrapping conversion.
impl From<u16> for QueuePartitionIdx {
    #[inline]
    fn from(value: u16) -> Self {
        Self(value)
    }
}
476
// Infallible unwrapping conversion.
impl From<QueuePartitionIdx> for u16 {
    #[inline]
    fn from(value: QueuePartitionIdx) -> Self {
        value.0
    }
}
483
#[cfg(feature = "storage")]
impl tycho_storage_traits::StoredValue for QueuePartitionIdx {
    /// Fixed storage size: the 2 bytes of the inner `u16`.
    const SIZE_HINT: usize = std::mem::size_of::<QueuePartitionIdx>();

    type OnStackSlice = [u8; Self::SIZE_HINT];

    fn serialize<T: tycho_storage_traits::StoredValueBuffer>(&self, buffer: &mut T) {
        // Big-endian so byte-wise comparison matches numeric order.
        buffer.write_raw_slice(&self.0.to_be_bytes());
    }

    /// Deserializes an index, advancing `reader` past the consumed bytes.
    ///
    /// Panics if `reader` is shorter than [`Self::SIZE_HINT`].
    fn deserialize(reader: &mut &[u8]) -> Self {
        if reader.len() < Self::SIZE_HINT {
            panic!("Insufficient data for deserialization");
        }

        let mut partition_bytes = [0u8; 2];
        partition_bytes.copy_from_slice(&reader[..2]);
        let partition = u16::from_be_bytes(partition_bytes);
        *reader = &reader[2..];

        Self(partition)
    }
}
507
/// Mapping from a queue partition index to the set of router addresses in it.
pub type RouterPartitions = BTreeMap<QueuePartitionIdx, BTreeSet<RouterAddr>>;
509
/// TL (de)serialization for the `processed_to` map of a [`QueueDiff`].
///
/// Wire format: `u32 length ++ (shard_ident, queue_key)*`, entries sorted
/// by shard in ascending order.
pub mod processed_to_map {
    use tl_proto::{TlPacket, TlResult};

    use super::*;

    /// We assume that the number of processed shards is limited.
    const MAX_SIZE: usize = 1000;

    /// Exact serialized size: 4-byte length prefix plus a fixed-size entry
    /// per shard.
    pub fn size_hint(items: &BTreeMap<ShardIdent, QueueKey>) -> usize {
        const PER_ITEM: usize = tl::shard_ident::SIZE_HINT + QueueKey::SIZE_HINT;

        4 + items.len() * PER_ITEM
    }

    /// Writes the map. `BTreeMap` iteration order is ascending by shard,
    /// which is exactly what `read` requires.
    pub fn write<P: TlPacket>(items: &BTreeMap<ShardIdent, QueueKey>, packet: &mut P) {
        packet.write_u32(items.len() as u32);

        for (shard_ident, processed_to) in items {
            tl::shard_ident::write(shard_ident, packet);
            processed_to.write_to(packet);
        }
    }

    /// Reads the map, rejecting oversized lists and unsorted or duplicated
    /// shard entries.
    pub fn read(data: &mut &[u8]) -> TlResult<BTreeMap<ShardIdent, QueueKey>> {
        let len = u32::read_from(data)? as usize;
        if len > MAX_SIZE {
            return Err(tl_proto::TlError::InvalidData);
        }

        let mut items = BTreeMap::new();
        let mut prev_shard = None;
        for _ in 0..len {
            let shard_ident = tl::shard_ident::read(data)?;

            // Require that shards are sorted in ascending order.
            if let Some(prev_shard) = prev_shard {
                if shard_ident <= prev_shard {
                    return Err(tl_proto::TlError::InvalidData);
                }
            }
            prev_shard = Some(shard_ident);

            // Read the rest of the entry.
            items.insert(shard_ident, QueueKey::read_from(data)?);
        }

        Ok(items)
    }
}
559
/// TL (de)serialization for [`RouterPartitions`].
///
/// Wire format: `u32 partition_count ++ (u32 index ++ u32 addr_count ++ addr*)*`,
/// with partitions and addresses sorted in ascending order.
pub mod router_partitions_map {
    use tl_proto::{TlPacket, TlResult};

    use super::*;

    /// Hard upper bound on the number of addresses in a single partition.
    const MAX_ADDRS_PER_PARTITION: u32 = 10_000_000;

    /// Exact serialized size (all per-address entries are fixed-size).
    pub fn size_hint(items: &BTreeMap<QueuePartitionIdx, BTreeSet<RouterAddr>>) -> usize {
        let mut size = 4;
        for accounts in items.values() {
            size += 4 + 4 + accounts.len() * RouterAddr::TL_SIZE_HINT;
        }
        size
    }

    /// Writes the partitions. `BTreeMap`/`BTreeSet` iteration order is
    /// ascending, which is exactly what `read` requires.
    pub fn write<P>(items: &BTreeMap<QueuePartitionIdx, BTreeSet<RouterAddr>>, packet: &mut P)
    where
        P: TlPacket,
    {
        packet.write_u32(items.len() as u32);
        for (partition, accounts) in items {
            packet.write_u32(partition.0 as u32);
            packet.write_u32(accounts.len() as u32);
            for account in accounts {
                account.write_to(packet);
            }
        }
    }

    /// Reads the partitions, rejecting out-of-range partition indices,
    /// unsorted/duplicated partitions or addresses, and oversized lists.
    pub fn read(data: &mut &[u8]) -> TlResult<BTreeMap<QueuePartitionIdx, BTreeSet<RouterAddr>>> {
        let partition_count = u32::read_from(data)?;
        // At most one partition per possible `u16` index.
        if partition_count > u16::MAX as u32 + 1 {
            return Err(TlError::InvalidData);
        }

        let mut partitions = BTreeMap::new();

        let mut prev_index = None;
        for _ in 0..partition_count {
            // The index is sent as u32 but must fit into `u16`.
            let Ok::<u16, _>(partition_index) = u32::read_from(data)?.try_into() else {
                return Err(TlError::InvalidData);
            };

            // Require that partitions are sorted in ascending order.
            if let Some(prev_index) = prev_index {
                if partition_index <= prev_index {
                    return Err(TlError::InvalidData);
                }
            }
            prev_index = Some(partition_index);

            let account_count = u32::read_from(data)?;
            if account_count > MAX_ADDRS_PER_PARTITION {
                return Err(TlError::InvalidData);
            }

            let mut accounts = BTreeSet::new();
            let mut prev_account = None;
            for _ in 0..account_count {
                let account = RouterAddr::read_from(data)?;

                // Require that addresses are sorted in ascending order.
                if let Some(prev_account) = &prev_account {
                    if &account <= prev_account {
                        return Err(TlError::InvalidData);
                    }
                }
                prev_account = Some(account);

                // Uniqueness is already implied by the strict ordering check.
                let is_unique = accounts.insert(account);
                debug_assert!(is_unique);
            }

            partitions.insert(partition_index.into(), accounts);
        }

        Ok(partitions)
    }
}
637
/// TL (de)serialization for the diffs list of a [`QueueStateHeader`].
///
/// Diffs are stored in descending seqno order and hash-linked: each next
/// diff's `hash` must equal the previous diff's `prev_hash`.
mod queue_diffs_list {
    use tl_proto::{TlPacket, TlResult};

    use super::*;

    /// We assume that the number of queue diffs is limited.
    const MAX_SIZE: usize = 1000;

    /// Upper bound on the serialized size of the list.
    pub fn size_hint(items: &[QueueDiff]) -> usize {
        4 + items.iter().map(TlWrite::max_size_hint).sum::<usize>()
    }

    /// Writes the list of diffs.
    ///
    /// # Panics
    ///
    /// Panics if `items` is empty (consistent with `read`, which rejects
    /// empty lists). In debug builds also asserts the descending seqno order
    /// and the hash chain.
    pub fn write<P: TlPacket>(items: &[QueueDiff], packet: &mut P) {
        packet.write_u32(items.len() as u32);

        let mut iter = items.iter();
        let mut latest_diff = iter.next().expect("diffs list must not be empty");
        latest_diff.write_to(packet);

        for diff in iter {
            // Require that diffs are sorted by descending seqno.
            debug_assert!(latest_diff.seqno > diff.seqno);
            debug_assert_eq!(latest_diff.prev_hash, diff.hash);
            latest_diff = diff;

            diff.write_to(packet);
        }
    }

    /// Reads a non-empty list of diffs, verifying descending seqno order and
    /// the `prev_hash`/`hash` link between consecutive diffs.
    pub fn read(data: &mut &[u8]) -> TlResult<Vec<QueueDiff>> {
        let len = u32::read_from(data)? as usize;
        if len > MAX_SIZE || len == 0 {
            return Err(tl_proto::TlError::InvalidData);
        }

        let mut items = Vec::with_capacity(len);
        items.push(QueueDiff::read_from(data)?);

        let (mut latest_seqno, mut prev_hash) = {
            let latest = items.first().expect("always not empty");
            (latest.seqno, latest.prev_hash)
        };

        for _ in 1..len {
            let diff = QueueDiff::read_from(data)?;

            // Require that diffs are sorted by descending seqno.
            if latest_seqno <= diff.seqno || prev_hash != diff.hash {
                return Err(tl_proto::TlError::InvalidData);
            }
            latest_seqno = diff.seqno;
            prev_hash = diff.prev_hash;

            items.push(diff);
        }

        Ok(items)
    }
}
697
/// TL (de)serialization for the message hash list of a [`QueueDiff`].
///
/// Hashes are stored strictly ascending, which also forbids duplicates.
mod messages_list {
    use super::*;

    /// We assume that the number of messages is limited.
    const MAX_SIZE: usize = 10_000_000;

    /// Exact serialized size: 4-byte length prefix plus 32 bytes per hash.
    pub fn size_hint(items: &[HashBytes]) -> usize {
        4 + items.len() * tl::hash_bytes::SIZE_HINT
    }

    /// Writes the list; debug-asserts the strictly ascending order.
    pub fn write<P: tl_proto::TlPacket>(items: &[HashBytes], packet: &mut P) {
        static ZERO_HASH: HashBytes = HashBytes::ZERO;

        packet.write_u32(items.len() as u32);

        let mut prev_hash = &ZERO_HASH;
        for item in items {
            // Require that messages are sorted by ascending hash.
            debug_assert!(prev_hash < item);
            prev_hash = item;

            tl::hash_bytes::write(item, packet);
        }
    }

    /// Reads the list, requiring strictly ascending hashes.
    ///
    /// Starting the comparison from `HashBytes::ZERO` also rejects a zero
    /// hash as the first element.
    pub fn read(data: &mut &[u8]) -> tl_proto::TlResult<Vec<HashBytes>> {
        let len = u32::read_from(data)? as usize;
        if len > MAX_SIZE {
            return Err(tl_proto::TlError::InvalidData);
        }

        let mut items = Vec::with_capacity(len);

        let mut prev_hash = HashBytes::ZERO;
        for _ in 0..len {
            // NOTE: Assume that there are no messages with zero hash.
            let hash = tl::hash_bytes::read(data)?;

            // Require that messages are sorted by ascending hash.
            if hash <= prev_hash {
                return Err(tl_proto::TlError::InvalidData);
            }
            prev_hash = hash;

            items.push(hash);
        }

        Ok(items)
    }
}
748
/// TL (de)serialization for owned message chunks of a [`QueueState`].
mod state_messages_list {
    use super::*;

    /// We assume that the number of chunks is limited.
    pub const MAX_CHUNKS: usize = 10_000_000;

    /// Maximum size of a single chunk.
    pub const MAX_CHUNK_SIZE: usize = 100 << 20; // 100 MB

    /// Length-prefixed byte blob with the chunk size limit applied.
    pub type BigBytes = tycho_util::tl::BigBytes<MAX_CHUNK_SIZE>;

    /// Upper bound on the serialized size of the chunk list.
    pub fn size_hint(items: &[Bytes]) -> usize {
        4 + items.iter().map(BigBytes::size_hint).sum::<usize>()
    }

    /// Writes the chunks as `u32 count ++ chunk*`.
    pub fn write<P: tl_proto::TlPacket>(items: &[Bytes], packet: &mut P) {
        packet.write_u32(items.len() as u32);
        for item in items {
            BigBytes::write(item, packet);
        }
    }

    /// Reads the chunks, rejecting oversized lists.
    pub fn read(data: &mut &[u8]) -> tl_proto::TlResult<Vec<Bytes>> {
        let len = u32::read_from(data)? as usize;
        if len > MAX_CHUNKS {
            return Err(tl_proto::TlError::InvalidData);
        }

        let mut items = Vec::with_capacity(len);
        for _ in 0..len {
            items.push(BigBytes::read(data)?);
        }

        Ok(items)
    }
}
784
/// Borrowed counterpart of `state_messages_list` used by [`QueueStateRef`].
/// Must stay wire-compatible with the owned version.
mod state_messages_list_ref {
    use super::state_messages_list::{MAX_CHUNK_SIZE, MAX_CHUNKS};
    use super::*;

    /// Borrowed length-prefixed byte blob with the same chunk size limit.
    type BigBytesRef = tycho_util::tl::BigBytesRef<MAX_CHUNK_SIZE>;

    /// Upper bound on the serialized size of the chunk list.
    pub fn size_hint(items: &[&[u8]]) -> usize {
        4 + items.iter().map(BigBytesRef::size_hint).sum::<usize>()
    }

    /// Writes the chunks as `u32 count ++ chunk*`.
    pub fn write<P: tl_proto::TlPacket>(items: &[&[u8]], packet: &mut P) {
        packet.write_u32(items.len() as u32);
        for item in items {
            BigBytesRef::write(item, packet);
        }
    }

    /// Reads the chunks as zero-copy slices into `data`.
    pub fn read<'tl>(data: &mut &'tl [u8]) -> tl_proto::TlResult<Vec<&'tl [u8]>> {
        let len = u32::read_from(data)? as usize;
        if len > MAX_CHUNKS {
            return Err(tl_proto::TlError::InvalidData);
        }

        let mut items = Vec::with_capacity(len);
        for _ in 0..len {
            items.push(BigBytesRef::read(data)?);
        }

        Ok(items)
    }
}
816
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a [`QueueDiff`] through TL serialization and checks that
    /// `max_size_hint` is exact and the hash is recomputed on read.
    #[test]
    fn queue_diff_binary_repr() {
        let addr1 = RouterAddr {
            workchain: 0,
            account: HashBytes::from([0x01; 32]),
        };

        let addr2 = RouterAddr {
            workchain: 1,
            account: HashBytes::from([0x02; 32]),
        };

        let mut diff = QueueDiff {
            hash: HashBytes::ZERO, // NOTE: Uninitialized
            prev_hash: HashBytes::from([0x33; 32]),
            shard_ident: ShardIdent::MASTERCHAIN,
            seqno: 123,
            processed_to: BTreeMap::from([
                (ShardIdent::MASTERCHAIN, QueueKey {
                    lt: 1,
                    hash: HashBytes::from([0x11; 32]),
                }),
                (ShardIdent::BASECHAIN, QueueKey {
                    lt: 123123,
                    hash: HashBytes::from([0x22; 32]),
                }),
            ]),
            min_message: QueueKey {
                lt: 1,
                hash: HashBytes::from([0x11; 32]),
            },
            max_message: QueueKey {
                lt: 123,
                hash: HashBytes::from([0x22; 32]),
            },
            messages: vec![
                HashBytes::from([0x01; 32]),
                HashBytes::from([0x02; 32]),
                HashBytes::from([0x03; 32]),
            ],
            router_partitions_src: Default::default(),
            router_partitions_dst: BTreeMap::from([(1.into(), BTreeSet::from([addr1, addr2]))]),
        };

        let bytes = tl_proto::serialize(&diff);
        // The size hint must be exact for this type, not just an upper bound.
        assert_eq!(bytes.len(), diff.max_size_hint());

        let parsed = tl_proto::deserialize::<QueueDiff>(&bytes).unwrap();
        // `read_from` fills in the hash of the consumed bytes.
        assert_eq!(parsed.hash, QueueDiff::compute_hash(&bytes));

        diff.hash = parsed.hash;
        assert_eq!(diff, parsed);
    }

    /// Round-trips a [`QueueStateHeader`] with a hash-linked diff chain.
    #[test]
    fn queue_state_binary_repr() {
        let addr1 = RouterAddr {
            workchain: 0,
            account: HashBytes::from([0x01; 32]),
        };

        let addr2 = RouterAddr {
            workchain: 1,
            account: HashBytes::from([0x02; 32]),
        };

        let mut queue_diffs = Vec::<QueueDiff>::new();
        for seqno in 1..=10 {
            // Link each diff to the hash of the previous one.
            let prev_hash = queue_diffs.last().map(|diff| diff.hash).unwrap_or_default();

            let mut diff = QueueDiff {
                hash: HashBytes::ZERO, // NOTE: Uninitialized
                prev_hash,
                shard_ident: ShardIdent::MASTERCHAIN,
                seqno,
                processed_to: BTreeMap::from([
                    (ShardIdent::MASTERCHAIN, QueueKey {
                        lt: 10 * seqno as u64,
                        hash: HashBytes::from([seqno as u8; 32]),
                    }),
                    (ShardIdent::BASECHAIN, QueueKey {
                        lt: 123123 * seqno as u64,
                        hash: HashBytes::from([seqno as u8 * 2; 32]),
                    }),
                ]),
                min_message: QueueKey {
                    lt: 1,
                    hash: HashBytes::from([0x11; 32]),
                },
                max_message: QueueKey {
                    lt: 123,
                    hash: HashBytes::from([0x22; 32]),
                },
                messages: vec![
                    HashBytes::from([0x01; 32]),
                    HashBytes::from([0x02; 32]),
                    HashBytes::from([0x03; 32]),
                ],
                router_partitions_src: Default::default(),
                router_partitions_dst: BTreeMap::from([(1.into(), BTreeSet::from([addr1, addr2]))]),
            };

            // NOTE: We need this for the hash computation.
            diff.recompute_hash();

            queue_diffs.push(diff);
        }

        // We store diffs in descending order.
        queue_diffs.reverse();

        let state = QueueStateHeader {
            shard_ident: ShardIdent::MASTERCHAIN,
            seqno: 10,
            queue_diffs,
        };

        let bytes = tl_proto::serialize(&state);
        assert_eq!(bytes.len(), state.max_size_hint());

        let parsed = tl_proto::deserialize::<QueueStateHeader>(&bytes).unwrap();
        assert_eq!(state, parsed);
    }

    /// Covers all branches of [`QueueKey::next_value`]: plain increment,
    /// carry propagation, and saturation/rollover on an all-0xFF hash.
    #[test]
    fn test_next_value() {
        // 1) Check increment when hash is all zeros
        let key_zero = QueueKey {
            lt: 5,
            hash: HashBytes::ZERO,
        };
        let next_zero = key_zero.next_value();
        // Expect that lt remains unchanged and hash is [0..0, 1]
        assert_eq!(
            next_zero.lt, 5,
            "LT must remain the same if hash is not full 0xFF"
        );
        let mut expected_hash_zero = [0u8; 32];
        expected_hash_zero[31] = 1;
        assert_eq!(
            next_zero.hash.0, expected_hash_zero,
            "Hash should increment by 1"
        );

        // 2) Check increment when hash has partial 0xFF at the end
        //    e.g., last two bytes are 0xFF, but not the whole array
        let mut partial_ff = [0u8; 32];
        partial_ff[30] = 0xFF;
        partial_ff[31] = 0xFF;
        let key_partial_ff = QueueKey {
            lt: 10,
            hash: HashBytes(partial_ff),
        };
        let next_partial_ff = key_partial_ff.next_value();
        // Expected result: carry rolls over the last two 0xFF bytes
        // and increments the next byte
        let mut expected_hash_partial = [0u8; 32];
        expected_hash_partial[29] = 0x01; // incremented by carry
        // bytes 30, 31 become 0x00
        assert_eq!(
            next_partial_ff.lt, 10,
            "LT must remain the same with partial 0xFF"
        );
        assert_eq!(
            next_partial_ff.hash.0, expected_hash_partial,
            "Hash should be incremented correctly with carry"
        );

        // 3) Check increment when hash is fully 0xFF
        let key_full_ff = QueueKey {
            lt: 999,
            hash: HashBytes([0xFF; 32]),
        };
        let next_full_ff = key_full_ff.next_value();
        // Expect that hash resets to zero and LT increments by 1
        assert_eq!(
            next_full_ff.lt, 1000,
            "LT must increment if hash was all 0xFF"
        );
        assert_eq!(next_full_ff.hash.0, [0u8; 32], "Hash should reset to zero");

        // 4) A quick check of mid-range increment with carry:
        //    Example: [.., 0x01, 0xFF, 0xFF]
        let mut mid_hash = [0u8; 32];
        mid_hash[29] = 0x01;
        mid_hash[30] = 0xFF;
        mid_hash[31] = 0xFF;
        let key_mid = QueueKey {
            lt: 50,
            hash: HashBytes(mid_hash),
        };
        let next_mid = key_mid.next_value();
        // We expect that byte 29 increments to 0x02 and the last two bytes become 0x00
        let mut expected_mid_hash = [0u8; 32];
        expected_mid_hash[29] = 0x02;
        assert_eq!(
            next_mid.lt, 50,
            "LT should remain the same for a mid-range carry"
        );
        assert_eq!(
            next_mid.hash.0, expected_mid_hash,
            "Hash should increment the correct byte after partial 0xFF"
        );
    }
}