Skip to main content

reddb_file/serverless/
secondary.rs

1use super::*;
2
3#[derive(Debug, Clone, PartialEq, Eq)]
4pub struct ServerlessSecondaryIndexEntry {
5    pub collection: String,
6    pub range_start: Vec<u8>,
7    pub range_end: Vec<u8>,
8    pub relative_path: PathBuf,
9    pub offset: u64,
10    pub bytes: u64,
11    pub checksum: u32,
12    pub content_hash: ServerlessContentHash,
13    pub hot: bool,
14}
15
16impl From<&ServerlessExtentRef> for ServerlessSecondaryIndexEntry {
17    fn from(extent: &ServerlessExtentRef) -> Self {
18        Self {
19            collection: extent.collection.clone(),
20            range_start: extent.range_start.clone(),
21            range_end: extent.range_end.clone(),
22            relative_path: extent.relative_path.clone(),
23            offset: extent.offset,
24            bytes: extent.bytes,
25            checksum: extent.checksum,
26            content_hash: extent.content_hash,
27            hot: extent.hot,
28        }
29    }
30}
31
32impl ServerlessSecondaryIndexEntry {
33    pub fn hydration_request(&self) -> ServerlessHydrationRequest {
34        ServerlessHydrationRequest {
35            relative_path: self.relative_path.clone(),
36            offset: self.offset,
37            bytes: self.bytes,
38            checksum: self.checksum,
39            content_hash: self.content_hash,
40        }
41    }
42}
43
44#[derive(Debug, Clone, PartialEq, Eq)]
45pub struct ServerlessSecondaryIndex {
46    pub generation: u64,
47    pub entries: Vec<ServerlessSecondaryIndexEntry>,
48}
49
50impl ServerlessSecondaryIndex {
51    pub fn new(generation: u64) -> Self {
52        Self {
53            generation,
54            entries: Vec::new(),
55        }
56    }
57
58    pub fn from_extent_index(index: &ServerlessExtentIndex) -> Self {
59        let mut secondary = Self::new(index.generation);
60        for extent in &index.extents {
61            secondary.push(ServerlessSecondaryIndexEntry::from(extent));
62        }
63        secondary
64    }
65
66    pub fn push(&mut self, entry: ServerlessSecondaryIndexEntry) {
67        self.entries.push(entry);
68        self.entries.sort_by(|left, right| {
69            (
70                left.collection.as_str(),
71                left.range_start.as_slice(),
72                left.range_end.as_slice(),
73                left.relative_path.to_string_lossy(),
74                left.offset,
75            )
76                .cmp(&(
77                    right.collection.as_str(),
78                    right.range_start.as_slice(),
79                    right.range_end.as_slice(),
80                    right.relative_path.to_string_lossy(),
81                    right.offset,
82                ))
83        });
84    }
85
86    pub fn entries_for_collection(&self, collection: &str) -> Vec<&ServerlessSecondaryIndexEntry> {
87        self.entries
88            .iter()
89            .filter(|entry| entry.collection == collection)
90            .collect()
91    }
92
93    pub fn hydration_plan_for_collection(&self, collection: &str) -> ServerlessHydrationPlan {
94        ServerlessHydrationPlan {
95            generation: self.generation,
96            requests: self
97                .entries_for_collection(collection)
98                .into_iter()
99                .map(ServerlessSecondaryIndexEntry::hydration_request)
100                .collect(),
101        }
102    }
103
104    pub fn write_to_path(&self, path: impl AsRef<Path>) -> RdbFileResult<()> {
105        write_bytes(path, &self.encode())
106    }
107
108    pub fn read_from_path(path: impl AsRef<Path>) -> RdbFileResult<Self> {
109        Self::decode(&fs::read(path)?)
110    }
111
112    pub fn encode(&self) -> Vec<u8> {
113        let mut out = Vec::new();
114        out.extend_from_slice(SERVERLESS_SECONDARY_INDEX_MAGIC);
115        put_u16(&mut out, SERVERLESS_ARTIFACT_VERSION);
116        put_u64(&mut out, self.generation);
117        put_u32(&mut out, self.entries.len() as u32);
118        for entry in &self.entries {
119            put_string(&mut out, &entry.collection);
120            put_bytes(&mut out, &entry.range_start);
121            put_bytes(&mut out, &entry.range_end);
122            put_string(&mut out, &entry.relative_path.to_string_lossy());
123            put_u64(&mut out, entry.offset);
124            put_u64(&mut out, entry.bytes);
125            put_u32(&mut out, entry.checksum);
126            put_content_hash(&mut out, entry.content_hash);
127            out.push(u8::from(entry.hot));
128        }
129        let checksum = crc32(&out);
130        put_u32(&mut out, checksum);
131        out
132    }
133
134    pub fn decode(bytes: &[u8]) -> RdbFileResult<Self> {
135        verify_checksum(bytes)?;
136        let mut cursor = 0usize;
137        expect_magic(bytes, &mut cursor, SERVERLESS_SECONDARY_INDEX_MAGIC)?;
138        let version = take_u16(bytes, &mut cursor)?;
139        if version != SERVERLESS_ARTIFACT_VERSION {
140            return Err(RdbFileError::InvalidOperation(format!(
141                "unsupported serverless secondary index version {version}"
142            )));
143        }
144        let generation = take_u64(bytes, &mut cursor)?;
145        let count = take_u32(bytes, &mut cursor)? as usize;
146        let mut index = Self::new(generation);
147        for _ in 0..count {
148            let collection = take_string(bytes, &mut cursor)?;
149            let range_start = take_vec_bytes(bytes, &mut cursor)?;
150            let range_end = take_vec_bytes(bytes, &mut cursor)?;
151            if !range_end.is_empty() && range_start >= range_end {
152                return Err(RdbFileError::InvalidOperation(
153                    "serverless secondary index range_start must be before range_end".into(),
154                ));
155            }
156            index.push(ServerlessSecondaryIndexEntry {
157                collection,
158                range_start,
159                range_end,
160                relative_path: PathBuf::from(take_string(bytes, &mut cursor)?),
161                offset: take_u64(bytes, &mut cursor)?,
162                bytes: take_u64(bytes, &mut cursor)?,
163                checksum: take_u32(bytes, &mut cursor)?,
164                content_hash: take_content_hash(bytes, &mut cursor)?,
165                hot: take_u8(bytes, &mut cursor)? != 0,
166            });
167        }
168        reject_trailing_bytes(bytes, cursor)?;
169        Ok(index)
170    }
171}