nebari/tree/
by_sequence.rs

1use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
2
3use super::{btree_entry::Reducer, BinarySerialization, PagedWriter};
4use crate::{error::Error, tree::key_entry::ValueIndex, ArcBytes, ErrorKind};
5
6/// The index stored within [`VersionedTreeRoot::by_sequence_root`](crate::tree::VersionedTreeRoot::by_sequence_root).
7#[derive(Clone, Debug)]
8pub struct BySequenceIndex {
9    /// The key associated with this sequence id.
10    pub key: ArcBytes<'static>,
11    /// The previous sequence of this key.
12    pub last_sequence: Option<u64>,
13    /// The size of the value stored on disk.
14    pub value_length: u32,
15    /// The position of the value on disk.
16    pub position: u64,
17}
18
19impl BinarySerialization for BySequenceIndex {
20    fn serialize_to(
21        &mut self,
22        writer: &mut Vec<u8>,
23        _paged_writer: &mut PagedWriter<'_>,
24    ) -> Result<usize, Error> {
25        let mut bytes_written = 0;
26        writer.write_u32::<BigEndian>(self.value_length)?;
27        bytes_written += 4;
28        writer.write_u64::<BigEndian>(self.position)?;
29        bytes_written += 8;
30        writer.write_u64::<BigEndian>(self.last_sequence.unwrap_or(0))?;
31        bytes_written += 8;
32
33        let key_length = u16::try_from(self.key.len()).map_err(|_| ErrorKind::KeyTooLarge)?;
34        writer.write_u16::<BigEndian>(key_length)?;
35        bytes_written += 2;
36        writer.extend_from_slice(&self.key);
37        bytes_written += key_length as usize;
38        Ok(bytes_written)
39    }
40
41    fn deserialize_from(
42        reader: &mut ArcBytes<'_>,
43        _current_order: Option<usize>,
44    ) -> Result<Self, Error> {
45        let value_length = reader.read_u32::<BigEndian>()?;
46        let position = reader.read_u64::<BigEndian>()?;
47        let last_sequence = reader.read_u64::<BigEndian>()?;
48        let key_length = reader.read_u16::<BigEndian>()? as usize;
49        if key_length > reader.len() {
50            return Err(Error::data_integrity(format!(
51                "key length {} found but only {} bytes remaining",
52                key_length,
53                reader.len()
54            )));
55        }
56        let key = reader.read_bytes(key_length)?.into_owned();
57
58        Ok(Self {
59            key,
60            last_sequence: if last_sequence > 0 {
61                Some(last_sequence)
62            } else {
63                None
64            },
65            value_length,
66            position,
67        })
68    }
69}
70
71impl ValueIndex for BySequenceIndex {
72    fn position(&self) -> u64 {
73        self.position
74    }
75}
76
/// Aggregate statistics produced by reducing [`BySequenceIndex`] entries.
#[derive(Debug, Clone)]
pub struct BySequenceStats {
    /// How many sequence entries were folded into this value.
    pub total_sequences: u64,
}
83
84impl BinarySerialization for BySequenceStats {
85    fn serialize_to(
86        &mut self,
87        writer: &mut Vec<u8>,
88        _paged_writer: &mut PagedWriter<'_>,
89    ) -> Result<usize, Error> {
90        writer.write_u64::<BigEndian>(self.total_sequences)?;
91        Ok(8)
92    }
93
94    fn deserialize_from(
95        reader: &mut ArcBytes<'_>,
96        _current_order: Option<usize>,
97    ) -> Result<Self, Error> {
98        let number_of_records = reader.read_u64::<BigEndian>()?;
99        Ok(Self {
100            total_sequences: number_of_records,
101        })
102    }
103}
104
105impl Reducer<BySequenceIndex> for BySequenceStats {
106    fn reduce<'a, Indexes, IndexesIter>(indexes: Indexes) -> Self
107    where
108        BySequenceIndex: 'a,
109        Indexes:
110            IntoIterator<Item = &'a BySequenceIndex, IntoIter = IndexesIter> + ExactSizeIterator,
111        IndexesIter: Iterator<Item = &'a BySequenceIndex> + ExactSizeIterator + Clone,
112    {
113        Self {
114            total_sequences: indexes.len() as u64,
115        }
116    }
117
118    fn rereduce<'a, ReducedIndexes, ReducedIndexesIter>(values: ReducedIndexes) -> Self
119    where
120        Self: 'a,
121        ReducedIndexes:
122            IntoIterator<Item = &'a Self, IntoIter = ReducedIndexesIter> + ExactSizeIterator,
123        ReducedIndexesIter: Iterator<Item = &'a Self> + ExactSizeIterator,
124    {
125        Self {
126            total_sequences: values.into_iter().map(|v| v.total_sequences).sum(),
127        }
128    }
129}