1use super::*;
2
/// Reference to one stored extent: a key range inside a collection plus the
/// location and integrity metadata of the payload that backs it.
///
/// Keys are compared bytewise. The range checks in this file treat
/// `range_start` as inclusive, `range_end` as exclusive, and an empty
/// `range_end` as "no upper bound".
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessExtentRef {
    /// Name of the collection the extent belongs to.
    pub collection: String,
    /// Inclusive lower bound of the covered key range.
    pub range_start: Vec<u8>,
    /// Exclusive upper bound of the covered key range; empty means unbounded.
    pub range_end: Vec<u8>,
    /// Path of the file holding the payload.
    /// NOTE(review): presumably relative to a storage root — confirm with callers.
    pub relative_path: PathBuf,
    /// Offset associated with the payload.
    /// NOTE(review): presumably the byte offset inside `relative_path` — confirm.
    pub offset: u64,
    /// Length of the payload in bytes.
    pub bytes: u64,
    /// `crc32` of the payload, as computed by `ServerlessExtentRef::new`.
    pub checksum: u32,
    /// Content hash of the payload, as computed by `ServerlessExtentRef::new`.
    pub content_hash: ServerlessContentHash,
    /// Marks the extent as hot; the index's hot-prefetch and hot-hydration
    /// helpers select exactly the extents with this flag set.
    pub hot: bool,
}
15
16impl ServerlessExtentRef {
17 pub fn new(
18 collection: impl Into<String>,
19 range_start: impl Into<Vec<u8>>,
20 range_end: impl Into<Vec<u8>>,
21 relative_path: impl Into<PathBuf>,
22 offset: u64,
23 payload: &[u8],
24 hot: bool,
25 ) -> RdbFileResult<Self> {
26 let range_start = range_start.into();
27 let range_end = range_end.into();
28 if !range_end.is_empty() && range_start >= range_end {
29 return Err(RdbFileError::InvalidOperation(
30 "serverless extent range_start must be before range_end".into(),
31 ));
32 }
33 Ok(Self {
34 collection: collection.into(),
35 range_start,
36 range_end,
37 relative_path: relative_path.into(),
38 offset,
39 bytes: payload.len() as u64,
40 checksum: crc32(payload),
41 content_hash: ServerlessContentHash::from_bytes(payload),
42 hot,
43 })
44 }
45
46 pub fn contains_key(&self, collection: &str, key: &[u8]) -> bool {
47 self.collection == collection
48 && key >= self.range_start.as_slice()
49 && (self.range_end.is_empty() || key < self.range_end.as_slice())
50 }
51
52 pub fn overlaps_range(&self, collection: &str, range_start: &[u8], range_end: &[u8]) -> bool {
53 if self.collection != collection {
54 return false;
55 }
56 let extent_ends_after_start =
57 self.range_end.is_empty() || self.range_end.as_slice() > range_start;
58 let extent_starts_before_end =
59 range_end.is_empty() || self.range_start.as_slice() < range_end;
60 extent_ends_after_start && extent_starts_before_end
61 }
62}
63
/// Index of the extents that belong to one generation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessExtentIndex {
    /// Generation number; it is serialized with the index and copied into
    /// every hydration plan built from it.
    pub generation: u64,
    /// Extent references. `push` and `decode` keep them sorted by collection,
    /// `range_start`, lossy UTF-8 form of `relative_path`, and `offset`.
    pub extents: Vec<ServerlessExtentRef>,
}
69
70impl ServerlessExtentIndex {
71 pub fn new(generation: u64) -> Self {
72 Self {
73 generation,
74 extents: Vec::new(),
75 }
76 }
77
78 pub fn push(&mut self, extent: ServerlessExtentRef) {
79 self.extents.push(extent);
80 self.extents.sort_by(|left, right| {
81 (
82 left.collection.as_str(),
83 left.range_start.as_slice(),
84 left.relative_path.to_string_lossy(),
85 left.offset,
86 )
87 .cmp(&(
88 right.collection.as_str(),
89 right.range_start.as_slice(),
90 right.relative_path.to_string_lossy(),
91 right.offset,
92 ))
93 });
94 }
95
96 pub fn extents_for_key(&self, collection: &str, key: &[u8]) -> Vec<&ServerlessExtentRef> {
97 self.extents
98 .iter()
99 .filter(|extent| extent.contains_key(collection, key))
100 .collect()
101 }
102
103 pub fn extents_for_range(
104 &self,
105 collection: &str,
106 range_start: &[u8],
107 range_end: &[u8],
108 ) -> RdbFileResult<Vec<&ServerlessExtentRef>> {
109 if !range_end.is_empty() && range_start >= range_end {
110 return Err(RdbFileError::InvalidOperation(
111 "serverless hydration range_start must be before range_end".into(),
112 ));
113 }
114 Ok(self
115 .extents
116 .iter()
117 .filter(|extent| extent.overlaps_range(collection, range_start, range_end))
118 .collect())
119 }
120
121 pub fn hot_prefetch_paths(&self) -> Vec<PathBuf> {
122 let mut paths: Vec<PathBuf> = self
123 .extents
124 .iter()
125 .filter(|extent| extent.hot)
126 .map(|extent| extent.relative_path.clone())
127 .collect();
128 paths.sort();
129 paths.dedup();
130 paths
131 }
132
133 pub fn hydration_plan_for_key(&self, collection: &str, key: &[u8]) -> ServerlessHydrationPlan {
134 ServerlessHydrationPlan {
135 generation: self.generation,
136 requests: self
137 .extents_for_key(collection, key)
138 .into_iter()
139 .map(ServerlessHydrationRequest::from_extent)
140 .collect(),
141 }
142 }
143
144 pub fn hydration_plan_for_range(
145 &self,
146 collection: &str,
147 range_start: &[u8],
148 range_end: &[u8],
149 ) -> RdbFileResult<ServerlessHydrationPlan> {
150 Ok(ServerlessHydrationPlan {
151 generation: self.generation,
152 requests: self
153 .extents_for_range(collection, range_start, range_end)?
154 .into_iter()
155 .map(ServerlessHydrationRequest::from_extent)
156 .collect(),
157 })
158 }
159
160 pub fn hot_hydration_plan(&self) -> ServerlessHydrationPlan {
161 ServerlessHydrationPlan {
162 generation: self.generation,
163 requests: self
164 .extents
165 .iter()
166 .filter(|extent| extent.hot)
167 .map(ServerlessHydrationRequest::from_extent)
168 .collect(),
169 }
170 }
171
172 pub fn write_to_path(&self, path: impl AsRef<Path>) -> RdbFileResult<()> {
173 write_bytes(path, &self.encode())
174 }
175
176 pub fn read_from_path(path: impl AsRef<Path>) -> RdbFileResult<Self> {
177 Self::decode(&fs::read(path)?)
178 }
179
180 pub fn encode(&self) -> Vec<u8> {
181 let mut out = Vec::new();
182 out.extend_from_slice(SERVERLESS_EXTENT_INDEX_MAGIC);
183 put_u16(&mut out, SERVERLESS_ARTIFACT_VERSION);
184 put_u64(&mut out, self.generation);
185 put_u32(&mut out, self.extents.len() as u32);
186 for extent in &self.extents {
187 put_string(&mut out, &extent.collection);
188 put_bytes(&mut out, &extent.range_start);
189 put_bytes(&mut out, &extent.range_end);
190 put_string(&mut out, &extent.relative_path.to_string_lossy());
191 put_u64(&mut out, extent.offset);
192 put_u64(&mut out, extent.bytes);
193 put_u32(&mut out, extent.checksum);
194 put_content_hash(&mut out, extent.content_hash);
195 out.push(u8::from(extent.hot));
196 }
197 let checksum = crc32(&out);
198 put_u32(&mut out, checksum);
199 out
200 }
201
202 pub fn decode(bytes: &[u8]) -> RdbFileResult<Self> {
203 verify_checksum(bytes)?;
204 let mut cursor = 0usize;
205 expect_magic(bytes, &mut cursor, SERVERLESS_EXTENT_INDEX_MAGIC)?;
206 let version = take_u16(bytes, &mut cursor)?;
207 if version != SERVERLESS_ARTIFACT_VERSION {
208 return Err(RdbFileError::InvalidOperation(format!(
209 "unsupported serverless extent index version {version}"
210 )));
211 }
212 let generation = take_u64(bytes, &mut cursor)?;
213 let count = take_u32(bytes, &mut cursor)? as usize;
214 let mut index = Self::new(generation);
215 for _ in 0..count {
216 let collection = take_string(bytes, &mut cursor)?;
217 let range_start = take_vec_bytes(bytes, &mut cursor)?;
218 let range_end = take_vec_bytes(bytes, &mut cursor)?;
219 let relative_path = PathBuf::from(take_string(bytes, &mut cursor)?);
220 let offset = take_u64(bytes, &mut cursor)?;
221 let bytes_len = take_u64(bytes, &mut cursor)?;
222 let checksum = take_u32(bytes, &mut cursor)?;
223 let content_hash = take_content_hash(bytes, &mut cursor)?;
224 let hot = take_u8(bytes, &mut cursor)? != 0;
225 if !range_end.is_empty() && range_start >= range_end {
226 return Err(RdbFileError::InvalidOperation(
227 "serverless extent range_start must be before range_end".into(),
228 ));
229 }
230 index.push(ServerlessExtentRef {
231 collection,
232 range_start,
233 range_end,
234 relative_path,
235 offset,
236 bytes: bytes_len,
237 checksum,
238 content_hash,
239 hot,
240 });
241 }
242 reject_trailing_bytes(bytes, cursor)?;
243 Ok(index)
244 }
245}