Skip to main content

tycho_collator/internal_queue/state/
state_iterator.rs

1use std::cmp::{Ordering, Reverse};
2use std::collections::hash_map::Entry;
3use std::collections::{BinaryHeap, HashSet};
4use std::sync::Arc;
5
6use ahash::HashMapExt;
7use anyhow::{Context, Result, bail};
8use tycho_block_util::queue::QueueKey;
9use tycho_types::models::ShardIdent;
10use tycho_util::FastHashMap;
11
12use crate::internal_queue::state::shard_iterator::{IterResult, ShardIterator};
13use crate::internal_queue::types::message::InternalMessageValue;
14use crate::storage::iterator::InternalQueueMessagesIter;
15
16pub struct ShardIteratorWithRange {
17    pub iter: InternalQueueMessagesIter,
18    pub range_start: QueueKey,
19    pub range_end: QueueKey,
20}
21
22impl ShardIteratorWithRange {
23    pub fn new(
24        iter: InternalQueueMessagesIter,
25        range_start: QueueKey,
26        range_end: QueueKey,
27    ) -> Self {
28        ShardIteratorWithRange {
29            iter,
30            range_start,
31            range_end,
32        }
33    }
34}
35
36#[derive(Debug, Clone)]
37pub struct MessageExt<V: InternalMessageValue> {
38    pub source: ShardIdent,
39    pub message: Arc<V>,
40}
41
42impl<V: InternalMessageValue> MessageExt<V> {
43    pub fn new(source: ShardIdent, message: Arc<V>) -> Self {
44        MessageExt { source, message }
45    }
46}
47
48impl<V: InternalMessageValue> PartialEq for MessageExt<V> {
49    fn eq(&self, other: &Self) -> bool {
50        self.message == other.message
51    }
52}
53
54impl<V: InternalMessageValue> Eq for MessageExt<V> {}
55
56impl<V: InternalMessageValue> PartialOrd for MessageExt<V> {
57    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
58        Some(self.cmp(other))
59    }
60}
61
62impl<V: InternalMessageValue> Ord for MessageExt<V> {
63    fn cmp(&self, other: &Self) -> Ordering {
64        self.message.cmp(&other.message)
65    }
66}
67
68#[derive(Debug, Clone)]
69pub struct IterRange {
70    pub shard_id: ShardIdent,
71    pub key: QueueKey,
72}
73
74pub trait StateIterator<V: InternalMessageValue>: Send {
75    fn next(&mut self) -> Result<Option<MessageExt<V>>>;
76    fn current_position(&self) -> FastHashMap<ShardIdent, QueueKey>;
77}
78
79pub struct StateIteratorImpl<V: InternalMessageValue> {
80    iters: FastHashMap<ShardIdent, ShardIterator>,
81    message_queue: BinaryHeap<Reverse<MessageExt<V>>>,
82    in_queue: HashSet<ShardIdent>,
83    current_position: FastHashMap<ShardIdent, QueueKey>,
84    iters_to_remove: Vec<ShardIdent>,
85}
86
87impl<V: InternalMessageValue> StateIteratorImpl<V> {
88    pub fn new(
89        shard_iters: Vec<(InternalQueueMessagesIter, ShardIdent)>,
90        receiver: ShardIdent,
91    ) -> Result<Self> {
92        let mut iters = FastHashMap::with_capacity(shard_iters.len());
93
94        for (iter, shard_ident) in shard_iters {
95            let shard_iterator = ShardIterator::new(receiver, iter);
96
97            match iters.entry(shard_ident) {
98                Entry::Occupied(_) => {
99                    bail!("Iterator already exists for shard {:?}", shard_ident);
100                }
101                Entry::Vacant(entry) => {
102                    entry.insert(shard_iterator);
103                }
104            }
105        }
106
107        Ok(Self {
108            iters,
109            message_queue: BinaryHeap::new(),
110            in_queue: HashSet::new(),
111            current_position: Default::default(),
112            iters_to_remove: Vec::new(),
113        })
114    }
115
116    fn refill_queue(&mut self) -> Result<()> {
117        self.iters_to_remove.clear();
118
119        'outer: for (shard_ident, iter) in &mut self.iters {
120            if self.in_queue.contains(shard_ident) {
121                continue;
122            }
123
124            while let Some(msg) = iter.next()? {
125                match msg {
126                    IterResult::Value(value) => {
127                        let message =
128                            V::deserialize(value).context("Failed to deserialize message")?;
129
130                        let message_ext = MessageExt::new(*shard_ident, Arc::new(message));
131
132                        self.message_queue.push(Reverse(message_ext));
133                        self.in_queue.insert(*shard_ident);
134                        continue 'outer;
135                    }
136                    // skip if we are not receiver for this message
137                    IterResult::Skip(Some((shard_partition, queue_key))) => {
138                        self.current_position.insert(shard_partition, queue_key);
139                    }
140                    // skip if it's a first key in range
141                    IterResult::Skip(None) => {}
142                }
143            }
144
145            // remove iterator if it's empty
146            self.iters_to_remove.push(*shard_ident);
147        }
148
149        for key in &self.iters_to_remove {
150            self.iters.remove(key);
151        }
152
153        Ok(())
154    }
155}
156
157impl<V: InternalMessageValue> StateIterator<V> for StateIteratorImpl<V> {
158    fn next(&mut self) -> Result<Option<MessageExt<V>>> {
159        // refill queue for each shard in range
160        self.refill_queue()?;
161
162        // take ordered by lt+hash message from filled queue
163        if let Some(Reverse(message)) = self.message_queue.pop() {
164            let message_key = message.message.key();
165            self.current_position.insert(message.source, message_key);
166
167            // set shard as not in queue for refilling next time
168            self.in_queue.remove(&message.source);
169            return Ok(Some(message));
170        }
171
172        Ok(None)
173    }
174
175    fn current_position(&self) -> FastHashMap<ShardIdent, QueueKey> {
176        self.current_position.clone()
177    }
178}