tycho_collator/internal_queue/state/
state_iterator.rs1use std::cmp::{Ordering, Reverse};
2use std::collections::hash_map::Entry;
3use std::collections::{BinaryHeap, HashSet};
4use std::sync::Arc;
5
6use ahash::HashMapExt;
7use anyhow::{Context, Result, bail};
8use tycho_block_util::queue::QueueKey;
9use tycho_types::models::ShardIdent;
10use tycho_util::FastHashMap;
11
12use crate::internal_queue::state::shard_iterator::{IterResult, ShardIterator};
13use crate::internal_queue::types::message::InternalMessageValue;
14use crate::storage::iterator::InternalQueueMessagesIter;
15
16pub struct ShardIteratorWithRange {
17 pub iter: InternalQueueMessagesIter,
18 pub range_start: QueueKey,
19 pub range_end: QueueKey,
20}
21
22impl ShardIteratorWithRange {
23 pub fn new(
24 iter: InternalQueueMessagesIter,
25 range_start: QueueKey,
26 range_end: QueueKey,
27 ) -> Self {
28 ShardIteratorWithRange {
29 iter,
30 range_start,
31 range_end,
32 }
33 }
34}
35
36#[derive(Debug, Clone)]
37pub struct MessageExt<V: InternalMessageValue> {
38 pub source: ShardIdent,
39 pub message: Arc<V>,
40}
41
42impl<V: InternalMessageValue> MessageExt<V> {
43 pub fn new(source: ShardIdent, message: Arc<V>) -> Self {
44 MessageExt { source, message }
45 }
46}
47
48impl<V: InternalMessageValue> PartialEq for MessageExt<V> {
49 fn eq(&self, other: &Self) -> bool {
50 self.message == other.message
51 }
52}
53
54impl<V: InternalMessageValue> Eq for MessageExt<V> {}
55
56impl<V: InternalMessageValue> PartialOrd for MessageExt<V> {
57 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
58 Some(self.cmp(other))
59 }
60}
61
62impl<V: InternalMessageValue> Ord for MessageExt<V> {
63 fn cmp(&self, other: &Self) -> Ordering {
64 self.message.cmp(&other.message)
65 }
66}
67
68#[derive(Debug, Clone)]
69pub struct IterRange {
70 pub shard_id: ShardIdent,
71 pub key: QueueKey,
72}
73
74pub trait StateIterator<V: InternalMessageValue>: Send {
75 fn next(&mut self) -> Result<Option<MessageExt<V>>>;
76 fn current_position(&self) -> FastHashMap<ShardIdent, QueueKey>;
77}
78
79pub struct StateIteratorImpl<V: InternalMessageValue> {
80 iters: FastHashMap<ShardIdent, ShardIterator>,
81 message_queue: BinaryHeap<Reverse<MessageExt<V>>>,
82 in_queue: HashSet<ShardIdent>,
83 current_position: FastHashMap<ShardIdent, QueueKey>,
84 iters_to_remove: Vec<ShardIdent>,
85}
86
87impl<V: InternalMessageValue> StateIteratorImpl<V> {
88 pub fn new(
89 shard_iters: Vec<(InternalQueueMessagesIter, ShardIdent)>,
90 receiver: ShardIdent,
91 ) -> Result<Self> {
92 let mut iters = FastHashMap::with_capacity(shard_iters.len());
93
94 for (iter, shard_ident) in shard_iters {
95 let shard_iterator = ShardIterator::new(receiver, iter);
96
97 match iters.entry(shard_ident) {
98 Entry::Occupied(_) => {
99 bail!("Iterator already exists for shard {:?}", shard_ident);
100 }
101 Entry::Vacant(entry) => {
102 entry.insert(shard_iterator);
103 }
104 }
105 }
106
107 Ok(Self {
108 iters,
109 message_queue: BinaryHeap::new(),
110 in_queue: HashSet::new(),
111 current_position: Default::default(),
112 iters_to_remove: Vec::new(),
113 })
114 }
115
116 fn refill_queue(&mut self) -> Result<()> {
117 self.iters_to_remove.clear();
118
119 'outer: for (shard_ident, iter) in &mut self.iters {
120 if self.in_queue.contains(shard_ident) {
121 continue;
122 }
123
124 while let Some(msg) = iter.next()? {
125 match msg {
126 IterResult::Value(value) => {
127 let message =
128 V::deserialize(value).context("Failed to deserialize message")?;
129
130 let message_ext = MessageExt::new(*shard_ident, Arc::new(message));
131
132 self.message_queue.push(Reverse(message_ext));
133 self.in_queue.insert(*shard_ident);
134 continue 'outer;
135 }
136 IterResult::Skip(Some((shard_partition, queue_key))) => {
138 self.current_position.insert(shard_partition, queue_key);
139 }
140 IterResult::Skip(None) => {}
142 }
143 }
144
145 self.iters_to_remove.push(*shard_ident);
147 }
148
149 for key in &self.iters_to_remove {
150 self.iters.remove(key);
151 }
152
153 Ok(())
154 }
155}
156
157impl<V: InternalMessageValue> StateIterator<V> for StateIteratorImpl<V> {
158 fn next(&mut self) -> Result<Option<MessageExt<V>>> {
159 self.refill_queue()?;
161
162 if let Some(Reverse(message)) = self.message_queue.pop() {
164 let message_key = message.message.key();
165 self.current_position.insert(message.source, message_key);
166
167 self.in_queue.remove(&message.source);
169 return Ok(Some(message));
170 }
171
172 Ok(None)
173 }
174
175 fn current_position(&self) -> FastHashMap<ShardIdent, QueueKey> {
176 self.current_position.clone()
177 }
178}