Skip to main content

tycho_collator/storage/
iterator.rs

1use anyhow::Result;
2use tycho_storage::kv::StoredValue;
3use weedb::OwnedRawIterator;
4
5use super::models::{InternalQueueMessage, ShardsInternalMessagesKey};
6
7/// Iterator for internal queue messages.
8pub struct InternalQueueMessagesIter {
9    pub inner: OwnedRawIterator,
10    pub first: bool,
11}
12
13impl InternalQueueMessagesIter {
14    /// Moves the iterator to the specified key.
15    ///
16    /// # Arguments
17    /// * `key` - The key to seek to.
18    pub fn seek(&mut self, key: &ShardsInternalMessagesKey) {
19        self.inner.seek(key.to_vec());
20        self.first = true;
21    }
22
23    /// Moves the iterator to the first element.
24    pub fn seek_to_first(&mut self) {
25        self.inner.seek_to_first();
26        self.first = true;
27    }
28
29    /// Retrieves the next message from the iterator.
30    ///
31    /// # Returns
32    /// * `Ok(Some(InternalQueueMessage<'_>))` if there is a next message.
33    /// * `Ok(None)` if the iterator has reached the end.
34    /// * `Err(anyhow::Error)` if an error occurs.
35    #[allow(clippy::should_implement_trait)]
36    pub fn next(&mut self) -> Result<Option<InternalQueueMessage<'_>>> {
37        if !std::mem::take(&mut self.first) {
38            self.inner.next();
39        }
40
41        let Some((key, value)) = self.inner.item() else {
42            return match self.inner.status() {
43                Ok(()) => Ok(None),
44                Err(e) => Err(e.into()),
45            };
46        };
47
48        let key = ShardsInternalMessagesKey::from(key);
49        Ok(Some(InternalQueueMessage {
50            key,
51            workchain: value[0] as i8,
52            prefix: u64::from_le_bytes(value[1..9].try_into().unwrap()),
53            message_boc: &value[9..],
54        }))
55    }
56}