tycho_block_util/queue/
queue_diff.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use anyhow::Result;
5use tl_proto::TlRead;
6use tycho_types::cell::Lazy;
7use tycho_types::error::Error;
8use tycho_types::models::*;
9use tycho_types::prelude::*;
10
11use crate::archive::WithArchiveData;
12use crate::queue::RouterPartitions;
13use crate::queue::proto::{QueueDiff, QueueKey};
14
15pub type QueueDiffStuffAug = WithArchiveData<QueueDiffStuff>;
16
17pub struct QueueDiffStuffBuilder {
18    inner: Arc<Inner>,
19}
20
21impl QueueDiffStuffBuilder {
22    pub fn serialize(mut self) -> SerializedQueueDiff {
23        let data = tl_proto::serialize(&self.inner.diff);
24        self.inner_mut().diff.hash = QueueDiff::compute_hash(&data);
25
26        SerializedQueueDiff {
27            inner: self.inner,
28            data,
29        }
30    }
31
32    pub fn with_processed_to(mut self, processed_to: BTreeMap<ShardIdent, QueueKey>) -> Self {
33        self.inner_mut().diff.processed_to = processed_to;
34        self
35    }
36
37    pub fn with_router(
38        mut self,
39        src_router: RouterPartitions,
40        dsc_router: RouterPartitions,
41    ) -> Self {
42        let inner = self.inner_mut();
43        inner.diff.router_partitions_src = src_router;
44        inner.diff.router_partitions_dst = dsc_router;
45        self
46    }
47
48    pub fn with_messages<'a, I>(
49        mut self,
50        min_message: &QueueKey,
51        max_message: &QueueKey,
52        hashes: I,
53    ) -> Self
54    where
55        I: IntoIterator<Item = &'a HashBytes>,
56    {
57        let inner = self.inner_mut();
58        inner.diff.min_message = *min_message;
59        inner.diff.max_message = *max_message;
60        inner.diff.messages = hashes.into_iter().copied().collect();
61        inner.diff.messages.sort_unstable();
62        self
63    }
64
65    fn inner_mut(&mut self) -> &mut Inner {
66        Arc::get_mut(&mut self.inner).expect("inner is not shared")
67    }
68}
69
70pub struct SerializedQueueDiff {
71    inner: Arc<Inner>,
72    data: Vec<u8>,
73}
74
75impl SerializedQueueDiff {
76    pub fn build(mut self, block_id: &BlockId) -> QueueDiffStuffAug {
77        let inner = self.inner_mut();
78        debug_assert_eq!(inner.diff.shard_ident, block_id.shard);
79        debug_assert_eq!(inner.diff.seqno, block_id.seqno);
80        inner.block_id = *block_id;
81
82        QueueDiffStuffAug::new(QueueDiffStuff { inner: self.inner }, self.data)
83    }
84
85    pub fn hash(&self) -> &HashBytes {
86        &self.inner.diff.hash
87    }
88
89    pub fn processed_to(&self) -> &BTreeMap<ShardIdent, QueueKey> {
90        &self.inner.diff.processed_to
91    }
92
93    fn inner_mut(&mut self) -> &mut Inner {
94        Arc::get_mut(&mut self.inner).expect("inner is not shared")
95    }
96}
97
98#[derive(Clone)]
99#[repr(transparent)]
100pub struct QueueDiffStuff {
101    inner: Arc<Inner>,
102}
103
104impl QueueDiffStuff {
105    pub fn new_empty(block_id: &BlockId) -> Self {
106        use std::collections::BTreeMap;
107
108        Self {
109            inner: Arc::new(Inner {
110                block_id: *block_id,
111                diff: QueueDiff {
112                    hash: HashBytes::ZERO,
113                    prev_hash: HashBytes::ZERO,
114                    shard_ident: block_id.shard,
115                    seqno: block_id.seqno,
116                    processed_to: BTreeMap::from([(block_id.shard, QueueKey::MIN)]),
117                    min_message: QueueKey::MIN,
118                    max_message: QueueKey::MIN,
119                    messages: Vec::new(),
120                    router_partitions_src: Default::default(),
121                    router_partitions_dst: Default::default(),
122                },
123            }),
124        }
125    }
126
127    #[cfg(any(test, feature = "test"))]
128    pub fn new(block_id: &BlockId, diff: QueueDiff) -> Self {
129        Self {
130            inner: Arc::new(Inner {
131                block_id: *block_id,
132                diff,
133            }),
134        }
135    }
136
137    pub fn builder(
138        shard_ident: ShardIdent,
139        seqno: u32,
140        prev_hash: &HashBytes,
141    ) -> QueueDiffStuffBuilder {
142        QueueDiffStuffBuilder {
143            inner: Arc::new(Inner {
144                block_id: BlockId::default(),
145                diff: QueueDiff {
146                    hash: HashBytes::ZERO,
147                    prev_hash: *prev_hash,
148                    shard_ident,
149                    seqno,
150                    processed_to: Default::default(),
151                    min_message: Default::default(),
152                    max_message: Default::default(),
153                    messages: Default::default(),
154                    router_partitions_src: Default::default(),
155                    router_partitions_dst: Default::default(),
156                },
157            }),
158        }
159    }
160
161    pub fn deserialize(block_id: &BlockId, data: &[u8]) -> Result<Self> {
162        let packet = &mut std::convert::identity(data);
163        let mut diff = QueueDiff::read_from(packet)?;
164        anyhow::ensure!(
165            block_id.shard == diff.shard_ident && block_id.seqno == diff.seqno,
166            "short block id mismatch"
167        );
168        anyhow::ensure!(packet.is_empty(), "unexpected data after the diff");
169
170        diff.hash = QueueDiff::compute_hash(data);
171
172        Ok(Self {
173            inner: Arc::new(Inner {
174                block_id: *block_id,
175                diff,
176            }),
177        })
178    }
179
180    pub fn block_id(&self) -> &BlockId {
181        &self.inner.block_id
182    }
183
184    pub fn diff_hash(&self) -> &HashBytes {
185        &self.inner.diff.hash
186    }
187
188    // TODO: Use only `AsRef<QueueDiff>`?
189    pub fn diff(&self) -> &QueueDiff {
190        &self.inner.diff
191    }
192
193    pub fn zip(&self, out_messages: &OutMsgDescr) -> QueueDiffMessagesIter {
194        QueueDiffMessagesIter {
195            index: 0,
196            out_messages: out_messages.dict().clone(),
197            inner: self.inner.clone(),
198        }
199    }
200}
201
202impl AsRef<QueueDiff> for QueueDiffStuff {
203    fn as_ref(&self) -> &QueueDiff {
204        &self.inner.diff
205    }
206}
207
208unsafe impl arc_swap::RefCnt for QueueDiffStuff {
209    type Base = Inner;
210
211    fn into_ptr(me: Self) -> *mut Self::Base {
212        arc_swap::RefCnt::into_ptr(me.inner)
213    }
214
215    fn as_ptr(me: &Self) -> *mut Self::Base {
216        arc_swap::RefCnt::as_ptr(&me.inner)
217    }
218
219    unsafe fn from_ptr(ptr: *const Self::Base) -> Self {
220        Self {
221            inner: unsafe { arc_swap::RefCnt::from_ptr(ptr) },
222        }
223    }
224}
225
226#[doc(hidden)]
227pub struct Inner {
228    block_id: BlockId,
229    diff: QueueDiff,
230}
231
232/// Iterator over the messages in the queue diff.
233#[derive(Clone)]
234pub struct QueueDiffMessagesIter {
235    index: usize,
236    out_messages: Dict<HashBytes, (CurrencyCollection, OutMsg)>,
237    inner: Arc<Inner>,
238}
239
240impl Iterator for QueueDiffMessagesIter {
241    type Item = Result<Lazy<OwnedMessage>, Error>;
242
243    fn next(&mut self) -> Option<Self::Item> {
244        let messages = &self.inner.diff.messages;
245        if self.index >= messages.len() {
246            return None;
247        }
248
249        let hash = &messages[self.index];
250        self.index += 1;
251
252        match self.out_messages.get(hash) {
253            Ok(Some((_, out_msg))) => {
254                let OutMsg::New(out_msg) = &out_msg else {
255                    return Some(Err(Error::InvalidData));
256                };
257
258                // Get the last ref from the envelope, it will be the message itself
259                let out_msg = out_msg.out_msg_envelope.inner();
260                let ref_count = out_msg.descriptor().reference_count();
261                if ref_count > 0
262                    && let Some(cell) = out_msg.reference_cloned(ref_count - 1)
263                {
264                    return Some(Lazy::from_raw(cell));
265                }
266
267                Some(Err(Error::InvalidData))
268            }
269            Ok(None) => Some(Err(Error::InvalidData)),
270            Err(e) => Some(Err(e)),
271        }
272    }
273
274    fn size_hint(&self) -> (usize, Option<usize>) {
275        // NOTE: `seld.index` increment stops at `len`
276        let len = self.inner.diff.messages.len() - self.index;
277        (len, Some(len))
278    }
279}
280
281impl ExactSizeIterator for QueueDiffMessagesIter {
282    fn len(&self) -> usize {
283        // NOTE: `seld.index` increment stops at `len`
284        self.inner.diff.messages.len() - self.index
285    }
286}
287
#[cfg(test)]
mod tests {
    use tycho_types::num::Tokens;

