absurder_sql/storage/
allocation.rs

1//! Block allocation and deallocation operations
2//! This module handles the lifecycle management of blocks
3
4// Reentrancy-safe lock macros
5#[cfg(target_arch = "wasm32")]
// WASM flavour of `lock_mutex!`: mutably borrows the given expression via
// `try_borrow_mut()` and panics with the message below when that borrow
// fails, so a reentrant access is reported with this file's name.
macro_rules! lock_mutex {
    ($cell:expr) => {
        $cell.try_borrow_mut().expect("RefCell borrow failed - reentrancy detected in allocation.rs")
    };
}
13
14#[cfg(not(target_arch = "wasm32"))]
// Native flavour of `lock_mutex!`: simply forwards to `.lock()` on the given
// expression and yields whatever that call returns.
macro_rules! lock_mutex {
    ($lockable:expr) => {
        $lockable.lock()
    };
}
20
21use super::block_storage::BlockStorage;
22use crate::types::DatabaseError;
23#[cfg(any(
24    target_arch = "wasm32",
25    all(
26        not(target_arch = "wasm32"),
27        any(test, debug_assertions),
28        not(feature = "fs_persist")
29    )
30))]
31use std::collections::HashSet;
32use std::sync::atomic::Ordering;
33
34#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
35use std::{
36    fs,
37    io::{Read, Write},
38    path::PathBuf,
39};
40
41#[cfg(target_arch = "wasm32")]
42use super::vfs_sync;
43
44#[cfg(all(
45    not(target_arch = "wasm32"),
46    any(test, debug_assertions),
47    not(feature = "fs_persist")
48))]
49use super::vfs_sync;
50
51#[cfg(all(
52    not(target_arch = "wasm32"),
53    any(test, debug_assertions),
54    not(feature = "fs_persist")
55))]
56use super::block_storage::GLOBAL_METADATA_TEST;
57
58// On-disk JSON schema for fs_persist
/// Shape of the `allocations.json` document read and written by the
/// `fs_persist` code paths in this module.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Serialize, serde::Deserialize)]
#[allow(dead_code)]
struct FsAlloc {
    /// Block IDs currently recorded as allocated.
    allocated: Vec<u64>,
}
65
/// Shape of the `deallocated.json` document written by the `fs_persist`
/// code paths in this module.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Serialize, serde::Deserialize)]
#[allow(dead_code)]
struct FsDealloc {
    /// IDs of blocks that were deallocated and not yet reallocated.
    tombstones: Vec<u64>,
}
72
73/// Allocate a new block and return its ID
74pub async fn allocate_block_impl(storage: &mut BlockStorage) -> Result<u64, DatabaseError> {
75    // Find the next available block ID and atomically increment
76    let block_id = storage.next_block_id.fetch_add(1, Ordering::SeqCst);
77
78    // Mark block as allocated
79    lock_mutex!(storage.allocated_blocks).insert(block_id);
80
81    // For WASM, persist allocation state to global storage
82    #[cfg(target_arch = "wasm32")]
83    {
84        vfs_sync::with_global_allocation_map(|allocation_map| {
85            let mut map = allocation_map.borrow_mut();
86            let db_allocations = map
87                .entry(storage.db_name.clone())
88                .or_insert_with(HashSet::new);
89            db_allocations.insert(block_id);
90        });
91    }
92
93    // fs_persist: mirror allocation to allocations.json
94    #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
95    {
96        let base: PathBuf = storage.base_dir.clone();
97        let mut db_dir = base.clone();
98        db_dir.push(&storage.db_name);
99        let _ = fs::create_dir_all(&db_dir);
100        // Proactively ensure blocks directory exists so tests can observe it immediately after first sync
101        let mut blocks_dir = db_dir.clone();
102        blocks_dir.push("blocks");
103        let _ = fs::create_dir_all(&blocks_dir);
104        let mut alloc_path = db_dir.clone();
105        alloc_path.push("allocations.json");
106        // load existing
107        let mut alloc = FsAlloc::default();
108        if let Ok(mut f) = fs::File::open(&alloc_path) {
109            let mut s = String::new();
110            if f.read_to_string(&mut s).is_ok() {
111                let _ = serde_json::from_str::<FsAlloc>(&s).map(|a| {
112                    alloc = a;
113                });
114            }
115        }
116        if !alloc.allocated.contains(&block_id) {
117            alloc.allocated.push(block_id);
118        }
119        if let Ok(mut f) = fs::File::create(&alloc_path) {
120            let _ = f.write_all(
121                serde_json::to_string(&alloc)
122                    .unwrap_or_else(|_| "{}".into())
123                    .as_bytes(),
124            );
125        }
126
127        // Remove any tombstone (block was reallocated) and persist deallocated.json
128        let mut dealloc_path = db_dir.clone();
129        dealloc_path.push("deallocated.json");
130        lock_mutex!(storage.deallocated_blocks).remove(&block_id);
131        let mut dealloc = FsDealloc::default();
132        // best effort read to preserve any existing entries
133        if let Ok(mut f) = fs::File::open(&dealloc_path) {
134            let mut s = String::new();
135            if f.read_to_string(&mut s).is_ok() {
136                let _ = serde_json::from_str::<FsDealloc>(&s).map(|d| {
137                    dealloc = d;
138                });
139            }
140        }
141        dealloc.tombstones = lock_mutex!(storage.deallocated_blocks)
142            .iter()
143            .cloned()
144            .collect();
145        dealloc.tombstones.sort_unstable();
146        if let Ok(mut f) = fs::File::create(&dealloc_path) {
147            let _ = f.write_all(
148                serde_json::to_string(&dealloc)
149                    .unwrap_or_else(|_| "{}".into())
150                    .as_bytes(),
151            );
152        }
153    }
154
155    // For native tests, mirror allocation state to test-global (when fs_persist disabled)
156    #[cfg(all(
157        not(target_arch = "wasm32"),
158        any(test, debug_assertions),
159        not(feature = "fs_persist")
160    ))]
161    {
162        vfs_sync::with_global_allocation_map(|allocation_map| {
163            let mut map = allocation_map.borrow_mut();
164            let db_allocations = map
165                .entry(storage.db_name.clone())
166                .or_insert_with(HashSet::new);
167            db_allocations.insert(block_id);
168        });
169    }
170
171    // Track allocation in telemetry
172    #[cfg(feature = "telemetry")]
173    if let Some(ref metrics) = storage.metrics {
174        metrics.blocks_allocated_total().inc();
175        // Update memory gauge: total allocated blocks × BLOCK_SIZE
176        let total_memory = (lock_mutex!(storage.allocated_blocks).len() as f64)
177            * (super::block_storage::BLOCK_SIZE as f64);
178        metrics.memory_bytes().set(total_memory);
179    }
180
181    log::info!(
182        "Allocated block: {} (total allocated: {})",
183        block_id,
184        lock_mutex!(storage.allocated_blocks).len()
185    );
186    Ok(block_id)
187}
188
189/// Deallocate a block and mark it as available for reuse
190pub async fn deallocate_block_impl(
191    storage: &mut BlockStorage,
192    block_id: u64,
193) -> Result<(), DatabaseError> {
194    // Check if block is actually allocated
195    if !lock_mutex!(storage.allocated_blocks).contains(&block_id) {
196        return Err(DatabaseError::new(
197            "BLOCK_NOT_ALLOCATED",
198            &format!("Block {} is not allocated", block_id),
199        ));
200    }
201
202    // Remove from allocated set
203    lock_mutex!(storage.allocated_blocks).remove(&block_id);
204
205    // Clear from cache and dirty blocks
206    lock_mutex!(storage.cache).remove(&block_id);
207    lock_mutex!(storage.dirty_blocks).remove(&block_id);
208    // Remove checksum metadata
209    storage.checksum_manager.remove_checksum(block_id);
210
211    // For WASM, remove from global storage
212    #[cfg(target_arch = "wasm32")]
213    {
214        vfs_sync::with_global_storage(|storage_map| {
215            if let Some(db_storage) = storage_map.borrow_mut().get_mut(&storage.db_name) {
216                db_storage.remove(&block_id);
217            }
218        });
219
220        vfs_sync::with_global_allocation_map(|allocation_map| {
221            if let Some(db_allocations) = allocation_map.borrow_mut().get_mut(&storage.db_name) {
222                db_allocations.remove(&block_id);
223            }
224        });
225
226        // Remove persisted metadata entry as well
227        vfs_sync::with_global_metadata(|meta_map| {
228            if let Some(db_meta) = meta_map.borrow_mut().get_mut(&storage.db_name) {
229                db_meta.remove(&block_id);
230            }
231        });
232    }
233
234    // For native fs_persist, remove files and update JSON stores
235    #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
236    {
237        let base: PathBuf = storage.base_dir.clone();
238        let mut db_dir = base.clone();
239        db_dir.push(&storage.db_name);
240        let mut blocks_dir = db_dir.clone();
241        blocks_dir.push("blocks");
242        let mut block_path = blocks_dir.clone();
243        block_path.push(format!("block_{}.bin", block_id));
244        let _ = fs::remove_file(&block_path);
245
246        // update allocations.json
247        let mut alloc_path = db_dir.clone();
248        alloc_path.push("allocations.json");
249        let mut alloc = FsAlloc::default();
250        if let Ok(mut f) = fs::File::open(&alloc_path) {
251            let mut s = String::new();
252            if f.read_to_string(&mut s).is_ok() {
253                let _ = serde_json::from_str::<FsAlloc>(&s).map(|a| {
254                    alloc = a;
255                });
256            }
257        }
258        alloc.allocated.retain(|&id| id != block_id);
259        if let Ok(mut f) = fs::File::create(&alloc_path) {
260            let _ = f.write_all(
261                serde_json::to_string(&alloc)
262                    .unwrap_or_else(|_| "{}".into())
263                    .as_bytes(),
264            );
265        }
266
267        // update metadata.json (remove entry)
268        let mut meta_path = db_dir.clone();
269        meta_path.push("metadata.json");
270        // Tolerant JSON handling: remove entry with matching id from entries array
271        let mut meta_val: serde_json::Value = serde_json::json!({"entries": []});
272        if let Ok(mut f) = fs::File::open(&meta_path) {
273            let mut s = String::new();
274            if f.read_to_string(&mut s).is_ok() {
275                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&s) {
276                    meta_val = v;
277                }
278            }
279        }
280        if !meta_val.is_object() {
281            meta_val = serde_json::json!({"entries": []});
282        }
283        if let Some(entries) = meta_val.get_mut("entries").and_then(|v| v.as_array_mut()) {
284            entries.retain(|ent| {
285                ent.as_array()
286                    .and_then(|arr| arr.first())
287                    .and_then(|v| v.as_u64())
288                    .map(|bid| bid != block_id)
289                    .unwrap_or(true)
290            });
291        }
292        let meta_string = serde_json::to_string(&meta_val).unwrap_or_else(|_| "{}".into());
293        if let Ok(mut f) = fs::File::create(&meta_path) {
294            let _ = f.write_all(meta_string.as_bytes());
295        }
296
297        // Append to deallocated tombstones and persist deallocated.json
298        let mut dealloc_path = db_dir.clone();
299        dealloc_path.push("deallocated.json");
300        lock_mutex!(storage.deallocated_blocks).insert(block_id);
301        let mut dealloc = FsDealloc::default();
302        if let Ok(mut f) = fs::File::open(&dealloc_path) {
303            let mut s = String::new();
304            if f.read_to_string(&mut s).is_ok() {
305                let _ = serde_json::from_str::<FsDealloc>(&s).map(|d| {
306                    dealloc = d;
307                });
308            }
309        }
310        dealloc.tombstones = lock_mutex!(storage.deallocated_blocks)
311            .iter()
312            .cloned()
313            .collect();
314        dealloc.tombstones.sort_unstable();
315        if let Ok(mut f) = fs::File::create(&dealloc_path) {
316            let _ = f.write_all(
317                serde_json::to_string(&dealloc)
318                    .unwrap_or_else(|_| "{}".into())
319                    .as_bytes(),
320            );
321        }
322    }
323
324    // For native tests, mirror removal from test-globals (when fs_persist disabled)
325    #[cfg(all(
326        not(target_arch = "wasm32"),
327        any(test, debug_assertions),
328        not(feature = "fs_persist")
329    ))]
330    {
331        vfs_sync::with_global_storage(|gs| {
332            let mut storage_map = gs.borrow_mut();
333            if let Some(db_storage) = storage_map.get_mut(&storage.db_name) {
334                db_storage.remove(&block_id);
335            }
336        });
337
338        vfs_sync::with_global_allocation_map(|allocation_map| {
339            if let Some(db_allocations) = allocation_map.borrow_mut().get_mut(&storage.db_name) {
340                db_allocations.remove(&block_id);
341            }
342        });
343
344        GLOBAL_METADATA_TEST.with(|meta| {
345            let mut meta_map = meta.lock();
346            if let Some(db_meta) = meta_map.get_mut(&storage.db_name) {
347                db_meta.remove(&block_id);
348            }
349        });
350    }
351
352    // Update next_block_id to reuse deallocated blocks
353    let current = storage.next_block_id.load(Ordering::SeqCst);
354    if block_id < current {
355        storage.next_block_id.store(block_id, Ordering::SeqCst);
356    }
357
358    // Track deallocation in telemetry
359    #[cfg(feature = "telemetry")]
360    if let Some(ref metrics) = storage.metrics {
361        metrics.blocks_deallocated_total().inc();
362        // Update memory gauge: total allocated blocks × BLOCK_SIZE
363        let total_memory = (lock_mutex!(storage.allocated_blocks).len() as f64)
364            * (super::block_storage::BLOCK_SIZE as f64);
365        metrics.memory_bytes().set(total_memory);
366    }
367
368    log::info!(
369        "Deallocated block: {} (total allocated: {})",
370        block_id,
371        lock_mutex!(storage.allocated_blocks).len()
372    );
373    Ok(())
374}