// absurder_sql/storage/allocation.rs

/// Obtains mutable access to a shared storage field on `wasm32` builds.
///
/// Expands to `try_borrow_mut()` on the given expression and unwraps the
/// result with `expect`.
///
/// # Panics
///
/// Panics with the message below when `try_borrow_mut()` returns an error,
/// which the message attributes to a reentrant borrow.
#[cfg(target_arch = "wasm32")]
macro_rules! lock_mutex {
    ($cell:expr) => {
        $cell
            .try_borrow_mut()
            .expect("RefCell borrow failed - reentrancy detected in allocation.rs")
    };
}
13
/// Obtains access to a shared storage field on non-`wasm32` builds.
///
/// Expands to a plain `lock()` call on the given expression; whatever that
/// call returns is handed back to the caller unchanged.
// NOTE(review): call sites in this file use the result directly as a guard,
// so the lock type's `lock()` presumably returns the guard itself rather than
// a `Result` — confirm against the field types on `BlockStorage`.
#[cfg(not(target_arch = "wasm32"))]
macro_rules! lock_mutex {
    ($guarded:expr) => {
        $guarded.lock()
    };
}
20
21use super::block_storage::BlockStorage;
22use crate::types::DatabaseError;
23#[cfg(any(
24 target_arch = "wasm32",
25 all(
26 not(target_arch = "wasm32"),
27 any(test, debug_assertions),
28 not(feature = "fs_persist")
29 )
30))]
31use std::collections::HashSet;
32use std::sync::atomic::Ordering;
33
34#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
35use std::{
36 fs,
37 io::{Read, Write},
38 path::PathBuf,
39};
40
41#[cfg(target_arch = "wasm32")]
42use super::vfs_sync;
43
44#[cfg(all(
45 not(target_arch = "wasm32"),
46 any(test, debug_assertions),
47 not(feature = "fs_persist")
48))]
49use super::vfs_sync;
50
51#[cfg(all(
52 not(target_arch = "wasm32"),
53 any(test, debug_assertions),
54 not(feature = "fs_persist")
55))]
56use super::block_storage::GLOBAL_METADATA_TEST;
57
/// JSON document stored in a database's `allocations.json` file.
///
/// Only compiled for native builds with the `fs_persist` feature.
// NOTE(review): the reason for `allow(dead_code)` is not visible in this
// file — presumably it silences warnings under some feature combinations.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Serialize, serde::Deserialize)]
#[allow(dead_code)]
struct FsAlloc {
    /// Ids of the blocks currently recorded as allocated.
    allocated: Vec<u64>,
}
65
/// JSON document stored in a database's `deallocated.json` file.
///
/// Only compiled for native builds with the `fs_persist` feature.
// NOTE(review): the reason for `allow(dead_code)` is not visible in this
// file — presumably it silences warnings under some feature combinations.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Serialize, serde::Deserialize)]
#[allow(dead_code)]
struct FsDealloc {
    /// Ids of blocks that have been deallocated, written in ascending order.
    tombstones: Vec<u64>,
}
72
73pub async fn allocate_block_impl(storage: &mut BlockStorage) -> Result<u64, DatabaseError> {
75 let block_id = storage.next_block_id.fetch_add(1, Ordering::SeqCst);
77
78 lock_mutex!(storage.allocated_blocks).insert(block_id);
80
81 #[cfg(target_arch = "wasm32")]
83 {
84 vfs_sync::with_global_allocation_map(|allocation_map| {
85 let mut map = allocation_map.borrow_mut();
86 let db_allocations = map
87 .entry(storage.db_name.clone())
88 .or_insert_with(HashSet::new);
89 db_allocations.insert(block_id);
90 });
91 }
92
93 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
95 {
96 let base: PathBuf = storage.base_dir.clone();
97 let mut db_dir = base.clone();
98 db_dir.push(&storage.db_name);
99 let _ = fs::create_dir_all(&db_dir);
100 let mut blocks_dir = db_dir.clone();
102 blocks_dir.push("blocks");
103 let _ = fs::create_dir_all(&blocks_dir);
104 let mut alloc_path = db_dir.clone();
105 alloc_path.push("allocations.json");
106 let mut alloc = FsAlloc::default();
108 if let Ok(mut f) = fs::File::open(&alloc_path) {
109 let mut s = String::new();
110 if f.read_to_string(&mut s).is_ok() {
111 let _ = serde_json::from_str::<FsAlloc>(&s).map(|a| {
112 alloc = a;
113 });
114 }
115 }
116 if !alloc.allocated.contains(&block_id) {
117 alloc.allocated.push(block_id);
118 }
119 if let Ok(mut f) = fs::File::create(&alloc_path) {
120 let _ = f.write_all(
121 serde_json::to_string(&alloc)
122 .unwrap_or_else(|_| "{}".into())
123 .as_bytes(),
124 );
125 }
126
127 let mut dealloc_path = db_dir.clone();
129 dealloc_path.push("deallocated.json");
130 lock_mutex!(storage.deallocated_blocks).remove(&block_id);
131 let mut dealloc = FsDealloc::default();
132 if let Ok(mut f) = fs::File::open(&dealloc_path) {
134 let mut s = String::new();
135 if f.read_to_string(&mut s).is_ok() {
136 let _ = serde_json::from_str::<FsDealloc>(&s).map(|d| {
137 dealloc = d;
138 });
139 }
140 }
141 dealloc.tombstones = lock_mutex!(storage.deallocated_blocks)
142 .iter()
143 .cloned()
144 .collect();
145 dealloc.tombstones.sort_unstable();
146 if let Ok(mut f) = fs::File::create(&dealloc_path) {
147 let _ = f.write_all(
148 serde_json::to_string(&dealloc)
149 .unwrap_or_else(|_| "{}".into())
150 .as_bytes(),
151 );
152 }
153 }
154
155 #[cfg(all(
157 not(target_arch = "wasm32"),
158 any(test, debug_assertions),
159 not(feature = "fs_persist")
160 ))]
161 {
162 vfs_sync::with_global_allocation_map(|allocation_map| {
163 let mut map = allocation_map.borrow_mut();
164 let db_allocations = map
165 .entry(storage.db_name.clone())
166 .or_insert_with(HashSet::new);
167 db_allocations.insert(block_id);
168 });
169 }
170
171 #[cfg(feature = "telemetry")]
173 if let Some(ref metrics) = storage.metrics {
174 metrics.blocks_allocated_total().inc();
175 let total_memory = (lock_mutex!(storage.allocated_blocks).len() as f64)
177 * (super::block_storage::BLOCK_SIZE as f64);
178 metrics.memory_bytes().set(total_memory);
179 }
180
181 log::info!(
182 "Allocated block: {} (total allocated: {})",
183 block_id,
184 lock_mutex!(storage.allocated_blocks).len()
185 );
186 Ok(block_id)
187}
188
189pub async fn deallocate_block_impl(
191 storage: &mut BlockStorage,
192 block_id: u64,
193) -> Result<(), DatabaseError> {
194 if !lock_mutex!(storage.allocated_blocks).contains(&block_id) {
196 return Err(DatabaseError::new(
197 "BLOCK_NOT_ALLOCATED",
198 &format!("Block {} is not allocated", block_id),
199 ));
200 }
201
202 lock_mutex!(storage.allocated_blocks).remove(&block_id);
204
205 lock_mutex!(storage.cache).remove(&block_id);
207 lock_mutex!(storage.dirty_blocks).remove(&block_id);
208 storage.checksum_manager.remove_checksum(block_id);
210
211 #[cfg(target_arch = "wasm32")]
213 {
214 vfs_sync::with_global_storage(|storage_map| {
215 if let Some(db_storage) = storage_map.borrow_mut().get_mut(&storage.db_name) {
216 db_storage.remove(&block_id);
217 }
218 });
219
220 vfs_sync::with_global_allocation_map(|allocation_map| {
221 if let Some(db_allocations) = allocation_map.borrow_mut().get_mut(&storage.db_name) {
222 db_allocations.remove(&block_id);
223 }
224 });
225
226 vfs_sync::with_global_metadata(|meta_map| {
228 if let Some(db_meta) = meta_map.borrow_mut().get_mut(&storage.db_name) {
229 db_meta.remove(&block_id);
230 }
231 });
232 }
233
234 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
236 {
237 let base: PathBuf = storage.base_dir.clone();
238 let mut db_dir = base.clone();
239 db_dir.push(&storage.db_name);
240 let mut blocks_dir = db_dir.clone();
241 blocks_dir.push("blocks");
242 let mut block_path = blocks_dir.clone();
243 block_path.push(format!("block_{}.bin", block_id));
244 let _ = fs::remove_file(&block_path);
245
246 let mut alloc_path = db_dir.clone();
248 alloc_path.push("allocations.json");
249 let mut alloc = FsAlloc::default();
250 if let Ok(mut f) = fs::File::open(&alloc_path) {
251 let mut s = String::new();
252 if f.read_to_string(&mut s).is_ok() {
253 let _ = serde_json::from_str::<FsAlloc>(&s).map(|a| {
254 alloc = a;
255 });
256 }
257 }
258 alloc.allocated.retain(|&id| id != block_id);
259 if let Ok(mut f) = fs::File::create(&alloc_path) {
260 let _ = f.write_all(
261 serde_json::to_string(&alloc)
262 .unwrap_or_else(|_| "{}".into())
263 .as_bytes(),
264 );
265 }
266
267 let mut meta_path = db_dir.clone();
269 meta_path.push("metadata.json");
270 let mut meta_val: serde_json::Value = serde_json::json!({"entries": []});
272 if let Ok(mut f) = fs::File::open(&meta_path) {
273 let mut s = String::new();
274 if f.read_to_string(&mut s).is_ok() {
275 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&s) {
276 meta_val = v;
277 }
278 }
279 }
280 if !meta_val.is_object() {
281 meta_val = serde_json::json!({"entries": []});
282 }
283 if let Some(entries) = meta_val.get_mut("entries").and_then(|v| v.as_array_mut()) {
284 entries.retain(|ent| {
285 ent.as_array()
286 .and_then(|arr| arr.first())
287 .and_then(|v| v.as_u64())
288 .map(|bid| bid != block_id)
289 .unwrap_or(true)
290 });
291 }
292 let meta_string = serde_json::to_string(&meta_val).unwrap_or_else(|_| "{}".into());
293 if let Ok(mut f) = fs::File::create(&meta_path) {
294 let _ = f.write_all(meta_string.as_bytes());
295 }
296
297 let mut dealloc_path = db_dir.clone();
299 dealloc_path.push("deallocated.json");
300 lock_mutex!(storage.deallocated_blocks).insert(block_id);
301 let mut dealloc = FsDealloc::default();
302 if let Ok(mut f) = fs::File::open(&dealloc_path) {
303 let mut s = String::new();
304 if f.read_to_string(&mut s).is_ok() {
305 let _ = serde_json::from_str::<FsDealloc>(&s).map(|d| {
306 dealloc = d;
307 });
308 }
309 }
310 dealloc.tombstones = lock_mutex!(storage.deallocated_blocks)
311 .iter()
312 .cloned()
313 .collect();
314 dealloc.tombstones.sort_unstable();
315 if let Ok(mut f) = fs::File::create(&dealloc_path) {
316 let _ = f.write_all(
317 serde_json::to_string(&dealloc)
318 .unwrap_or_else(|_| "{}".into())
319 .as_bytes(),
320 );
321 }
322 }
323
324 #[cfg(all(
326 not(target_arch = "wasm32"),
327 any(test, debug_assertions),
328 not(feature = "fs_persist")
329 ))]
330 {
331 vfs_sync::with_global_storage(|gs| {
332 let mut storage_map = gs.borrow_mut();
333 if let Some(db_storage) = storage_map.get_mut(&storage.db_name) {
334 db_storage.remove(&block_id);
335 }
336 });
337
338 vfs_sync::with_global_allocation_map(|allocation_map| {
339 if let Some(db_allocations) = allocation_map.borrow_mut().get_mut(&storage.db_name) {
340 db_allocations.remove(&block_id);
341 }
342 });
343
344 GLOBAL_METADATA_TEST.with(|meta| {
345 let mut meta_map = meta.lock();
346 if let Some(db_meta) = meta_map.get_mut(&storage.db_name) {
347 db_meta.remove(&block_id);
348 }
349 });
350 }
351
352 let current = storage.next_block_id.load(Ordering::SeqCst);
354 if block_id < current {
355 storage.next_block_id.store(block_id, Ordering::SeqCst);
356 }
357
358 #[cfg(feature = "telemetry")]
360 if let Some(ref metrics) = storage.metrics {
361 metrics.blocks_deallocated_total().inc();
362 let total_memory = (lock_mutex!(storage.allocated_blocks).len() as f64)
364 * (super::block_storage::BLOCK_SIZE as f64);
365 metrics.memory_bytes().set(total_memory);
366 }
367
368 log::info!(
369 "Deallocated block: {} (total allocated: {})",
370 block_id,
371 lock_mutex!(storage.allocated_blocks).len()
372 );
373 Ok(())
374}