1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use anyhow::Result;
5use tl_proto::TlRead;
6use tycho_types::cell::Lazy;
7use tycho_types::error::Error;
8use tycho_types::models::*;
9use tycho_types::prelude::*;
10
11use crate::archive::WithArchiveData;
12use crate::queue::RouterPartitions;
13use crate::queue::proto::{QueueDiff, QueueKey};
14
15pub type QueueDiffStuffAug = WithArchiveData<QueueDiffStuff>;
16
17pub struct QueueDiffStuffBuilder {
18 inner: Arc<Inner>,
19}
20
21impl QueueDiffStuffBuilder {
22 pub fn serialize(mut self) -> SerializedQueueDiff {
23 let data = tl_proto::serialize(&self.inner.diff);
24 self.inner_mut().diff.hash = QueueDiff::compute_hash(&data);
25
26 SerializedQueueDiff {
27 inner: self.inner,
28 data,
29 }
30 }
31
32 pub fn with_processed_to(mut self, processed_to: BTreeMap<ShardIdent, QueueKey>) -> Self {
33 self.inner_mut().diff.processed_to = processed_to;
34 self
35 }
36
37 pub fn with_router(
38 mut self,
39 src_router: RouterPartitions,
40 dsc_router: RouterPartitions,
41 ) -> Self {
42 let inner = self.inner_mut();
43 inner.diff.router_partitions_src = src_router;
44 inner.diff.router_partitions_dst = dsc_router;
45 self
46 }
47
48 pub fn with_messages<'a, I>(
49 mut self,
50 min_message: &QueueKey,
51 max_message: &QueueKey,
52 hashes: I,
53 ) -> Self
54 where
55 I: IntoIterator<Item = &'a HashBytes>,
56 {
57 let inner = self.inner_mut();
58 inner.diff.min_message = *min_message;
59 inner.diff.max_message = *max_message;
60 inner.diff.messages = hashes.into_iter().copied().collect();
61 inner.diff.messages.sort_unstable();
62 self
63 }
64
65 fn inner_mut(&mut self) -> &mut Inner {
66 Arc::get_mut(&mut self.inner).expect("inner is not shared")
67 }
68}
69
70pub struct SerializedQueueDiff {
71 inner: Arc<Inner>,
72 data: Vec<u8>,
73}
74
75impl SerializedQueueDiff {
76 pub fn build(mut self, block_id: &BlockId) -> QueueDiffStuffAug {
77 let inner = self.inner_mut();
78 debug_assert_eq!(inner.diff.shard_ident, block_id.shard);
79 debug_assert_eq!(inner.diff.seqno, block_id.seqno);
80 inner.block_id = *block_id;
81
82 QueueDiffStuffAug::new(QueueDiffStuff { inner: self.inner }, self.data)
83 }
84
85 pub fn hash(&self) -> &HashBytes {
86 &self.inner.diff.hash
87 }
88
89 pub fn processed_to(&self) -> &BTreeMap<ShardIdent, QueueKey> {
90 &self.inner.diff.processed_to
91 }
92
93 fn inner_mut(&mut self) -> &mut Inner {
94 Arc::get_mut(&mut self.inner).expect("inner is not shared")
95 }
96}
97
98#[derive(Clone)]
99#[repr(transparent)]
100pub struct QueueDiffStuff {
101 inner: Arc<Inner>,
102}
103
104impl QueueDiffStuff {
105 pub fn new_empty(block_id: &BlockId) -> Self {
106 use std::collections::BTreeMap;
107
108 Self {
109 inner: Arc::new(Inner {
110 block_id: *block_id,
111 diff: QueueDiff {
112 hash: HashBytes::ZERO,
113 prev_hash: HashBytes::ZERO,
114 shard_ident: block_id.shard,
115 seqno: block_id.seqno,
116 processed_to: BTreeMap::from([(block_id.shard, QueueKey::MIN)]),
117 min_message: QueueKey::MIN,
118 max_message: QueueKey::MIN,
119 messages: Vec::new(),
120 router_partitions_src: Default::default(),
121 router_partitions_dst: Default::default(),
122 },
123 }),
124 }
125 }
126
127 #[cfg(any(test, feature = "test"))]
128 pub fn new(block_id: &BlockId, diff: QueueDiff) -> Self {
129 Self {
130 inner: Arc::new(Inner {
131 block_id: *block_id,
132 diff,
133 }),
134 }
135 }
136
137 pub fn builder(
138 shard_ident: ShardIdent,
139 seqno: u32,
140 prev_hash: &HashBytes,
141 ) -> QueueDiffStuffBuilder {
142 QueueDiffStuffBuilder {
143 inner: Arc::new(Inner {
144 block_id: BlockId::default(),
145 diff: QueueDiff {
146 hash: HashBytes::ZERO,
147 prev_hash: *prev_hash,
148 shard_ident,
149 seqno,
150 processed_to: Default::default(),
151 min_message: Default::default(),
152 max_message: Default::default(),
153 messages: Default::default(),
154 router_partitions_src: Default::default(),
155 router_partitions_dst: Default::default(),
156 },
157 }),
158 }
159 }
160
161 pub fn deserialize(block_id: &BlockId, data: &[u8]) -> Result<Self> {
162 let packet = &mut std::convert::identity(data);
163 let mut diff = QueueDiff::read_from(packet)?;
164 anyhow::ensure!(
165 block_id.shard == diff.shard_ident && block_id.seqno == diff.seqno,
166 "short block id mismatch"
167 );
168 anyhow::ensure!(packet.is_empty(), "unexpected data after the diff");
169
170 diff.hash = QueueDiff::compute_hash(data);
171
172 Ok(Self {
173 inner: Arc::new(Inner {
174 block_id: *block_id,
175 diff,
176 }),
177 })
178 }
179
180 pub fn block_id(&self) -> &BlockId {
181 &self.inner.block_id
182 }
183
184 pub fn diff_hash(&self) -> &HashBytes {
185 &self.inner.diff.hash
186 }
187
188 pub fn diff(&self) -> &QueueDiff {
190 &self.inner.diff
191 }
192
193 pub fn zip(&self, out_messages: &OutMsgDescr) -> QueueDiffMessagesIter {
194 QueueDiffMessagesIter {
195 index: 0,
196 out_messages: out_messages.dict().clone(),
197 inner: self.inner.clone(),
198 }
199 }
200}
201
202impl AsRef<QueueDiff> for QueueDiffStuff {
203 fn as_ref(&self) -> &QueueDiff {
204 &self.inner.diff
205 }
206}
207
208unsafe impl arc_swap::RefCnt for QueueDiffStuff {
209 type Base = Inner;
210
211 fn into_ptr(me: Self) -> *mut Self::Base {
212 arc_swap::RefCnt::into_ptr(me.inner)
213 }
214
215 fn as_ptr(me: &Self) -> *mut Self::Base {
216 arc_swap::RefCnt::as_ptr(&me.inner)
217 }
218
219 unsafe fn from_ptr(ptr: *const Self::Base) -> Self {
220 Self {
221 inner: unsafe { arc_swap::RefCnt::from_ptr(ptr) },
222 }
223 }
224}
225
226#[doc(hidden)]
227pub struct Inner {
228 block_id: BlockId,
229 diff: QueueDiff,
230}
231
232#[derive(Clone)]
234pub struct QueueDiffMessagesIter {
235 index: usize,
236 out_messages: Dict<HashBytes, (CurrencyCollection, OutMsg)>,
237 inner: Arc<Inner>,
238}
239
240impl Iterator for QueueDiffMessagesIter {
241 type Item = Result<Lazy<OwnedMessage>, Error>;
242
243 fn next(&mut self) -> Option<Self::Item> {
244 let messages = &self.inner.diff.messages;
245 if self.index >= messages.len() {
246 return None;
247 }
248
249 let hash = &messages[self.index];
250 self.index += 1;
251
252 match self.out_messages.get(hash) {
253 Ok(Some((_, out_msg))) => {
254 let OutMsg::New(out_msg) = &out_msg else {
255 return Some(Err(Error::InvalidData));
256 };
257
258 let out_msg = out_msg.out_msg_envelope.inner();
260 let ref_count = out_msg.descriptor().reference_count();
261 if ref_count > 0
262 && let Some(cell) = out_msg.reference_cloned(ref_count - 1)
263 {
264 return Some(Lazy::from_raw(cell));
265 }
266
267 Some(Err(Error::InvalidData))
268 }
269 Ok(None) => Some(Err(Error::InvalidData)),
270 Err(e) => Some(Err(e)),
271 }
272 }
273
274 fn size_hint(&self) -> (usize, Option<usize>) {
275 let len = self.inner.diff.messages.len() - self.index;
277 (len, Some(len))
278 }
279}
280
281impl ExactSizeIterator for QueueDiffMessagesIter {
282 fn len(&self) -> usize {
283 self.inner.diff.messages.len() - self.index
285 }
286}
287
#[cfg(test)]
mod tests {
    use tycho_types::num::Tokens;

