Skip to main content

tycho_collator/internal_queue/
iterator.rs

1use std::cmp::Reverse;
2use std::collections::{BTreeMap, BinaryHeap};
3use std::sync::Arc;
4
5use anyhow::Result;
6use tycho_block_util::queue::QueueKey;
7use tycho_types::models::ShardIdent;
8use tycho_util::FastHashMap;
9
10use crate::internal_queue::state::state_iterator::MessageExt;
11use crate::internal_queue::state::states_iterators_manager::StatesIteratorsManager;
12use crate::internal_queue::types::message::InternalMessageValue;
13
14pub trait QueueIterator<V: InternalMessageValue>: Send {
15    /// Get next message
16    fn next(&mut self, with_new: bool) -> Result<Option<IterItem<V>>>;
17    fn current_position(&self) -> FastHashMap<ShardIdent, QueueKey>;
18    fn process_new_messages(&mut self) -> Result<Option<IterItem<V>>>;
19}
20
21pub struct QueueIteratorImpl<V: InternalMessageValue> {
22    messages_for_current_shard: BinaryHeap<Reverse<MessageExt<V>>>,
23    new_messages: BTreeMap<QueueKey, Arc<V>>,
24    iterators_manager: StatesIteratorsManager<V>,
25}
26
27impl<V: InternalMessageValue> QueueIteratorImpl<V> {
28    pub fn new(iterators_manager: StatesIteratorsManager<V>) -> Result<Self> {
29        let messages_for_current_shard = BinaryHeap::default();
30
31        Ok(Self {
32            messages_for_current_shard,
33            new_messages: Default::default(),
34            iterators_manager,
35        })
36    }
37}
38
39pub struct IterItem<V: InternalMessageValue> {
40    pub item: MessageExt<V>,
41    pub is_new: bool,
42}
43
44impl<V: InternalMessageValue> QueueIterator<V> for QueueIteratorImpl<V> {
45    fn next(&mut self, with_new: bool) -> Result<Option<IterItem<V>>> {
46        // Process the next message from the snapshot manager
47        if let Some(next_message) = self.iterators_manager.next()? {
48            return Ok(Some(IterItem {
49                item: next_message,
50                is_new: false,
51            }));
52        }
53
54        // Process the new messages if required
55        if with_new {
56            return self.process_new_messages();
57        }
58
59        Ok(None)
60    }
61    fn current_position(&self) -> FastHashMap<ShardIdent, QueueKey> {
62        self.iterators_manager.current_position()
63    }
64
65    // Function to process the new messages
66    fn process_new_messages(&mut self) -> Result<Option<IterItem<V>>> {
67        if let Some(next_message) = self.messages_for_current_shard.pop() {
68            // remove message from new_messages
69            self.new_messages.remove(&next_message.0.message.key());
70
71            return Ok(Some(IterItem {
72                item: next_message.0,
73                is_new: true,
74            }));
75        }
76        Ok(None)
77    }
78}