reddb_server/runtime/
impl_serverless.rs1use super::*;
2
3impl RedDBRuntime {
4 pub fn serverless_file_plan(&self) -> Option<reddb_file::ServerlessFilePlan> {
5 let data_path = self.inner.db.options().data_path.as_ref()?;
6 let generation = self
7 .primary_logical_head_lsn()
8 .max(self.cdc_current_lsn())
9 .max(1);
10 Some(reddb_file::ServerlessFilePlan::for_data_path(
11 data_path, generation,
12 ))
13 }
14
15 fn serverless_file_plan_for_generation(
16 &self,
17 generation: u64,
18 ) -> Option<reddb_file::ServerlessFilePlan> {
19 let plan = self.serverless_file_plan()?;
20 Some(plan.for_generation(generation))
21 }
22
23 fn serverless_local_cache_for_generation(
24 &self,
25 generation: u64,
26 ) -> Option<reddb_file::ServerlessLocalCache> {
27 let plan = self.serverless_file_plan_for_generation(generation)?;
28 Some(plan.local_cache())
29 }
30
31 fn serverless_collection_snapshot_bytes(&self, collection: &str) -> RedDBResult<Vec<u8>> {
32 let source = self
33 .inner
34 .db
35 .store()
36 .get_collection(collection)
37 .ok_or_else(|| {
38 RedDBError::Internal(format!("serverless collection not found: {collection}"))
39 })?;
40 let snapshot = crate::storage::unified::UnifiedStore::with_config(
41 crate::storage::unified::UnifiedStoreConfig::default(),
42 );
43 let mut error: Option<RedDBError> = None;
44 source.for_each_entity(|entity| {
45 let cloned = entity.clone();
46 match snapshot.insert_auto(collection, cloned) {
47 Ok(id) => {
48 if let Some(metadata) = source.get_metadata(entity.id) {
49 if let Err(err) = snapshot.set_metadata(collection, id, metadata) {
50 error = Some(RedDBError::Internal(err.to_string()));
51 return false;
52 }
53 }
54 true
55 }
56 Err(err) => {
57 error = Some(RedDBError::Internal(err.to_string()));
58 false
59 }
60 }
61 });
62 if let Some(error) = error {
63 return Err(error);
64 }
65 Ok(snapshot.to_binary_dump_bytes())
66 }
67
68 pub fn publish_serverless_generation(
69 &self,
70 ) -> RedDBResult<Option<reddb_file::ServerlessGenerationPointer>> {
71 let Some(base_plan) = self.serverless_file_plan() else {
72 return Ok(None);
73 };
74 self.flush()?;
75 let next_generation = match base_plan.read_current_pointer_verified() {
76 Ok(pointer) => base_plan
77 .generation
78 .max(pointer.generation.saturating_add(1)),
79 Err(reddb_file::RdbFileError::Io(err))
80 if err.kind() == std::io::ErrorKind::NotFound =>
81 {
82 base_plan.generation
83 }
84 Err(err) => {
85 return Err(RedDBError::InvalidOperation(format!(
86 "corrupt serverless generation: {err}"
87 )));
88 }
89 };
90 let plan = reddb_file::ServerlessFilePlan::new(
91 base_plan.root,
92 base_plan.namespace,
93 next_generation,
94 )
95 .with_cache_policy(base_plan.cache_policy);
96 let mut extent_index = reddb_file::ServerlessExtentIndex::new(plan.generation);
97 let mut collections = self.inner.db.store().list_collections();
98 collections.sort();
99 collections.dedup();
100 if collections.is_empty() {
101 collections.push("__database__".to_string());
102 }
103 let mut collection_data = Vec::new();
104 for collection in collections {
105 let payload = if collection == "__database__" {
106 self.inner.db.store().to_binary_dump_bytes()
107 } else {
108 self.serverless_collection_snapshot_bytes(&collection)?
109 };
110 let offset = collection_data.len() as u64;
111 collection_data.extend_from_slice(&payload);
112 extent_index.push(
113 plan.collection_data_extent_ref(collection, offset, &payload, true)
114 .map_err(|err| RedDBError::Internal(err.to_string()))?,
115 );
116 }
117 let secondary_index =
118 reddb_file::ServerlessSecondaryIndex::from_extent_index(&extent_index);
119 let pointer = plan
120 .publish_core_generation(&extent_index, &collection_data, &secondary_index.encode())
121 .map_err(|err| RedDBError::Internal(err.to_string()))?;
122 Ok(Some(pointer))
123 }
124
125 pub fn read_current_serverless_generation_verified(
126 &self,
127 ) -> RedDBResult<Option<reddb_file::ServerlessGenerationPointer>> {
128 let Some(plan) = self.serverless_file_plan() else {
129 return Ok(None);
130 };
131 match plan.read_current_pointer_verified() {
132 Ok(pointer) => Ok(Some(pointer)),
133 Err(reddb_file::RdbFileError::Io(err))
134 if err.kind() == std::io::ErrorKind::NotFound =>
135 {
136 Ok(None)
137 }
138 Err(err) => Err(RedDBError::InvalidOperation(format!(
139 "corrupt serverless generation: {err}"
140 ))),
141 }
142 }
143
144 pub fn hydrate_current_serverless_collection(
145 &self,
146 collection: &str,
147 ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
148 let Some(pointer) = self.read_current_serverless_generation_verified()? else {
149 return Ok(None);
150 };
151 let Some(plan) = self.serverless_file_plan_for_generation(pointer.generation) else {
152 return Ok(None);
153 };
154 let secondary =
155 reddb_file::ServerlessSecondaryIndex::read_from_path(plan.secondary_index_path())
156 .map_err(|err| {
157 RedDBError::InvalidOperation(format!("corrupt serverless generation: {err}"))
158 })?;
159 let hydration = secondary.hydration_plan_for_collection(collection);
160 plan.hydrate_local_plan(&hydration)
161 .map(Some)
162 .map_err(|err| {
163 RedDBError::InvalidOperation(format!("serverless hydrate failed: {err}"))
164 })
165 }
166
167 pub fn hydrate_current_serverless_key(
168 &self,
169 collection: &str,
170 key: &[u8],
171 ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
172 let Some(pointer) = self.read_current_serverless_generation_verified()? else {
173 return Ok(None);
174 };
175 let Some(plan) = self.serverless_file_plan_for_generation(pointer.generation) else {
176 return Ok(None);
177 };
178 let index = reddb_file::ServerlessExtentIndex::read_from_path(plan.extent_index_path())
179 .map_err(|err| {
180 RedDBError::InvalidOperation(format!("corrupt serverless generation: {err}"))
181 })?;
182 let hydration = index.hydration_plan_for_key(collection, key);
183 plan.hydrate_local_plan(&hydration)
184 .map(Some)
185 .map_err(|err| {
186 RedDBError::InvalidOperation(format!("serverless hydrate failed: {err}"))
187 })
188 }
189
190 pub fn hydrate_current_serverless_range(
191 &self,
192 collection: &str,
193 range_start: &[u8],
194 range_end: &[u8],
195 ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
196 let Some(pointer) = self.read_current_serverless_generation_verified()? else {
197 return Ok(None);
198 };
199 let Some(plan) = self.serverless_file_plan_for_generation(pointer.generation) else {
200 return Ok(None);
201 };
202 let index = reddb_file::ServerlessExtentIndex::read_from_path(plan.extent_index_path())
203 .map_err(|err| {
204 RedDBError::InvalidOperation(format!("corrupt serverless generation: {err}"))
205 })?;
206 let hydration = index
207 .hydration_plan_for_range(collection, range_start, range_end)
208 .map_err(|err| RedDBError::InvalidOperation(err.to_string()))?;
209 plan.hydrate_local_plan(&hydration)
210 .map(Some)
211 .map_err(|err| {
212 RedDBError::InvalidOperation(format!("serverless hydrate failed: {err}"))
213 })
214 }
215
216 pub fn hydrate_current_serverless_key_cached(
217 &self,
218 collection: &str,
219 key: &[u8],
220 ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
221 let Some(pointer) = self.read_current_serverless_generation_verified()? else {
222 return Ok(None);
223 };
224 let Some(plan) = self.serverless_file_plan_for_generation(pointer.generation) else {
225 return Ok(None);
226 };
227 let Some(cache) = self.serverless_local_cache_for_generation(pointer.generation) else {
228 return Ok(None);
229 };
230 let index = reddb_file::ServerlessExtentIndex::read_from_path(plan.extent_index_path())
231 .map_err(|err| {
232 RedDBError::InvalidOperation(format!("corrupt serverless generation: {err}"))
233 })?;
234 let hydration = index.hydration_plan_for_key(collection, key);
235 plan.hydrate_local_plan_cached(&hydration, &cache)
236 .map(Some)
237 .map_err(|err| {
238 RedDBError::InvalidOperation(format!("serverless hydrate failed: {err}"))
239 })
240 }
241
242 pub fn prefetch_current_serverless_hot_extents_cached(
243 &self,
244 ) -> RedDBResult<Option<Vec<reddb_file::ServerlessHydratedRange>>> {
245 let Some(pointer) = self.read_current_serverless_generation_verified()? else {
246 return Ok(None);
247 };
248 let Some(plan) = self.serverless_file_plan_for_generation(pointer.generation) else {
249 return Ok(None);
250 };
251 let Some(cache) = self.serverless_local_cache_for_generation(pointer.generation) else {
252 return Ok(None);
253 };
254 let index = reddb_file::ServerlessExtentIndex::read_from_path(plan.extent_index_path())
255 .map_err(|err| {
256 RedDBError::InvalidOperation(format!("corrupt serverless generation: {err}"))
257 })?;
258 plan.prefetch_hot_extents_cached(&index, &cache)
259 .map(Some)
260 .map_err(|err| {
261 RedDBError::InvalidOperation(format!("serverless hydrate failed: {err}"))
262 })
263 }
264}