Skip to main content

reddb_file/serverless/
plan.rs

1use super::*;
2
3#[derive(Debug, Clone, PartialEq, Eq)]
4pub struct ServerlessFilePlan {
5    pub root: PathBuf,
6    pub namespace: String,
7    pub generation: u64,
8    pub cache_policy: ServerlessCachePolicy,
9}
10
11impl ServerlessFilePlan {
12    pub fn new(root: impl Into<PathBuf>, namespace: impl Into<String>, generation: u64) -> Self {
13        Self {
14            root: root.into(),
15            namespace: namespace.into(),
16            generation,
17            cache_policy: ServerlessCachePolicy::default(),
18        }
19    }
20
21    pub fn for_data_path(data_path: impl AsRef<Path>, generation: u64) -> Self {
22        let data_path = data_path.as_ref();
23        Self::new(
24            crate::layout::serverless_root(data_path),
25            crate::layout::serverless_namespace(data_path),
26            generation,
27        )
28    }
29
30    pub fn with_cache_policy(mut self, policy: ServerlessCachePolicy) -> Self {
31        self.cache_policy = policy;
32        self
33    }
34
35    pub fn for_generation(&self, generation: u64) -> Self {
36        Self::new(self.root.clone(), self.namespace.clone(), generation)
37            .with_cache_policy(self.cache_policy.clone())
38    }
39
40    pub fn local_cache(&self) -> ServerlessLocalCache {
41        ServerlessLocalCache::new(
42            crate::layout::serverless_cache_root(&self.root, &self.namespace),
43            self.generation,
44        )
45    }
46
47    pub fn artifact_path(&self, kind: ServerlessPackKind) -> PathBuf {
48        self.root
49            .join(&self.namespace)
50            .join(format!("g{:020}", self.generation))
51            .join(format!("{}.redpack", kind.as_str()))
52    }
53
54    pub fn generation_dir(&self) -> PathBuf {
55        self.root
56            .join(&self.namespace)
57            .join(format!("g{:020}", self.generation))
58    }
59
60    pub fn manifest_path(&self) -> PathBuf {
61        self.artifact_path(ServerlessPackKind::Manifest)
62    }
63
64    pub fn boot_index_path(&self) -> PathBuf {
65        self.artifact_path(ServerlessPackKind::BootIndex)
66    }
67
68    pub fn extent_index_path(&self) -> PathBuf {
69        self.artifact_path(ServerlessPackKind::ExtentIndex)
70    }
71
72    pub fn collection_data_path(&self) -> PathBuf {
73        self.artifact_path(ServerlessPackKind::CollectionData)
74    }
75
76    pub fn collection_data_extent_ref(
77        &self,
78        collection: impl Into<String>,
79        offset: u64,
80        payload: &[u8],
81        hot: bool,
82    ) -> RdbFileResult<ServerlessExtentRef> {
83        ServerlessExtentRef::new(
84            collection,
85            Vec::<u8>::new(),
86            Vec::<u8>::new(),
87            relative_to_generation_dir(&self.collection_data_path()),
88            offset,
89            payload,
90            hot,
91        )
92    }
93
94    pub fn secondary_index_path(&self) -> PathBuf {
95        self.artifact_path(ServerlessPackKind::SecondaryIndex)
96    }
97
98    pub fn current_pointer_path(&self) -> PathBuf {
99        self.root.join(&self.namespace).join("CURRENT.redptr")
100    }
101
102    pub fn publish_generation_pointer(
103        &self,
104        manifest: &ServerlessManifest,
105    ) -> RdbFileResult<ServerlessGenerationPointer> {
106        if manifest.namespace != self.namespace {
107            return Err(RdbFileError::InvalidOperation(format!(
108                "manifest namespace {} does not match plan namespace {}",
109                manifest.namespace, self.namespace
110            )));
111        }
112        if manifest.generation != self.generation {
113            return Err(RdbFileError::InvalidOperation(format!(
114                "manifest generation {} does not match plan generation {}",
115                manifest.generation, self.generation
116            )));
117        }
118        self.validate_complete_generation(manifest)?;
119        let pointer = ServerlessGenerationPointer::from_manifest(self, manifest);
120        pointer.write_to_path(self.current_pointer_path())?;
121        Ok(pointer)
122    }
123
124    pub fn read_current_pointer(&self) -> RdbFileResult<ServerlessGenerationPointer> {
125        ServerlessGenerationPointer::read_from_path(self.current_pointer_path())
126    }
127
128    pub fn read_current_pointer_verified(&self) -> RdbFileResult<ServerlessGenerationPointer> {
129        let pointer = self.read_current_pointer()?;
130        if pointer.namespace != self.namespace {
131            return Err(RdbFileError::InvalidOperation(format!(
132                "current pointer namespace {} does not match plan namespace {}",
133                pointer.namespace, self.namespace
134            )));
135        }
136
137        let expected_manifest_relative_path =
138            PathBuf::from(format!("g{:020}/manifest.redpack", pointer.generation));
139        validate_generation_relative_path(&pointer.manifest_relative_path)?;
140        if pointer.manifest_relative_path != expected_manifest_relative_path {
141            return Err(RdbFileError::InvalidOperation(format!(
142                "current pointer manifest path {} does not match expected {}",
143                pointer.manifest_relative_path.display(),
144                expected_manifest_relative_path.display()
145            )));
146        }
147
148        let generation_plan = ServerlessFilePlan::new(
149            self.root.clone(),
150            self.namespace.clone(),
151            pointer.generation,
152        )
153        .with_cache_policy(self.cache_policy.clone());
154        let manifest_path = self
155            .root
156            .join(&self.namespace)
157            .join(&pointer.manifest_relative_path);
158        let manifest_bytes = fs::read(&manifest_path)?;
159        if manifest_bytes.len() as u64 != pointer.manifest_bytes {
160            return Err(RdbFileError::InvalidOperation(format!(
161                "current pointer manifest has {} bytes, expected {}",
162                manifest_bytes.len(),
163                pointer.manifest_bytes
164            )));
165        }
166        let computed_crc = crc32(&manifest_bytes);
167        if computed_crc != pointer.manifest_checksum {
168            return Err(RdbFileError::InvalidOperation(format!(
169                "current pointer manifest checksum mismatch: stored {:#010x}, computed {computed_crc:#010x}",
170                pointer.manifest_checksum
171            )));
172        }
173        let computed_hash = ServerlessContentHash::from_bytes(&manifest_bytes);
174        if computed_hash != pointer.manifest_content_hash {
175            return Err(RdbFileError::InvalidOperation(
176                "current pointer manifest content hash mismatch".into(),
177            ));
178        }
179
180        let manifest = ServerlessManifest::decode(&manifest_bytes)?;
181        if manifest.namespace != pointer.namespace {
182            return Err(RdbFileError::InvalidOperation(format!(
183                "current pointer namespace {} does not match manifest namespace {}",
184                pointer.namespace, manifest.namespace
185            )));
186        }
187        if manifest.generation != pointer.generation {
188            return Err(RdbFileError::InvalidOperation(format!(
189                "current pointer generation {} does not match manifest generation {}",
190                pointer.generation, manifest.generation
191            )));
192        }
193        generation_plan.validate_complete_generation(&manifest)?;
194        Ok(pointer)
195    }
196
197    pub fn wal_tail_path(&self) -> PathBuf {
198        self.artifact_path(ServerlessPackKind::WalTail)
199    }
200
201    pub fn hot_snapshot_path(&self) -> PathBuf {
202        self.artifact_path(ServerlessPackKind::HotSnapshot)
203    }
204
205    pub fn publish_core_generation(
206        &self,
207        extent_index: &ServerlessExtentIndex,
208        collection_data: &[u8],
209        secondary_index: &[u8],
210    ) -> RdbFileResult<ServerlessGenerationPointer> {
211        if extent_index.generation != self.generation {
212            return Err(RdbFileError::InvalidOperation(format!(
213                "extent index generation {} does not match plan generation {}",
214                extent_index.generation, self.generation
215            )));
216        }
217
218        let mut manifest = ServerlessManifest::new(&self.namespace, self.generation);
219        let boot_index = ServerlessBootIndex::from_plan(self);
220        let boot_index_bytes = boot_index.encode();
221        let extent_index_bytes = extent_index.encode();
222        let empty_pack: &[u8] = &[];
223        let packs = [
224            (
225                ServerlessPackKind::BootIndex,
226                relative_to_generation_dir(&self.boot_index_path()),
227                boot_index_bytes.as_slice(),
228            ),
229            (
230                ServerlessPackKind::ExtentIndex,
231                relative_to_generation_dir(&self.extent_index_path()),
232                extent_index_bytes.as_slice(),
233            ),
234            (
235                ServerlessPackKind::HotSnapshot,
236                relative_to_generation_dir(&self.hot_snapshot_path()),
237                empty_pack,
238            ),
239            (
240                ServerlessPackKind::WalTail,
241                relative_to_generation_dir(&self.wal_tail_path()),
242                empty_pack,
243            ),
244            (
245                ServerlessPackKind::CollectionData,
246                relative_to_generation_dir(&self.collection_data_path()),
247                collection_data,
248            ),
249            (
250                ServerlessPackKind::SecondaryIndex,
251                relative_to_generation_dir(&self.secondary_index_path()),
252                secondary_index,
253            ),
254            (
255                ServerlessPackKind::ColdArchive,
256                relative_to_generation_dir(&self.artifact_path(ServerlessPackKind::ColdArchive)),
257                empty_pack,
258            ),
259        ];
260
261        for (kind, relative_path, payload) in packs {
262            write_bytes(self.generation_dir().join(&relative_path), payload)?;
263            manifest.push(ServerlessManifestEntry::from_bytes(
264                kind,
265                relative_path,
266                payload,
267            ));
268        }
269        manifest.write_to_path(self.manifest_path())?;
270        self.publish_generation_pointer(&manifest)
271    }
272
273    pub fn validate_complete_generation(&self, manifest: &ServerlessManifest) -> RdbFileResult<()> {
274        let required = [
275            ServerlessPackKind::BootIndex,
276            ServerlessPackKind::ExtentIndex,
277            ServerlessPackKind::HotSnapshot,
278            ServerlessPackKind::WalTail,
279            ServerlessPackKind::CollectionData,
280            ServerlessPackKind::SecondaryIndex,
281        ];
282        for required_kind in required {
283            if !manifest
284                .entries
285                .iter()
286                .any(|entry| entry.kind == required_kind)
287            {
288                return Err(RdbFileError::InvalidOperation(format!(
289                    "serverless generation {} is missing required {} pack",
290                    self.generation,
291                    required_kind.as_str()
292                )));
293            }
294        }
295
296        for entry in &manifest.entries {
297            validate_generation_relative_path(&entry.relative_path)?;
298            let payload = fs::read(self.generation_dir().join(&entry.relative_path))?;
299            if payload.len() as u64 != entry.bytes {
300                return Err(RdbFileError::InvalidOperation(format!(
301                    "serverless pack {} has {} bytes, expected {}",
302                    entry.relative_path.display(),
303                    payload.len(),
304                    entry.bytes
305                )));
306            }
307            let computed_crc = crc32(&payload);
308            if computed_crc != entry.checksum {
309                return Err(RdbFileError::InvalidOperation(format!(
310                    "serverless pack {} checksum mismatch: stored {:#010x}, computed {computed_crc:#010x}",
311                    entry.relative_path.display(),
312                    entry.checksum
313                )));
314            }
315            let computed_hash = ServerlessContentHash::from_bytes(&payload);
316            if !entry.content_hash.is_zero() && computed_hash != entry.content_hash {
317                return Err(RdbFileError::InvalidOperation(format!(
318                    "serverless pack {} content hash mismatch",
319                    entry.relative_path.display()
320                )));
321            }
322        }
323
324        let manifest_bytes = fs::read(self.manifest_path())?;
325        let encoded = manifest.encode();
326        if manifest_bytes != encoded {
327            return Err(RdbFileError::InvalidOperation(
328                "serverless manifest on disk does not match publish manifest".into(),
329            ));
330        }
331        Ok(())
332    }
333
334    pub fn cold_start_order(&self) -> Vec<PathBuf> {
335        let mut order = vec![self.manifest_path(), self.boot_index_path()];
336        order.push(self.extent_index_path());
337        if self.cache_policy.keep_hot_snapshot_local {
338            order.push(self.hot_snapshot_path());
339        }
340        order.push(self.wal_tail_path());
341        order
342    }
343
344    pub fn hot_start_order(&self) -> Vec<PathBuf> {
345        let mut order = Vec::new();
346        if self.cache_policy.keep_boot_index_local {
347            order.push(self.boot_index_path());
348        }
349        if self.cache_policy.keep_hot_snapshot_local {
350            order.push(self.hot_snapshot_path());
351        }
352        order.push(self.wal_tail_path());
353        order
354    }
355
356    pub fn is_generation_dir(path: &Path) -> bool {
357        path.file_name()
358            .and_then(|name| name.to_str())
359            .map(|name| {
360                name.len() == 21
361                    && name.starts_with('g')
362                    && name[1..].chars().all(|c| c.is_ascii_digit())
363            })
364            .unwrap_or(false)
365    }
366}
367
368fn validate_generation_relative_path(path: &Path) -> RdbFileResult<()> {
369    if path.is_absolute() {
370        return Err(RdbFileError::InvalidOperation(
371            "serverless pack path must be relative".into(),
372        ));
373    }
374    if path
375        .components()
376        .any(|component| matches!(component, std::path::Component::ParentDir))
377    {
378        return Err(RdbFileError::InvalidOperation(
379            "serverless pack path must not contain parent components".into(),
380        ));
381    }
382    Ok(())
383}
384
/// Two ordered path lists describing how a generation is opened.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ServerlessBootPlan {
    /// Paths taken from the file plan's cold or hot start order.
    pub required_first: Vec<PathBuf>,
    /// Remaining pack paths; presumably fetched lazily once the database is
    /// open (the name suggests so; this struct only stores the list).
    pub lazy_after_open: Vec<PathBuf>,
}
390
391impl ServerlessBootPlan {
392    pub fn cold(plan: &ServerlessFilePlan) -> Self {
393        Self {
394            required_first: plan.cold_start_order(),
395            lazy_after_open: vec![
396                plan.artifact_path(ServerlessPackKind::CollectionData),
397                plan.artifact_path(ServerlessPackKind::SecondaryIndex),
398                plan.artifact_path(ServerlessPackKind::ColdArchive),
399            ],
400        }
401    }
402
403    pub fn hot(plan: &ServerlessFilePlan) -> Self {
404        Self {
405            required_first: plan.hot_start_order(),
406            lazy_after_open: vec![
407                plan.artifact_path(ServerlessPackKind::Manifest),
408                plan.artifact_path(ServerlessPackKind::CollectionData),
409                plan.artifact_path(ServerlessPackKind::SecondaryIndex),
410            ],
411        }
412    }
413}