Skip to main content

reddb_server/runtime/
impl_serverless.rs

1use super::*;
2
3impl RedDBRuntime {
4    pub fn serverless_file_plan(&self) -> Option<reddb_file::ServerlessFilePlan> {
5        let data_path = self.inner.db.options().data_path.as_ref()?;
6        let generation = self
7            .primary_logical_head_lsn()
8            .max(self.cdc_current_lsn())
9            .max(1);
10        Some(reddb_file::ServerlessFilePlan::for_data_path(
11            data_path, generation,
12        ))
13    }
14
15    fn serverless_file_plan_for_generation(
16        &self,
17        generation: u64,
18    ) -> Option<reddb_file::ServerlessFilePlan> {
19        let plan = self.serverless_file_plan()?;
20        Some(plan.for_generation(generation))
21    }
22
23    fn serverless_local_cache_for_generation(
24        &self,
25        generation: u64,
26    ) -> Option<reddb_file::ServerlessLocalCache> {
27        let plan = self.serverless_file_plan_for_generation(generation)?;
28        Some(plan.local_cache())
29    }
30
31    fn serverless_collection_snapshot_bytes(&self, collection: &str) -> RedDBResult<Vec<u8>> {
32        let source = self
33            .inner
34            .db
35            .store()
36            .get_collection(collection)
37            .ok_or_else(|| {
38                RedDBError::Internal(format!("serverless collection not found: {collection}"))
39            })?;
40        let snapshot = crate::storage::unified::UnifiedStore::with_config(
41            crate::storage::unified::UnifiedStoreConfig::default(),
42        );
43        let mut error: Option<RedDBError> = None;
44        source.for_each_entity(|entity| {
45            let cloned = entity.clone();
46            match snapshot.insert_auto(collection, cloned) {
47                Ok(id) => {
48                    if let Some(metadata) = source.get_metadata(entity.id) {
49                        if let Err(err) = snapshot.set_metadata(collection, id, metadata) {
50                            error = Some(RedDBError::Internal(err.to_string()));
51                            return false;
52                        }
53                    }
54                    true
55                }
56                Err(err) => {
57                    error = Some(RedDBError::Internal(err.to_string()));
58                    false
59                }
60            }
61        });
62        if let Some(error) = error {
63            return Err(error);
64        }
65        Ok(snapshot.to_binary_dump_bytes())
66    }
67
68    pub fn publish_serverless_generation(
69        &self,
70    ) -> RedDBResult<Option<reddb_file::ServerlessGenerationPointer>> {
71        let Some(base_plan) = self.serverless_file_plan() else {
72            return Ok(None);
73        };
74        self.flush()?;
75        let next_generation = match base_plan.read_current_pointer_verified() {
76            Ok(pointer) => base_plan
77                .generation
78                .max(pointer.generation.saturating_add(1)),
79            Err(reddb_file::RdbFileError::Io(err))
80                if err.kind() == std::io::ErrorKind::NotFound =>
81            {
82                base_plan.generation
83            }
84            Err(err) => {
85                return Err(RedDBError::InvalidOperation(format!(
86                    "corrupt serverless generation: {err}"
87                )));
88            }
89        };
90        let plan = reddb_file::ServerlessFilePlan::new(
91            base_plan.root,
92            base_plan.namespace,
93            next_generation,
94        )
95        .with_cache_policy(base_plan.cache_policy);
96        let mut extent_index = reddb_file::ServerlessExtentIndex::new(plan.generation);
97        let mut collections = self.inner.db.store().list_collections();
98        collections.sort();
99        collections.dedup();
100        if collections.is_empty() {
101            collections.push("__database__".to_string());
102        }
103        let mut collection_data = Vec::new();
104        for collection in collections {
105            let payload = if collection == "__database__" {
106                self.inner.db.store().to_binary_dump_bytes()
107            } else {
108                self.serverless_collection_snapshot_bytes(&collection)?
109            };
110            let offset = collection_data.len() as u64;
111            collection_data.extend_from_slice(&payload);
112            extent_index.push(
113                plan.collection_data_extent_ref(collection, offset, &payload, true)
114                    .map_err(|err| RedDBError::Internal(err.to_string()))?,
115            );
116        }
117        let secondary_index =
118            reddb_file::ServerlessSecondaryIndex::from_extent_index(&extent_index);
119        let pointer = plan
120            .publish_core_generation(&extent_index, &collection_data, &secondary_index.encode())
121            .map_err(|err| RedDBError::Internal(err.to_string()))?;
122        Ok(Some(pointer))
123    }
124
125    pub fn read_current_serverless_generation_verified(
126        &self,
127    ) -> RedDBResult<Option<reddb_file::ServerlessGenerationPointer>> {
128        let Some(plan) = self.serverless_file_plan() else {
129            return Ok(None);
130        };
131        match plan.read_current_pointer_verified() {
132            Ok(pointer) => Ok(Some(pointer)),
133            Err(reddb_file::RdbFileError::Io(err))
134                if err.kind() == std::io::ErrorKind::NotFound =>
135            {
136                Ok(None)
137            }
138            Err(err) => Err(RedDBError::InvalidOperation(format!(
139                "corrupt serverless generation: {err}"
140            ))),
141        }
142    }
143
144    pub fn hydrate_current_serverless_collection(
145        &self,
146        collection: &str,
147    ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
148        let Some(pointer) = self.read_current_serverless_generation_verified()? else {
149            return Ok(None);
150        };
151        let Some(plan) = self.serverless_file_plan_for_generation(pointer.generation) else {
152            return Ok(None);
153        };
154        let secondary =
155            reddb_file::ServerlessSecondaryIndex::read_from_path(plan.secondary_index_path())
156                .map_err(|err| {
157                    RedDBError::InvalidOperation(format!("corrupt serverless generation: {err}"))
158                })?;
159        let hydration = secondary.hydration_plan_for_collection(collection);
160        plan.hydrate_local_plan(&hydration)
161            .map(Some)
162            .map_err(|err| {
163                RedDBError::InvalidOperation(format!("serverless hydrate failed: {err}"))
164            })
165    }
166
167    pub fn hydrate_current_serverless_key(
168        &self,
169        collection: &str,
170        key: &[u8],
171    ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
172        let Some(pointer) = self.read_current_serverless_generation_verified()? else {
173            return Ok(None);
174        };
175        let Some(plan) = self.serverless_file_plan_for_generation(pointer.generation) else {
176            return Ok(None);
177        };
178        let index = reddb_file::ServerlessExtentIndex::read_from_path(plan.extent_index_path())
179            .map_err(|err| {
180                RedDBError::InvalidOperation(format!("corrupt serverless generation: {err}"))
181            })?;
182        let hydration = index.hydration_plan_for_key(collection, key);
183        plan.hydrate_local_plan(&hydration)
184            .map(Some)
185            .map_err(|err| {
186                RedDBError::InvalidOperation(format!("serverless hydrate failed: {err}"))
187            })
188    }
189
190    pub fn hydrate_current_serverless_range(
191        &self,
192        collection: &str,
193        range_start: &[u8],
194        range_end: &[u8],
195    ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
196        let Some(pointer) = self.read_current_serverless_generation_verified()? else {
197            return Ok(None);
198        };
199        let Some(plan) = self.serverless_file_plan_for_generation(pointer.generation) else {
200            return Ok(None);
201        };
202        let index = reddb_file::ServerlessExtentIndex::read_from_path(plan.extent_index_path())
203            .map_err(|err| {
204                RedDBError::InvalidOperation(format!("corrupt serverless generation: {err}"))
205            })?;
206        let hydration = index
207            .hydration_plan_for_range(collection, range_start, range_end)
208            .map_err(|err| RedDBError::InvalidOperation(err.to_string()))?;
209        plan.hydrate_local_plan(&hydration)
210            .map(Some)
211            .map_err(|err| {
212                RedDBError::InvalidOperation(format!("serverless hydrate failed: {err}"))
213            })
214    }
215
216    pub fn hydrate_current_serverless_key_cached(
217        &self,
218        collection: &str,
219        key: &[u8],
220    ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
221        let Some(pointer) = self.read_current_serverless_generation_verified()? else {
222            return Ok(None);
223        };
224        let Some(plan) = self.serverless_file_plan_for_generation(pointer.generation) else {
225            return Ok(None);
226        };
227        let Some(cache) = self.serverless_local_cache_for_generation(pointer.generation) else {
228            return Ok(None);
229        };
230        let index = reddb_file::ServerlessExtentIndex::read_from_path(plan.extent_index_path())
231            .map_err(|err| {
232                RedDBError::InvalidOperation(format!("corrupt serverless generation: {err}"))
233            })?;
234        let hydration = index.hydration_plan_for_key(collection, key);
235        plan.hydrate_local_plan_cached(&hydration, &cache)
236            .map(Some)
237            .map_err(|err| {
238                RedDBError::InvalidOperation(format!("serverless hydrate failed: {err}"))
239            })
240    }
241
242    pub fn prefetch_current_serverless_hot_extents_cached(
243        &self,
244    ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
245        let Some(pointer) = self.read_current_serverless_generation_verified()? else {
246            return Ok(None);
247        };
248        let Some(plan) = self.serverless_file_plan_for_generation(pointer.generation) else {
249            return Ok(None);
250        };
251        let Some(cache) = self.serverless_local_cache_for_generation(pointer.generation) else {
252            return Ok(None);
253        };
254        let index = reddb_file::ServerlessExtentIndex::read_from_path(plan.extent_index_path())
255            .map_err(|err| {
256                RedDBError::InvalidOperation(format!("corrupt serverless generation: {err}"))
257            })?;
258        plan.prefetch_hot_extents_cached(&index, &cache)
259            .map(Some)
260            .map_err(|err| {
261                RedDBError::InvalidOperation(format!("serverless hydrate failed: {err}"))
262            })
263    }
264}