// tycho_collator/internal_queue/iterator.rs
use std::cmp::Reverse;
2use std::collections::{BTreeMap, BinaryHeap};
3use std::sync::Arc;
4
5use anyhow::Result;
6use tycho_block_util::queue::QueueKey;
7use tycho_types::models::ShardIdent;
8use tycho_util::FastHashMap;
9
10use crate::internal_queue::state::state_iterator::MessageExt;
11use crate::internal_queue::state::states_iterators_manager::StatesIteratorsManager;
12use crate::internal_queue::types::message::InternalMessageValue;
13
14pub trait QueueIterator<V: InternalMessageValue>: Send {
15 fn next(&mut self, with_new: bool) -> Result<Option<IterItem<V>>>;
17 fn current_position(&self) -> FastHashMap<ShardIdent, QueueKey>;
18 fn process_new_messages(&mut self) -> Result<Option<IterItem<V>>>;
19}
20
21pub struct QueueIteratorImpl<V: InternalMessageValue> {
22 messages_for_current_shard: BinaryHeap<Reverse<MessageExt<V>>>,
23 new_messages: BTreeMap<QueueKey, Arc<V>>,
24 iterators_manager: StatesIteratorsManager<V>,
25}
26
27impl<V: InternalMessageValue> QueueIteratorImpl<V> {
28 pub fn new(iterators_manager: StatesIteratorsManager<V>) -> Result<Self> {
29 let messages_for_current_shard = BinaryHeap::default();
30
31 Ok(Self {
32 messages_for_current_shard,
33 new_messages: Default::default(),
34 iterators_manager,
35 })
36 }
37}
38
39pub struct IterItem<V: InternalMessageValue> {
40 pub item: MessageExt<V>,
41 pub is_new: bool,
42}
43
44impl<V: InternalMessageValue> QueueIterator<V> for QueueIteratorImpl<V> {
45 fn next(&mut self, with_new: bool) -> Result<Option<IterItem<V>>> {
46 if let Some(next_message) = self.iterators_manager.next()? {
48 return Ok(Some(IterItem {
49 item: next_message,
50 is_new: false,
51 }));
52 }
53
54 if with_new {
56 return self.process_new_messages();
57 }
58
59 Ok(None)
60 }
61 fn current_position(&self) -> FastHashMap<ShardIdent, QueueKey> {
62 self.iterators_manager.current_position()
63 }
64
65 fn process_new_messages(&mut self) -> Result<Option<IterItem<V>>> {
67 if let Some(next_message) = self.messages_for_current_shard.pop() {
68 self.new_messages.remove(&next_message.0.message.key());
70
71 return Ok(Some(IterItem {
72 item: next_message.0,
73 is_new: true,
74 }));
75 }
76 Ok(None)
77 }
78}