reddb_file/serverless/
hydrate.rs1use super::*;
2use std::io::{Read, Seek, SeekFrom};
3
4#[derive(Debug, Clone, PartialEq, Eq)]
5pub struct ServerlessHydrationRequest {
6 pub relative_path: PathBuf,
7 pub offset: u64,
8 pub bytes: u64,
9 pub checksum: u32,
10 pub content_hash: ServerlessContentHash,
11}
12
13impl ServerlessHydrationRequest {
14 pub fn from_extent(extent: &ServerlessExtentRef) -> Self {
15 Self {
16 relative_path: extent.relative_path.clone(),
17 offset: extent.offset,
18 bytes: extent.bytes,
19 checksum: extent.checksum,
20 content_hash: extent.content_hash,
21 }
22 }
23
24 pub fn validate_payload(&self, payload: &[u8]) -> RdbFileResult<()> {
25 if payload.len() as u64 != self.bytes {
26 return Err(RdbFileError::InvalidOperation(format!(
27 "serverless hydration range {} has {} bytes, expected {}",
28 self.relative_path.display(),
29 payload.len(),
30 self.bytes
31 )));
32 }
33 let computed_crc = crc32(payload);
34 if computed_crc != self.checksum {
35 return Err(RdbFileError::InvalidOperation(format!(
36 "serverless hydration range {} checksum mismatch: stored {:#010x}, computed {computed_crc:#010x}",
37 self.relative_path.display(),
38 self.checksum
39 )));
40 }
41 let computed_hash = ServerlessContentHash::from_bytes(payload);
42 if !self.content_hash.is_zero() && computed_hash != self.content_hash {
43 return Err(RdbFileError::InvalidOperation(format!(
44 "serverless hydration range {} content hash mismatch",
45 self.relative_path.display()
46 )));
47 }
48 Ok(())
49 }
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub struct ServerlessHydrationPlan {
54 pub generation: u64,
55 pub requests: Vec<ServerlessHydrationRequest>,
56}
57
58impl ServerlessHydrationPlan {
59 pub fn total_bytes(&self) -> u64 {
60 self.requests.iter().map(|request| request.bytes).sum()
61 }
62
63 pub fn is_empty(&self) -> bool {
64 self.requests.is_empty()
65 }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct ServerlessHydratedRange {
70 pub request: ServerlessHydrationRequest,
71 pub payload: Vec<u8>,
72}
73
74impl ServerlessFilePlan {
75 pub fn hydrate_local_plan(
76 &self,
77 plan: &ServerlessHydrationPlan,
78 ) -> RdbFileResult<Vec<ServerlessHydratedRange>> {
79 if plan.generation != self.generation {
80 return Err(RdbFileError::InvalidOperation(format!(
81 "hydration plan generation {} does not match file plan generation {}",
82 plan.generation, self.generation
83 )));
84 }
85 let mut hydrated = Vec::with_capacity(plan.requests.len());
86 for request in &plan.requests {
87 hydrated.push(self.hydrate_local_request(request)?);
88 }
89 Ok(hydrated)
90 }
91
92 pub fn hydrate_local_plan_cached(
93 &self,
94 plan: &ServerlessHydrationPlan,
95 cache: &ServerlessLocalCache,
96 ) -> RdbFileResult<Vec<ServerlessHydratedRange>> {
97 if plan.generation != self.generation {
98 return Err(RdbFileError::InvalidOperation(format!(
99 "hydration plan generation {} does not match file plan generation {}",
100 plan.generation, self.generation
101 )));
102 }
103 let mut hydrated = Vec::with_capacity(plan.requests.len());
104 for request in &plan.requests {
105 hydrated.push(self.hydrate_local_request_cached(request, cache)?);
106 }
107 Ok(hydrated)
108 }
109
110 pub fn hydrate_local_request(
111 &self,
112 request: &ServerlessHydrationRequest,
113 ) -> RdbFileResult<ServerlessHydratedRange> {
114 validate_hydration_relative_path(&request.relative_path)?;
115 let end = request
116 .offset
117 .checked_add(request.bytes)
118 .ok_or_else(|| RdbFileError::InvalidOperation("hydration range overflow".into()))?;
119 let len = usize::try_from(request.bytes).map_err(|_| {
120 RdbFileError::InvalidOperation("hydration range too large for local memory".into())
121 })?;
122 let path = self.generation_dir().join(&request.relative_path);
123 let mut file = File::open(&path)?;
124 let file_len = file.metadata()?.len();
125 if end > file_len {
126 return Err(RdbFileError::InvalidOperation(format!(
127 "hydration range {}..{} exceeds pack {} length {}",
128 request.offset,
129 end,
130 request.relative_path.display(),
131 file_len
132 )));
133 }
134 file.seek(SeekFrom::Start(request.offset))?;
135 let mut payload = vec![0u8; len];
136 file.read_exact(&mut payload)?;
137 request.validate_payload(&payload)?;
138 Ok(ServerlessHydratedRange {
139 request: request.clone(),
140 payload,
141 })
142 }
143
144 pub fn hydrate_local_request_cached(
145 &self,
146 request: &ServerlessHydrationRequest,
147 cache: &ServerlessLocalCache,
148 ) -> RdbFileResult<ServerlessHydratedRange> {
149 if cache.generation != self.generation {
150 return Err(RdbFileError::InvalidOperation(format!(
151 "serverless cache generation {} does not match file plan generation {}",
152 cache.generation, self.generation
153 )));
154 }
155 if let Ok(range) = cache.read_hydrated_range(request) {
156 return Ok(range);
157 }
158 let range = self.hydrate_local_request(request)?;
159 cache.write_hydrated_range(&range)?;
160 cache.enforce_max_bytes(self.cache_policy.max_hot_bytes)?;
161 Ok(range)
162 }
163
164 pub fn prefetch_hot_extents(
165 &self,
166 index: &ServerlessExtentIndex,
167 ) -> RdbFileResult<Vec<ServerlessHydratedRange>> {
168 self.hydrate_local_plan(&index.hot_hydration_plan())
169 }
170
171 pub fn prefetch_hot_extents_cached(
172 &self,
173 index: &ServerlessExtentIndex,
174 cache: &ServerlessLocalCache,
175 ) -> RdbFileResult<Vec<ServerlessHydratedRange>> {
176 self.hydrate_local_plan_cached(&index.hot_hydration_plan(), cache)
177 }
178}
179
180fn validate_hydration_relative_path(path: &Path) -> RdbFileResult<()> {
181 if path.is_absolute() {
182 return Err(RdbFileError::InvalidOperation(
183 "serverless hydration path must be relative".into(),
184 ));
185 }
186 if path
187 .components()
188 .any(|component| matches!(component, std::path::Component::ParentDir))
189 {
190 return Err(RdbFileError::InvalidOperation(
191 "serverless hydration path must not contain parent components".into(),
192 ));
193 }
194 Ok(())
195}