    use super::*;

    /// Fills a dictionary with ten external and ten new internal out
    /// messages, puts only the hashes of the internal ones into a diff and
    /// checks that the iterator returns exactly those, in sorted hash order.
    #[test]
    fn queue_diff_messages_iter() -> Result<()> {
        let mut dict = Dict::<HashBytes, (CurrencyCollection, OutMsg)>::new();

        let dummy_tx = Lazy::from_raw(Cell::default())?;

        // External out messages: present in the dictionary, absent from the diff.
        for i in 0..10 {
            let info = MsgInfo::ExtOut(ExtOutMsgInfo {
                src: IntAddr::Std(StdAddr::new(0, HashBytes::from([i as u8; 32]))),
                dst: None,
                created_lt: i,
                created_at: 0,
            });
            let msg = Lazy::new(&Message {
                info,
                init: None,
                body: Cell::empty_cell_ref().as_slice()?,
                layout: None,
            })?;

            let value = (
                CurrencyCollection::ZERO,
                OutMsg::External(OutMsgExternal {
                    out_msg: msg.cast_ref().clone(),
                    transaction: dummy_tx.clone(),
                }),
            );
            dict.set(msg.inner().repr_hash(), value)?;
        }

        // New internal messages: their hashes are what the diff refers to.
        let mut expected_hashes = Vec::new();
        for i in 0..10 {
            let addr = IntAddr::Std(StdAddr::new(0, HashBytes::from([i as u8; 32])));

            let info = MsgInfo::Int(IntMsgInfo {
                src: addr.clone(),
                dst: addr,
                created_lt: i,
                ..Default::default()
            });
            let msg = Lazy::new(&Message {
                info,
                init: None,
                body: Cell::empty_cell_ref().as_slice()?,
                layout: None,
            })?;

            let msg_hash = *msg.inner().repr_hash();
            expected_hashes.push(msg_hash);

            let envelope = Lazy::new(&MsgEnvelope {
                cur_addr: IntermediateAddr::FULL_SRC_SAME_WORKCHAIN,
                next_addr: IntermediateAddr::FULL_DEST_SAME_WORKCHAIN,
                fwd_fee_remaining: Tokens::ZERO,
                message: msg.cast_into(),
            })?;

            let value = (
                CurrencyCollection::ZERO,
                OutMsg::New(OutMsgNew {
                    out_msg_envelope: envelope,
                    transaction: dummy_tx.clone(),
                }),
            );
            dict.set(msg_hash, value)?;
        }

        let out_messages = AugDict::from_parts(dict, CurrencyCollection::ZERO);

        // The iterator walks `messages` in stored order, so sort the
        // expectation the same way the diff stores it.
        expected_hashes.sort_unstable();
        assert_eq!(expected_hashes.len(), 10);

        let queue_diff = QueueDiff {
            hash: HashBytes::ZERO,
            prev_hash: HashBytes::ZERO,
            shard_ident: ShardIdent::BASECHAIN,
            seqno: 1,
            processed_to: Default::default(),
            min_message: QueueKey {
                lt: 0,
                hash: expected_hashes[0],
            },
            max_message: QueueKey {
                lt: 9,
                hash: expected_hashes[9],
            },
            messages: expected_hashes.clone(),
            router_partitions_src: Default::default(),
            router_partitions_dst: Default::default(),
        };
        let stuff = QueueDiffStuff {
            inner: Arc::new(Inner {
                block_id: BlockId::default(),
                diff: queue_diff,
            }),
        };

        let mut iter = stuff.zip(&out_messages);
        assert_eq!(expected_hashes.len(), iter.len());

        for expected_hash in expected_hashes {
            let msg = iter.next().unwrap()?;
            assert_eq!(expected_hash, *msg.inner().repr_hash());

            msg.load().unwrap();
        }

        // The exhausted iterator must report zero remaining items.
        assert!(iter.next().is_none());
        assert_eq!(iter.len(), 0);
        assert_eq!(iter.size_hint(), (0, Some(0)));

        Ok(())
    }
}