Skip to main content

tycho_collator/storage/
models.rs

1use std::collections::BTreeMap;
2
3use tl_proto::{TlError, TlPacket, TlRead, TlResult, TlWrite};
4use tycho_block_util::queue::{
5    QueueKey, QueuePartitionIdx, RouterAddr, RouterPartitions, processed_to_map,
6    router_partitions_map,
7};
8use tycho_block_util::tl;
9use tycho_storage::kv::{StoredValue, StoredValueBuffer};
10use tycho_types::cell::HashBytes;
11use tycho_types::models::ShardIdent;
12use tycho_util::FastHashMap;
13
14pub struct InternalQueueMessage<'a> {
15    pub key: ShardsInternalMessagesKey,
16    pub workchain: i8,
17    pub prefix: u64,
18    pub message_boc: &'a [u8],
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
22pub struct ShardsInternalMessagesKey {
23    pub partition: QueuePartitionIdx,
24    pub shard_ident: ShardIdent,
25    pub internal_message_key: QueueKey,
26}
27
28impl ShardsInternalMessagesKey {
29    pub fn new(
30        partition: QueuePartitionIdx,
31        shard_ident: ShardIdent,
32        internal_message_key: QueueKey,
33    ) -> Self {
34        Self {
35            partition,
36            shard_ident,
37            internal_message_key,
38        }
39    }
40}
41
42impl From<&[u8]> for ShardsInternalMessagesKey {
43    fn from(bytes: &[u8]) -> Self {
44        let mut reader = bytes;
45        Self::deserialize(&mut reader)
46    }
47}
48
49impl StoredValue for ShardsInternalMessagesKey {
50    const SIZE_HINT: usize = ShardIdent::SIZE_HINT + QueueKey::SIZE_HINT;
51
52    type OnStackSlice = [u8; Self::SIZE_HINT];
53
54    fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
55        self.partition.serialize(buffer);
56        self.shard_ident.serialize(buffer);
57        self.internal_message_key.serialize(buffer);
58    }
59
60    fn deserialize(reader: &mut &[u8]) -> Self {
61        if reader.len() < Self::SIZE_HINT {
62            panic!("Insufficient data for deserialization")
63        }
64
65        let partition = QueuePartitionIdx::deserialize(reader);
66        let shard_ident = ShardIdent::deserialize(reader);
67        let internal_message_key = QueueKey::deserialize(reader);
68
69        Self {
70            partition,
71            shard_ident,
72            internal_message_key,
73        }
74    }
75}
76
77#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
78pub struct StatKey {
79    pub shard_ident: ShardIdent,
80    pub partition: QueuePartitionIdx,
81    pub max_message: QueueKey,
82    pub dest: RouterAddr,
83}
84
85impl StatKey {
86    pub(crate) const PREFIX_SIZE: usize =
87        ShardIdent::SIZE_HINT + QueuePartitionIdx::SIZE_HINT + QueueKey::SIZE_HINT;
88    pub fn new(
89        shard_ident: ShardIdent,
90        partition: QueuePartitionIdx,
91        max_message: QueueKey,
92        dest: RouterAddr,
93    ) -> Self {
94        Self {
95            shard_ident,
96            partition,
97            max_message,
98            dest,
99        }
100    }
101}
102
103#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
104pub struct DiffTailKey {
105    pub shard_ident: ShardIdent,
106    pub max_message: QueueKey,
107}
108
109impl DiffTailKey {
110    pub fn new(shard_ident: ShardIdent, max_message: QueueKey) -> Self {
111        Self {
112            shard_ident,
113            max_message,
114        }
115    }
116}
117
118#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
119pub struct DiffInfoKey {
120    pub shard_ident: ShardIdent,
121    pub seqno: u32,
122}
123
124impl DiffInfoKey {
125    pub fn new(shard_ident: ShardIdent, seqno: u32) -> Self {
126        Self { shard_ident, seqno }
127    }
128}
129
130impl StoredValue for StatKey {
131    const SIZE_HINT: usize = ShardIdent::SIZE_HINT
132        + QueuePartitionIdx::SIZE_HINT
133        + QueueKey::SIZE_HINT
134        + RouterAddr::SIZE_HINT;
135
136    type OnStackSlice = [u8; Self::SIZE_HINT];
137
138    fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
139        self.shard_ident.serialize(buffer);
140        self.partition.serialize(buffer);
141        self.max_message.serialize(buffer);
142        self.dest.serialize(buffer);
143    }
144
145    fn deserialize(reader: &mut &[u8]) -> Self {
146        if reader.len() < Self::SIZE_HINT {
147            panic!("Insufficient data for deserialization");
148        }
149
150        let shard_ident = ShardIdent::deserialize(reader);
151        let partition = QueuePartitionIdx::deserialize(reader);
152        let max_message = QueueKey::deserialize(reader);
153        let dest = RouterAddr::deserialize(reader);
154
155        Self {
156            shard_ident,
157            partition,
158            max_message,
159            dest,
160        }
161    }
162}
163
164impl StoredValue for DiffTailKey {
165    const SIZE_HINT: usize = ShardIdent::SIZE_HINT + QueueKey::SIZE_HINT;
166    type OnStackSlice = [u8; Self::SIZE_HINT];
167
168    fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
169        self.shard_ident.serialize(buffer);
170        self.max_message.serialize(buffer);
171    }
172
173    fn deserialize(reader: &mut &[u8]) -> Self
174    where
175        Self: Sized,
176    {
177        if reader.len() < Self::SIZE_HINT {
178            panic!("Insufficient data for deserialization");
179        }
180
181        let shard_ident = ShardIdent::deserialize(reader);
182        let max_message = QueueKey::deserialize(reader);
183
184        Self {
185            shard_ident,
186            max_message,
187        }
188    }
189}
190
191impl StoredValue for DiffInfoKey {
192    const SIZE_HINT: usize = ShardIdent::SIZE_HINT + 4;
193    type OnStackSlice = [u8; Self::SIZE_HINT];
194
195    fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
196        self.shard_ident.serialize(buffer);
197        buffer.write_raw_slice(&self.seqno.to_be_bytes());
198    }
199
200    fn deserialize(reader: &mut &[u8]) -> Self
201    where
202        Self: Sized,
203    {
204        if reader.len() < Self::SIZE_HINT {
205            panic!("Insufficient data for deserialization");
206        }
207
208        let shard_ident = ShardIdent::deserialize(reader);
209        let mut seqno_bytes = [0u8; 4];
210        seqno_bytes.copy_from_slice(&reader[..4]);
211        let seqno = u32::from_be_bytes(seqno_bytes);
212        *reader = &reader[4..];
213
214        Self { shard_ident, seqno }
215    }
216}
217
218#[derive(Debug)]
219pub struct QueueRange {
220    pub shard_ident: ShardIdent,
221    pub partition: QueuePartitionIdx,
222    pub from: QueueKey,
223    pub to: QueueKey,
224}
225
/// Round-trips a `DiffInfoKey` through `serialize` / `deserialize`
/// and checks that the decoded key equals the original.
#[test]
fn diff_info_key_serialization() {
    let original = DiffInfoKey::new(ShardIdent::MASTERCHAIN, 10);

    let mut encoded = Vec::with_capacity(DiffInfoKey::SIZE_HINT);
    original.serialize(&mut encoded);

    let mut cursor = encoded.as_slice();
    let decoded = DiffInfoKey::deserialize(&mut cursor);

    assert_eq!(original, decoded);
}
234
235#[derive(Debug, Clone, PartialEq, Eq)]
236pub struct DiffInfo {
237    pub min_message: QueueKey,
238    pub max_message: QueueKey,
239    pub shards_messages_count: FastHashMap<ShardIdent, u64>,
240    pub hash: HashBytes,
241    pub processed_to: BTreeMap<ShardIdent, QueueKey>,
242    pub router_partitions_src: RouterPartitions,
243    pub router_partitions_dst: RouterPartitions,
244    pub seqno: u32,
245}
246
247impl DiffInfo {
248    pub fn get_messages_count_by_shard(&self, shard_ident: &ShardIdent) -> u64 {
249        self.shards_messages_count
250            .get(shard_ident)
251            .copied()
252            .unwrap_or_default()
253    }
254}
255
256impl TlWrite for DiffInfo {
257    type Repr = tl_proto::Boxed;
258
259    fn max_size_hint(&self) -> usize {
260        QueueKey::SIZE_HINT
261            + QueueKey::SIZE_HINT
262            + QueueKey::SIZE_HINT
263            + 4
264            + self.shards_messages_count.len() * (tl::shard_ident::SIZE_HINT + 8)
265            + tl::hash_bytes::SIZE_HINT
266    }
267
268    fn write_to<P: TlPacket>(&self, packet: &mut P) {
269        self.min_message.write_to(packet);
270        self.max_message.write_to(packet);
271        packet.write_u32(self.shards_messages_count.len() as u32);
272
273        for (shard_ident, count) in &self.shards_messages_count {
274            tl::shard_ident::write(shard_ident, packet);
275            packet.write_u64(*count);
276        }
277
278        tl::hash_bytes::write(&self.hash, packet);
279
280        processed_to_map::write(&self.processed_to, packet);
281        router_partitions_map::write(&self.router_partitions_src, packet);
282        router_partitions_map::write(&self.router_partitions_dst, packet);
283        packet.write_u32(self.seqno);
284    }
285}
286
287impl<'tl> TlRead<'tl> for DiffInfo {
288    type Repr = tl_proto::Boxed;
289
290    fn read_from(data: &mut &'tl [u8]) -> TlResult<Self> {
291        let min_message = QueueKey::read_from(data)?;
292        let max_message = QueueKey::read_from(data)?;
293
294        let len = u32::read_from(data)? as usize;
295        if len > 10_000_000 {
296            return Err(TlError::InvalidData);
297        }
298
299        let mut shards_messages_count =
300            FastHashMap::with_capacity_and_hasher(len, Default::default());
301
302        for _ in 0..len {
303            let shard_ident = tl::shard_ident::read(data)?;
304            let count = u64::read_from(data)?;
305            shards_messages_count.insert(shard_ident, count);
306        }
307
308        let hash = tl::hash_bytes::read(data)?;
309
310        let processed_to = processed_to_map::read(data)?;
311        let router_partitions_src = router_partitions_map::read(data)?;
312        let router_partitions_dst = router_partitions_map::read(data)?;
313        let seqno = u32::read_from(data)?;
314
315        Ok(DiffInfo {
316            min_message,
317            max_message,
318            shards_messages_count,
319            hash,
320            processed_to,
321            router_partitions_src,
322            router_partitions_dst,
323            seqno,
324        })
325    }
326}
327
328#[derive(Debug, Clone)]
329pub struct CommitPointerKey {
330    pub shard_ident: ShardIdent,
331}
332
333#[derive(Debug, Clone, Default)]
334
335pub struct CommitPointerValue {
336    pub queue_key: QueueKey,
337    pub seqno: u32,
338}
339
340impl StoredValue for CommitPointerKey {
341    const SIZE_HINT: usize = ShardIdent::SIZE_HINT;
342
343    type OnStackSlice = [u8; Self::SIZE_HINT];
344
345    fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
346        self.shard_ident.serialize(buffer);
347    }
348
349    fn deserialize(reader: &mut &[u8]) -> Self {
350        if reader.len() < Self::SIZE_HINT {
351            panic!("Insufficient data for deserialization");
352        }
353
354        let shard_ident = ShardIdent::deserialize(reader);
355
356        Self { shard_ident }
357    }
358}
359
360impl StoredValue for CommitPointerValue {
361    const SIZE_HINT: usize = QueueKey::SIZE_HINT + 4;
362
363    type OnStackSlice = [u8; Self::SIZE_HINT];
364
365    fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
366        self.queue_key.serialize(buffer);
367        buffer.write_raw_slice(&self.seqno.to_be_bytes());
368    }
369
370    fn deserialize(reader: &mut &[u8]) -> Self {
371        if reader.len() < Self::SIZE_HINT {
372            panic!("Insufficient data for deserialization");
373        }
374
375        let queue_key = QueueKey::deserialize(reader);
376        let mut seqno_bytes = [0u8; 4];
377        seqno_bytes.copy_from_slice(&reader[..4]);
378        let seqno = u32::from_be_bytes(seqno_bytes);
379
380        Self { queue_key, seqno }
381    }
382}