1use std::collections::BTreeMap;
2
3use tl_proto::{TlError, TlPacket, TlRead, TlResult, TlWrite};
4use tycho_block_util::queue::{
5 QueueKey, QueuePartitionIdx, RouterAddr, RouterPartitions, processed_to_map,
6 router_partitions_map,
7};
8use tycho_block_util::tl;
9use tycho_storage::kv::{StoredValue, StoredValueBuffer};
10use tycho_types::cell::HashBytes;
11use tycho_types::models::ShardIdent;
12use tycho_util::FastHashMap;
13
14pub struct InternalQueueMessage<'a> {
15 pub key: ShardsInternalMessagesKey,
16 pub workchain: i8,
17 pub prefix: u64,
18 pub message_boc: &'a [u8],
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
22pub struct ShardsInternalMessagesKey {
23 pub partition: QueuePartitionIdx,
24 pub shard_ident: ShardIdent,
25 pub internal_message_key: QueueKey,
26}
27
28impl ShardsInternalMessagesKey {
29 pub fn new(
30 partition: QueuePartitionIdx,
31 shard_ident: ShardIdent,
32 internal_message_key: QueueKey,
33 ) -> Self {
34 Self {
35 partition,
36 shard_ident,
37 internal_message_key,
38 }
39 }
40}
41
42impl From<&[u8]> for ShardsInternalMessagesKey {
43 fn from(bytes: &[u8]) -> Self {
44 let mut reader = bytes;
45 Self::deserialize(&mut reader)
46 }
47}
48
49impl StoredValue for ShardsInternalMessagesKey {
50 const SIZE_HINT: usize = ShardIdent::SIZE_HINT + QueueKey::SIZE_HINT;
51
52 type OnStackSlice = [u8; Self::SIZE_HINT];
53
54 fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
55 self.partition.serialize(buffer);
56 self.shard_ident.serialize(buffer);
57 self.internal_message_key.serialize(buffer);
58 }
59
60 fn deserialize(reader: &mut &[u8]) -> Self {
61 if reader.len() < Self::SIZE_HINT {
62 panic!("Insufficient data for deserialization")
63 }
64
65 let partition = QueuePartitionIdx::deserialize(reader);
66 let shard_ident = ShardIdent::deserialize(reader);
67 let internal_message_key = QueueKey::deserialize(reader);
68
69 Self {
70 partition,
71 shard_ident,
72 internal_message_key,
73 }
74 }
75}
76
77#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
78pub struct StatKey {
79 pub shard_ident: ShardIdent,
80 pub partition: QueuePartitionIdx,
81 pub max_message: QueueKey,
82 pub dest: RouterAddr,
83}
84
85impl StatKey {
86 pub(crate) const PREFIX_SIZE: usize =
87 ShardIdent::SIZE_HINT + QueuePartitionIdx::SIZE_HINT + QueueKey::SIZE_HINT;
88 pub fn new(
89 shard_ident: ShardIdent,
90 partition: QueuePartitionIdx,
91 max_message: QueueKey,
92 dest: RouterAddr,
93 ) -> Self {
94 Self {
95 shard_ident,
96 partition,
97 max_message,
98 dest,
99 }
100 }
101}
102
103#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
104pub struct DiffTailKey {
105 pub shard_ident: ShardIdent,
106 pub max_message: QueueKey,
107}
108
109impl DiffTailKey {
110 pub fn new(shard_ident: ShardIdent, max_message: QueueKey) -> Self {
111 Self {
112 shard_ident,
113 max_message,
114 }
115 }
116}
117
118#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
119pub struct DiffInfoKey {
120 pub shard_ident: ShardIdent,
121 pub seqno: u32,
122}
123
124impl DiffInfoKey {
125 pub fn new(shard_ident: ShardIdent, seqno: u32) -> Self {
126 Self { shard_ident, seqno }
127 }
128}
129
130impl StoredValue for StatKey {
131 const SIZE_HINT: usize = ShardIdent::SIZE_HINT
132 + QueuePartitionIdx::SIZE_HINT
133 + QueueKey::SIZE_HINT
134 + RouterAddr::SIZE_HINT;
135
136 type OnStackSlice = [u8; Self::SIZE_HINT];
137
138 fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
139 self.shard_ident.serialize(buffer);
140 self.partition.serialize(buffer);
141 self.max_message.serialize(buffer);
142 self.dest.serialize(buffer);
143 }
144
145 fn deserialize(reader: &mut &[u8]) -> Self {
146 if reader.len() < Self::SIZE_HINT {
147 panic!("Insufficient data for deserialization");
148 }
149
150 let shard_ident = ShardIdent::deserialize(reader);
151 let partition = QueuePartitionIdx::deserialize(reader);
152 let max_message = QueueKey::deserialize(reader);
153 let dest = RouterAddr::deserialize(reader);
154
155 Self {
156 shard_ident,
157 partition,
158 max_message,
159 dest,
160 }
161 }
162}
163
164impl StoredValue for DiffTailKey {
165 const SIZE_HINT: usize = ShardIdent::SIZE_HINT + QueueKey::SIZE_HINT;
166 type OnStackSlice = [u8; Self::SIZE_HINT];
167
168 fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
169 self.shard_ident.serialize(buffer);
170 self.max_message.serialize(buffer);
171 }
172
173 fn deserialize(reader: &mut &[u8]) -> Self
174 where
175 Self: Sized,
176 {
177 if reader.len() < Self::SIZE_HINT {
178 panic!("Insufficient data for deserialization");
179 }
180
181 let shard_ident = ShardIdent::deserialize(reader);
182 let max_message = QueueKey::deserialize(reader);
183
184 Self {
185 shard_ident,
186 max_message,
187 }
188 }
189}
190
191impl StoredValue for DiffInfoKey {
192 const SIZE_HINT: usize = ShardIdent::SIZE_HINT + 4;
193 type OnStackSlice = [u8; Self::SIZE_HINT];
194
195 fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
196 self.shard_ident.serialize(buffer);
197 buffer.write_raw_slice(&self.seqno.to_be_bytes());
198 }
199
200 fn deserialize(reader: &mut &[u8]) -> Self
201 where
202 Self: Sized,
203 {
204 if reader.len() < Self::SIZE_HINT {
205 panic!("Insufficient data for deserialization");
206 }
207
208 let shard_ident = ShardIdent::deserialize(reader);
209 let mut seqno_bytes = [0u8; 4];
210 seqno_bytes.copy_from_slice(&reader[..4]);
211 let seqno = u32::from_be_bytes(seqno_bytes);
212 *reader = &reader[4..];
213
214 Self { shard_ident, seqno }
215 }
216}
217
218#[derive(Debug)]
219pub struct QueueRange {
220 pub shard_ident: ShardIdent,
221 pub partition: QueuePartitionIdx,
222 pub from: QueueKey,
223 pub to: QueueKey,
224}
225
/// A `DiffInfoKey` must survive a serialize/deserialize round trip unchanged.
#[test]
fn diff_info_key_serialization() {
    let original = DiffInfoKey::new(ShardIdent::MASTERCHAIN, 10);

    let mut encoded = Vec::with_capacity(DiffInfoKey::SIZE_HINT);
    original.serialize(&mut encoded);

    let mut remaining = encoded.as_slice();
    let decoded = DiffInfoKey::deserialize(&mut remaining);

    assert_eq!(original, decoded);
}
234
235#[derive(Debug, Clone, PartialEq, Eq)]
236pub struct DiffInfo {
237 pub min_message: QueueKey,
238 pub max_message: QueueKey,
239 pub shards_messages_count: FastHashMap<ShardIdent, u64>,
240 pub hash: HashBytes,
241 pub processed_to: BTreeMap<ShardIdent, QueueKey>,
242 pub router_partitions_src: RouterPartitions,
243 pub router_partitions_dst: RouterPartitions,
244 pub seqno: u32,
245}
246
247impl DiffInfo {
248 pub fn get_messages_count_by_shard(&self, shard_ident: &ShardIdent) -> u64 {
249 self.shards_messages_count
250 .get(shard_ident)
251 .copied()
252 .unwrap_or_default()
253 }
254}
255
256impl TlWrite for DiffInfo {
257 type Repr = tl_proto::Boxed;
258
259 fn max_size_hint(&self) -> usize {
260 QueueKey::SIZE_HINT
261 + QueueKey::SIZE_HINT
262 + QueueKey::SIZE_HINT
263 + 4
264 + self.shards_messages_count.len() * (tl::shard_ident::SIZE_HINT + 8)
265 + tl::hash_bytes::SIZE_HINT
266 }
267
268 fn write_to<P: TlPacket>(&self, packet: &mut P) {
269 self.min_message.write_to(packet);
270 self.max_message.write_to(packet);
271 packet.write_u32(self.shards_messages_count.len() as u32);
272
273 for (shard_ident, count) in &self.shards_messages_count {
274 tl::shard_ident::write(shard_ident, packet);
275 packet.write_u64(*count);
276 }
277
278 tl::hash_bytes::write(&self.hash, packet);
279
280 processed_to_map::write(&self.processed_to, packet);
281 router_partitions_map::write(&self.router_partitions_src, packet);
282 router_partitions_map::write(&self.router_partitions_dst, packet);
283 packet.write_u32(self.seqno);
284 }
285}
286
287impl<'tl> TlRead<'tl> for DiffInfo {
288 type Repr = tl_proto::Boxed;
289
290 fn read_from(data: &mut &'tl [u8]) -> TlResult<Self> {
291 let min_message = QueueKey::read_from(data)?;
292 let max_message = QueueKey::read_from(data)?;
293
294 let len = u32::read_from(data)? as usize;
295 if len > 10_000_000 {
296 return Err(TlError::InvalidData);
297 }
298
299 let mut shards_messages_count =
300 FastHashMap::with_capacity_and_hasher(len, Default::default());
301
302 for _ in 0..len {
303 let shard_ident = tl::shard_ident::read(data)?;
304 let count = u64::read_from(data)?;
305 shards_messages_count.insert(shard_ident, count);
306 }
307
308 let hash = tl::hash_bytes::read(data)?;
309
310 let processed_to = processed_to_map::read(data)?;
311 let router_partitions_src = router_partitions_map::read(data)?;
312 let router_partitions_dst = router_partitions_map::read(data)?;
313 let seqno = u32::read_from(data)?;
314
315 Ok(DiffInfo {
316 min_message,
317 max_message,
318 shards_messages_count,
319 hash,
320 processed_to,
321 router_partitions_src,
322 router_partitions_dst,
323 seqno,
324 })
325 }
326}
327
328#[derive(Debug, Clone)]
329pub struct CommitPointerKey {
330 pub shard_ident: ShardIdent,
331}
332
333#[derive(Debug, Clone, Default)]
334
335pub struct CommitPointerValue {
336 pub queue_key: QueueKey,
337 pub seqno: u32,
338}
339
340impl StoredValue for CommitPointerKey {
341 const SIZE_HINT: usize = ShardIdent::SIZE_HINT;
342
343 type OnStackSlice = [u8; Self::SIZE_HINT];
344
345 fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
346 self.shard_ident.serialize(buffer);
347 }
348
349 fn deserialize(reader: &mut &[u8]) -> Self {
350 if reader.len() < Self::SIZE_HINT {
351 panic!("Insufficient data for deserialization");
352 }
353
354 let shard_ident = ShardIdent::deserialize(reader);
355
356 Self { shard_ident }
357 }
358}
359
360impl StoredValue for CommitPointerValue {
361 const SIZE_HINT: usize = QueueKey::SIZE_HINT + 4;
362
363 type OnStackSlice = [u8; Self::SIZE_HINT];
364
365 fn serialize<T: StoredValueBuffer>(&self, buffer: &mut T) {
366 self.queue_key.serialize(buffer);
367 buffer.write_raw_slice(&self.seqno.to_be_bytes());
368 }
369
370 fn deserialize(reader: &mut &[u8]) -> Self {
371 if reader.len() < Self::SIZE_HINT {
372 panic!("Insufficient data for deserialization");
373 }
374
375 let queue_key = QueueKey::deserialize(reader);
376 let mut seqno_bytes = [0u8; 4];
377 seqno_bytes.copy_from_slice(&reader[..4]);
378 let seqno = u32::from_be_bytes(seqno_bytes);
379
380 Self { queue_key, seqno }
381 }
382}