    use super::*;

    /// Builds a workchain `0` address whose 32 bytes are all equal to `byte`.
    fn filled_addr(byte: u8) -> IntAddr {
        IntAddr::Std(StdAddr::new(0, HashBytes::from([byte; 32])))
    }

    /// Checks that the iterator returned by `QueueDiffStuff::zip` yields exactly
    /// the messages listed in the diff, in the order of the diff.
    #[test]
    fn queue_diff_messages_iter() -> Result<()> {
        let mut dict = Dict::<HashBytes, (CurrencyCollection, OutMsg)>::new();

        let dummy_tx = Lazy::from_raw(Cell::default())?;

        // Fill with external messages (they are not listed in the diff)
        for lt in 0..10 {
            let info = MsgInfo::ExtOut(ExtOutMsgInfo {
                src: filled_addr(lt as u8),
                dst: None,
                created_lt: lt,
                created_at: 0,
            });
            let message = Lazy::new(&Message {
                info,
                init: None,
                body: Cell::empty_cell_ref().as_slice()?,
                layout: None,
            })?;

            let entry = OutMsg::External(OutMsgExternal {
                out_msg: message.cast_ref().clone(),
                transaction: dummy_tx.clone(),
            });
            dict.set(message.inner().repr_hash(), (CurrencyCollection::ZERO, entry))?;
        }

        // Fill with outgoing messages (these are the ones listed in the diff)
        let mut message_hashes = Vec::new();
        for lt in 0..10 {
            let addr = filled_addr(lt as u8);

            let info = MsgInfo::Int(IntMsgInfo {
                src: addr.clone(),
                dst: addr,
                created_lt: lt,
                ..Default::default()
            });
            let message = Lazy::new(&Message {
                info,
                init: None,
                body: Cell::empty_cell_ref().as_slice()?,
                layout: None,
            })?;

            let message_hash = *message.inner().repr_hash();
            message_hashes.push(message_hash);

            let envelope = Lazy::new(&MsgEnvelope {
                cur_addr: IntermediateAddr::FULL_SRC_SAME_WORKCHAIN,
                next_addr: IntermediateAddr::FULL_DEST_SAME_WORKCHAIN,
                fwd_fee_remaining: Tokens::ZERO,
                message: message.cast_into(),
            })?;

            let entry = OutMsg::New(OutMsgNew {
                out_msg_envelope: envelope,
                transaction: dummy_tx.clone(),
            });
            dict.set(message_hash, (CurrencyCollection::ZERO, entry))?;
        }

        let out_msg_descr = AugDict::from_parts(dict, CurrencyCollection::ZERO);

        // Create queue diff
        message_hashes.sort_unstable();
        assert_eq!(message_hashes.len(), 10);

        let diff = QueueDiffStuff::new(&BlockId::default(), QueueDiff {
            hash: HashBytes::ZERO,
            prev_hash: HashBytes::ZERO,
            shard_ident: ShardIdent::BASECHAIN,
            seqno: 1,
            processed_to: Default::default(),
            min_message: QueueKey {
                lt: 0,
                hash: message_hashes[0],
            },
            max_message: QueueKey {
                lt: 9,
                hash: message_hashes[9],
            },
            messages: message_hashes.clone(),
            router_partitions_src: Default::default(),
            router_partitions_dst: Default::default(),
        });

        // Verify messages
        let mut resolved = diff.zip(&out_msg_descr);
        assert_eq!(message_hashes.len(), resolved.len());

        for expected_hash in message_hashes {
            let message = resolved.next().unwrap()?;
            assert_eq!(expected_hash, *message.inner().repr_hash());

            message.load().unwrap();
        }

        // The iterator must be exhausted and must report it consistently
        assert!(resolved.next().is_none());
        assert_eq!(resolved.len(), 0);
        assert_eq!(resolved.size_hint(), (0, Some(0)));

        Ok(())
    }
}