Skip to main content

reddb_file/serverless/
hydrate.rs

1use super::*;
2use std::io::{Read, Seek, SeekFrom};
3
/// Describes one byte range to hydrate, together with the integrity data used to verify it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessHydrationRequest {
    /// Location of the file holding the range; local hydration joins this onto the file
    /// plan's `generation_dir()`.
    pub relative_path: PathBuf,
    /// Byte offset within that file at which the range starts.
    pub offset: u64,
    /// Length of the range in bytes.
    pub bytes: u64,
    /// Expected `crc32` of the range's bytes.
    pub checksum: u32,
    /// Expected content hash of the range's bytes; `validate_payload` skips this comparison
    /// when `is_zero()` reports true for the stored value.
    pub content_hash: ServerlessContentHash,
}
12
13impl ServerlessHydrationRequest {
14    pub fn from_extent(extent: &ServerlessExtentRef) -> Self {
15        Self {
16            relative_path: extent.relative_path.clone(),
17            offset: extent.offset,
18            bytes: extent.bytes,
19            checksum: extent.checksum,
20            content_hash: extent.content_hash,
21        }
22    }
23
24    pub fn validate_payload(&self, payload: &[u8]) -> RdbFileResult<()> {
25        if payload.len() as u64 != self.bytes {
26            return Err(RdbFileError::InvalidOperation(format!(
27                "serverless hydration range {} has {} bytes, expected {}",
28                self.relative_path.display(),
29                payload.len(),
30                self.bytes
31            )));
32        }
33        let computed_crc = crc32(payload);
34        if computed_crc != self.checksum {
35            return Err(RdbFileError::InvalidOperation(format!(
36                "serverless hydration range {} checksum mismatch: stored {:#010x}, computed {computed_crc:#010x}",
37                self.relative_path.display(),
38                self.checksum
39            )));
40        }
41        let computed_hash = ServerlessContentHash::from_bytes(payload);
42        if !self.content_hash.is_zero() && computed_hash != self.content_hash {
43            return Err(RdbFileError::InvalidOperation(format!(
44                "serverless hydration range {} content hash mismatch",
45                self.relative_path.display()
46            )));
47        }
48        Ok(())
49    }
50}
51
/// An ordered list of hydration requests tied to a single generation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessHydrationPlan {
    /// Generation the plan was built for; local hydration rejects the plan when this differs
    /// from the file plan's generation.
    pub generation: u64,
    /// Ranges to hydrate; local hydration processes them in this order.
    pub requests: Vec<ServerlessHydrationRequest>,
}
57
58impl ServerlessHydrationPlan {
59    pub fn total_bytes(&self) -> u64 {
60        self.requests.iter().map(|request| request.bytes).sum()
61    }
62
63    pub fn is_empty(&self) -> bool {
64        self.requests.is_empty()
65    }
66}
67
/// The bytes obtained for a hydration request, paired with the request itself.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessHydratedRange {
    /// The request this payload answers.
    pub request: ServerlessHydrationRequest,
    /// The range's bytes.
    pub payload: Vec<u8>,
}
73
74impl ServerlessFilePlan {
75    pub fn hydrate_local_plan(
76        &self,
77        plan: &ServerlessHydrationPlan,
78    ) -> RdbFileResult<Vec<ServerlessHydratedRange>> {
79        if plan.generation != self.generation {
80            return Err(RdbFileError::InvalidOperation(format!(
81                "hydration plan generation {} does not match file plan generation {}",
82                plan.generation, self.generation
83            )));
84        }
85        let mut hydrated = Vec::with_capacity(plan.requests.len());
86        for request in &plan.requests {
87            hydrated.push(self.hydrate_local_request(request)?);
88        }
89        Ok(hydrated)
90    }
91
92    pub fn hydrate_local_plan_cached(
93        &self,
94        plan: &ServerlessHydrationPlan,
95        cache: &ServerlessLocalCache,
96    ) -> RdbFileResult<Vec<ServerlessHydratedRange>> {
97        if plan.generation != self.generation {
98            return Err(RdbFileError::InvalidOperation(format!(
99                "hydration plan generation {} does not match file plan generation {}",
100                plan.generation, self.generation
101            )));
102        }
103        let mut hydrated = Vec::with_capacity(plan.requests.len());
104        for request in &plan.requests {
105            hydrated.push(self.hydrate_local_request_cached(request, cache)?);
106        }
107        Ok(hydrated)
108    }
109
110    pub fn hydrate_local_request(
111        &self,
112        request: &ServerlessHydrationRequest,
113    ) -> RdbFileResult<ServerlessHydratedRange> {
114        validate_hydration_relative_path(&request.relative_path)?;
115        let end = request
116            .offset
117            .checked_add(request.bytes)
118            .ok_or_else(|| RdbFileError::InvalidOperation("hydration range overflow".into()))?;
119        let len = usize::try_from(request.bytes).map_err(|_| {
120            RdbFileError::InvalidOperation("hydration range too large for local memory".into())
121        })?;
122        let path = self.generation_dir().join(&request.relative_path);
123        let mut file = File::open(&path)?;
124        let file_len = file.metadata()?.len();
125        if end > file_len {
126            return Err(RdbFileError::InvalidOperation(format!(
127                "hydration range {}..{} exceeds pack {} length {}",
128                request.offset,
129                end,
130                request.relative_path.display(),
131                file_len
132            )));
133        }
134        file.seek(SeekFrom::Start(request.offset))?;
135        let mut payload = vec![0u8; len];
136        file.read_exact(&mut payload)?;
137        request.validate_payload(&payload)?;
138        Ok(ServerlessHydratedRange {
139            request: request.clone(),
140            payload,
141        })
142    }
143
144    pub fn hydrate_local_request_cached(
145        &self,
146        request: &ServerlessHydrationRequest,
147        cache: &ServerlessLocalCache,
148    ) -> RdbFileResult<ServerlessHydratedRange> {
149        if cache.generation != self.generation {
150            return Err(RdbFileError::InvalidOperation(format!(
151                "serverless cache generation {} does not match file plan generation {}",
152                cache.generation, self.generation
153            )));
154        }
155        if let Ok(range) = cache.read_hydrated_range(request) {
156            return Ok(range);
157        }
158        let range = self.hydrate_local_request(request)?;
159        cache.write_hydrated_range(&range)?;
160        cache.enforce_max_bytes(self.cache_policy.max_hot_bytes)?;
161        Ok(range)
162    }
163
164    pub fn prefetch_hot_extents(
165        &self,
166        index: &ServerlessExtentIndex,
167    ) -> RdbFileResult<Vec<ServerlessHydratedRange>> {
168        self.hydrate_local_plan(&index.hot_hydration_plan())
169    }
170
171    pub fn prefetch_hot_extents_cached(
172        &self,
173        index: &ServerlessExtentIndex,
174        cache: &ServerlessLocalCache,
175    ) -> RdbFileResult<Vec<ServerlessHydratedRange>> {
176        self.hydrate_local_plan_cached(&index.hot_hydration_plan(), cache)
177    }
178}
179
180fn validate_hydration_relative_path(path: &Path) -> RdbFileResult<()> {
181    if path.is_absolute() {
182        return Err(RdbFileError::InvalidOperation(
183            "serverless hydration path must be relative".into(),
184        ));
185    }
186    if path
187        .components()
188        .any(|component| matches!(component, std::path::Component::ParentDir))
189    {
190        return Err(RdbFileError::InvalidOperation(
191            "serverless hydration path must not contain parent components".into(),
192        ));
193    }
194    Ok(())
195}