Skip to main content

reddb_file/serverless/
extent.rs

1use super::*;
2
3#[derive(Debug, Clone, PartialEq, Eq)]
4pub struct ServerlessExtentRef {
5    pub collection: String,
6    pub range_start: Vec<u8>,
7    pub range_end: Vec<u8>,
8    pub relative_path: PathBuf,
9    pub offset: u64,
10    pub bytes: u64,
11    pub checksum: u32,
12    pub content_hash: ServerlessContentHash,
13    pub hot: bool,
14}
15
16impl ServerlessExtentRef {
17    pub fn new(
18        collection: impl Into<String>,
19        range_start: impl Into<Vec<u8>>,
20        range_end: impl Into<Vec<u8>>,
21        relative_path: impl Into<PathBuf>,
22        offset: u64,
23        payload: &[u8],
24        hot: bool,
25    ) -> RdbFileResult<Self> {
26        let range_start = range_start.into();
27        let range_end = range_end.into();
28        if !range_end.is_empty() && range_start >= range_end {
29            return Err(RdbFileError::InvalidOperation(
30                "serverless extent range_start must be before range_end".into(),
31            ));
32        }
33        Ok(Self {
34            collection: collection.into(),
35            range_start,
36            range_end,
37            relative_path: relative_path.into(),
38            offset,
39            bytes: payload.len() as u64,
40            checksum: crc32(payload),
41            content_hash: ServerlessContentHash::from_bytes(payload),
42            hot,
43        })
44    }
45
46    pub fn contains_key(&self, collection: &str, key: &[u8]) -> bool {
47        self.collection == collection
48            && key >= self.range_start.as_slice()
49            && (self.range_end.is_empty() || key < self.range_end.as_slice())
50    }
51
52    pub fn overlaps_range(&self, collection: &str, range_start: &[u8], range_end: &[u8]) -> bool {
53        if self.collection != collection {
54            return false;
55        }
56        let extent_ends_after_start =
57            self.range_end.is_empty() || self.range_end.as_slice() > range_start;
58        let extent_starts_before_end =
59            range_end.is_empty() || self.range_start.as_slice() < range_end;
60        extent_ends_after_start && extent_starts_before_end
61    }
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
65pub struct ServerlessExtentIndex {
66    pub generation: u64,
67    pub extents: Vec<ServerlessExtentRef>,
68}
69
70impl ServerlessExtentIndex {
71    pub fn new(generation: u64) -> Self {
72        Self {
73            generation,
74            extents: Vec::new(),
75        }
76    }
77
78    pub fn push(&mut self, extent: ServerlessExtentRef) {
79        self.extents.push(extent);
80        self.extents.sort_by(|left, right| {
81            (
82                left.collection.as_str(),
83                left.range_start.as_slice(),
84                left.relative_path.to_string_lossy(),
85                left.offset,
86            )
87                .cmp(&(
88                    right.collection.as_str(),
89                    right.range_start.as_slice(),
90                    right.relative_path.to_string_lossy(),
91                    right.offset,
92                ))
93        });
94    }
95
96    pub fn extents_for_key(&self, collection: &str, key: &[u8]) -> Vec<&ServerlessExtentRef> {
97        self.extents
98            .iter()
99            .filter(|extent| extent.contains_key(collection, key))
100            .collect()
101    }
102
103    pub fn extents_for_range(
104        &self,
105        collection: &str,
106        range_start: &[u8],
107        range_end: &[u8],
108    ) -> RdbFileResult<Vec<&ServerlessExtentRef>> {
109        if !range_end.is_empty() && range_start >= range_end {
110            return Err(RdbFileError::InvalidOperation(
111                "serverless hydration range_start must be before range_end".into(),
112            ));
113        }
114        Ok(self
115            .extents
116            .iter()
117            .filter(|extent| extent.overlaps_range(collection, range_start, range_end))
118            .collect())
119    }
120
121    pub fn hot_prefetch_paths(&self) -> Vec<PathBuf> {
122        let mut paths: Vec<PathBuf> = self
123            .extents
124            .iter()
125            .filter(|extent| extent.hot)
126            .map(|extent| extent.relative_path.clone())
127            .collect();
128        paths.sort();
129        paths.dedup();
130        paths
131    }
132
133    pub fn hydration_plan_for_key(&self, collection: &str, key: &[u8]) -> ServerlessHydrationPlan {
134        ServerlessHydrationPlan {
135            generation: self.generation,
136            requests: self
137                .extents_for_key(collection, key)
138                .into_iter()
139                .map(ServerlessHydrationRequest::from_extent)
140                .collect(),
141        }
142    }
143
144    pub fn hydration_plan_for_range(
145        &self,
146        collection: &str,
147        range_start: &[u8],
148        range_end: &[u8],
149    ) -> RdbFileResult<ServerlessHydrationPlan> {
150        Ok(ServerlessHydrationPlan {
151            generation: self.generation,
152            requests: self
153                .extents_for_range(collection, range_start, range_end)?
154                .into_iter()
155                .map(ServerlessHydrationRequest::from_extent)
156                .collect(),
157        })
158    }
159
160    pub fn hot_hydration_plan(&self) -> ServerlessHydrationPlan {
161        ServerlessHydrationPlan {
162            generation: self.generation,
163            requests: self
164                .extents
165                .iter()
166                .filter(|extent| extent.hot)
167                .map(ServerlessHydrationRequest::from_extent)
168                .collect(),
169        }
170    }
171
172    pub fn write_to_path(&self, path: impl AsRef<Path>) -> RdbFileResult<()> {
173        write_bytes(path, &self.encode())
174    }
175
176    pub fn read_from_path(path: impl AsRef<Path>) -> RdbFileResult<Self> {
177        Self::decode(&fs::read(path)?)
178    }
179
180    pub fn encode(&self) -> Vec<u8> {
181        let mut out = Vec::new();
182        out.extend_from_slice(SERVERLESS_EXTENT_INDEX_MAGIC);
183        put_u16(&mut out, SERVERLESS_ARTIFACT_VERSION);
184        put_u64(&mut out, self.generation);
185        put_u32(&mut out, self.extents.len() as u32);
186        for extent in &self.extents {
187            put_string(&mut out, &extent.collection);
188            put_bytes(&mut out, &extent.range_start);
189            put_bytes(&mut out, &extent.range_end);
190            put_string(&mut out, &extent.relative_path.to_string_lossy());
191            put_u64(&mut out, extent.offset);
192            put_u64(&mut out, extent.bytes);
193            put_u32(&mut out, extent.checksum);
194            put_content_hash(&mut out, extent.content_hash);
195            out.push(u8::from(extent.hot));
196        }
197        let checksum = crc32(&out);
198        put_u32(&mut out, checksum);
199        out
200    }
201
202    pub fn decode(bytes: &[u8]) -> RdbFileResult<Self> {
203        verify_checksum(bytes)?;
204        let mut cursor = 0usize;
205        expect_magic(bytes, &mut cursor, SERVERLESS_EXTENT_INDEX_MAGIC)?;
206        let version = take_u16(bytes, &mut cursor)?;
207        if version != SERVERLESS_ARTIFACT_VERSION {
208            return Err(RdbFileError::InvalidOperation(format!(
209                "unsupported serverless extent index version {version}"
210            )));
211        }
212        let generation = take_u64(bytes, &mut cursor)?;
213        let count = take_u32(bytes, &mut cursor)? as usize;
214        let mut index = Self::new(generation);
215        for _ in 0..count {
216            let collection = take_string(bytes, &mut cursor)?;
217            let range_start = take_vec_bytes(bytes, &mut cursor)?;
218            let range_end = take_vec_bytes(bytes, &mut cursor)?;
219            let relative_path = PathBuf::from(take_string(bytes, &mut cursor)?);
220            let offset = take_u64(bytes, &mut cursor)?;
221            let bytes_len = take_u64(bytes, &mut cursor)?;
222            let checksum = take_u32(bytes, &mut cursor)?;
223            let content_hash = take_content_hash(bytes, &mut cursor)?;
224            let hot = take_u8(bytes, &mut cursor)? != 0;
225            if !range_end.is_empty() && range_start >= range_end {
226                return Err(RdbFileError::InvalidOperation(
227                    "serverless extent range_start must be before range_end".into(),
228                ));
229            }
230            index.push(ServerlessExtentRef {
231                collection,
232                range_start,
233                range_end,
234                relative_path,
235                offset,
236                bytes: bytes_len,
237                checksum,
238                content_hash,
239                hot,
240            });
241        }
242        reject_trailing_bytes(bytes, cursor)?;
243        Ok(index)
244    }
245}