absurder_sql/storage/
allocation.rs

1//! Block allocation and deallocation operations
2//! This module handles the lifecycle management of blocks
3
4#[cfg(any(target_arch = "wasm32", all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist"))))]
5use std::collections::HashSet;
6use crate::types::DatabaseError;
7use super::block_storage::BlockStorage;
8
9#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
10use std::{fs, io::{Read, Write}, path::PathBuf};
11
12#[cfg(target_arch = "wasm32")]
13use super::vfs_sync;
14
15#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
16use super::vfs_sync;
17
18#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
19use super::block_storage::GLOBAL_METADATA_TEST;
20
21// On-disk JSON schema for fs_persist
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Serialize, serde::Deserialize)]
#[allow(dead_code)]
/// JSON document stored as `allocations.json` inside a database directory.
struct FsAlloc {
    /// IDs of the blocks currently recorded as allocated.
    allocated: Vec<u64>,
}
26
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Serialize, serde::Deserialize)]
#[allow(dead_code)]
/// JSON document stored as `deallocated.json` inside a database directory.
struct FsDealloc {
    /// IDs of blocks that were deallocated and have not been handed out again.
    tombstones: Vec<u64>,
}
31
32/// Allocate a new block and return its ID
33pub async fn allocate_block_impl(storage: &mut BlockStorage) -> Result<u64, DatabaseError> {
34        
35        // Find the next available block ID
36        let block_id = storage.next_block_id;
37        
38        // Mark block as allocated
39        storage.allocated_blocks.insert(block_id);
40        storage.next_block_id += 1;
41        
42        // For WASM, persist allocation state to global storage
43        #[cfg(target_arch = "wasm32")]
44        {
45            vfs_sync::with_global_allocation_map(|allocation_map| {
46                let mut allocation_map = allocation_map.borrow_mut();
47                let db_allocations = allocation_map.entry(storage.db_name.clone()).or_insert_with(HashSet::new);
48                db_allocations.insert(block_id);
49            });
50        }
51        
52        // fs_persist: mirror allocation to allocations.json
53        #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
54        {
55            let base: PathBuf = storage.base_dir.clone();
56            let mut db_dir = base.clone();
57            db_dir.push(&storage.db_name);
58            let _ = fs::create_dir_all(&db_dir);
59            // Proactively ensure blocks directory exists so tests can observe it immediately after first sync
60            let mut blocks_dir = db_dir.clone();
61            blocks_dir.push("blocks");
62            let _ = fs::create_dir_all(&blocks_dir);
63            let mut alloc_path = db_dir.clone();
64            alloc_path.push("allocations.json");
65            // load existing
66            let mut alloc = FsAlloc::default();
67            if let Ok(mut f) = fs::File::open(&alloc_path) {
68                let mut s = String::new();
69                if f.read_to_string(&mut s).is_ok() { let _ = serde_json::from_str::<FsAlloc>(&s).map(|a| { alloc = a; }); }
70            }
71            if !alloc.allocated.contains(&block_id) { alloc.allocated.push(block_id); }
72            if let Ok(mut f) = fs::File::create(&alloc_path) { let _ = f.write_all(serde_json::to_string(&alloc).unwrap_or_else(|_| "{}".into()).as_bytes()); }
73
74            // Remove any tombstone (block was reallocated) and persist deallocated.json
75            let mut dealloc_path = db_dir.clone();
76            dealloc_path.push("deallocated.json");
77            storage.deallocated_blocks.remove(&block_id);
78            let mut dealloc = FsDealloc::default();
79            // best effort read to preserve any existing entries
80            if let Ok(mut f) = fs::File::open(&dealloc_path) {
81                let mut s = String::new();
82                if f.read_to_string(&mut s).is_ok() { let _ = serde_json::from_str::<FsDealloc>(&s).map(|d| { dealloc = d; }); }
83            }
84            dealloc.tombstones = storage.deallocated_blocks.iter().cloned().collect();
85            dealloc.tombstones.sort_unstable();
86            if let Ok(mut f) = fs::File::create(&dealloc_path) { let _ = f.write_all(serde_json::to_string(&dealloc).unwrap_or_else(|_| "{}".into()).as_bytes()); }
87        }
88
89        // For native tests, mirror allocation state to test-global (when fs_persist disabled)
90        #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
91        {
92            vfs_sync::with_global_allocation_map(|allocation_map| {
93                let mut allocation_map = allocation_map.borrow_mut();
94                let db_allocations = allocation_map.entry(storage.db_name.clone()).or_insert_with(HashSet::new);
95                db_allocations.insert(block_id);
96            });
97        }
98        
99        // Track allocation in telemetry
100        #[cfg(feature = "telemetry")]
101        if let Some(ref metrics) = storage.metrics {
102            metrics.blocks_allocated_total().inc();
103            // Update memory gauge: total allocated blocks × BLOCK_SIZE
104            let total_memory = (storage.allocated_blocks.len() as f64) * (super::block_storage::BLOCK_SIZE as f64);
105            metrics.memory_bytes().set(total_memory);
106        }
107        
108        log::info!("Allocated block: {} (total allocated: {})", block_id, storage.allocated_blocks.len());
109        Ok(block_id)
110    }
111
112/// Deallocate a block and mark it as available for reuse
113pub async fn deallocate_block_impl(storage: &mut BlockStorage, block_id: u64) -> Result<(), DatabaseError> {
114        
115        // Check if block is actually allocated
116        if !storage.allocated_blocks.contains(&block_id) {
117            return Err(DatabaseError::new(
118                "BLOCK_NOT_ALLOCATED",
119                &format!("Block {} is not allocated", block_id)
120            ));
121        }
122        
123        // Remove from allocated set
124        storage.allocated_blocks.remove(&block_id);
125        
126        // Clear from cache and dirty blocks
127        storage.cache.remove(&block_id);
128        storage.dirty_blocks.lock().remove(&block_id);
129        // Remove checksum metadata
130        storage.checksum_manager.remove_checksum(block_id);
131        
132        // For WASM, remove from global storage
133        #[cfg(target_arch = "wasm32")]
134        {
135            vfs_sync::with_global_storage(|gs| {
136                let mut storage_map = gs.borrow_mut();
137                if let Some(db_storage) = storage_map.get_mut(&storage.db_name) {
138                    db_storage.remove(&block_id);
139                }
140            });
141            
142            vfs_sync::with_global_allocation_map(|allocation_map| {
143                let mut allocation_map = allocation_map.borrow_mut();
144                if let Some(db_allocations) = allocation_map.get_mut(&storage.db_name) {
145                    db_allocations.remove(&block_id);
146                }
147            });
148
149            // Remove persisted metadata entry as well
150            vfs_sync::with_global_metadata(|meta| {
151                let mut meta_map = meta.borrow_mut();
152                if let Some(db_meta) = meta_map.get_mut(&storage.db_name) {
153                    db_meta.remove(&block_id);
154                }
155            });
156        }
157        
158        // For native fs_persist, remove files and update JSON stores
159        #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
160        {
161            let base: PathBuf = storage.base_dir.clone();
162            let mut db_dir = base.clone();
163            db_dir.push(&storage.db_name);
164            let mut blocks_dir = db_dir.clone();
165            blocks_dir.push("blocks");
166            let mut block_path = blocks_dir.clone();
167            block_path.push(format!("block_{}.bin", block_id));
168            let _ = fs::remove_file(&block_path);
169
170            // update allocations.json
171            let mut alloc_path = db_dir.clone();
172            alloc_path.push("allocations.json");
173            let mut alloc = FsAlloc::default();
174            if let Ok(mut f) = fs::File::open(&alloc_path) { let mut s = String::new(); if f.read_to_string(&mut s).is_ok() { let _ = serde_json::from_str::<FsAlloc>(&s).map(|a| { alloc = a; }); } }
175            alloc.allocated.retain(|&id| id != block_id);
176            if let Ok(mut f) = fs::File::create(&alloc_path) { let _ = f.write_all(serde_json::to_string(&alloc).unwrap_or_else(|_| "{}".into()).as_bytes()); }
177
178            // update metadata.json (remove entry)
179            let mut meta_path = db_dir.clone();
180            meta_path.push("metadata.json");
181            // Tolerant JSON handling: remove entry with matching id from entries array
182            let mut meta_val: serde_json::Value = serde_json::json!({"entries": []});
183            if let Ok(mut f) = fs::File::open(&meta_path) {
184                let mut s = String::new();
185                if f.read_to_string(&mut s).is_ok() { if let Ok(v) = serde_json::from_str::<serde_json::Value>(&s) { meta_val = v; } }
186            }
187            if !meta_val.is_object() { meta_val = serde_json::json!({"entries": []}); }
188            if let Some(entries) = meta_val.get_mut("entries").and_then(|v| v.as_array_mut()) {
189                entries.retain(|ent| {
190                    ent.as_array()
191                        .and_then(|arr| arr.get(0))
192                        .and_then(|v| v.as_u64())
193                        .map(|bid| bid != block_id)
194                        .unwrap_or(true)
195                });
196            }
197            let meta_string = serde_json::to_string(&meta_val).unwrap_or_else(|_| "{}".into());
198            if let Ok(mut f) = fs::File::create(&meta_path) { let _ = f.write_all(meta_string.as_bytes()); }
199
200            // Append to deallocated tombstones and persist deallocated.json
201            let mut dealloc_path = db_dir.clone();
202            dealloc_path.push("deallocated.json");
203            storage.deallocated_blocks.insert(block_id);
204            let mut dealloc = FsDealloc::default();
205            if let Ok(mut f) = fs::File::open(&dealloc_path) { let mut s = String::new(); if f.read_to_string(&mut s).is_ok() { let _ = serde_json::from_str::<FsDealloc>(&s).map(|d| { dealloc = d; }); } }
206            dealloc.tombstones = storage.deallocated_blocks.iter().cloned().collect();
207            dealloc.tombstones.sort_unstable();
208            if let Ok(mut f) = fs::File::create(&dealloc_path) { let _ = f.write_all(serde_json::to_string(&dealloc).unwrap_or_else(|_| "{}".into()).as_bytes()); }
209        }
210
211        // For native tests, mirror removal from test-globals (when fs_persist disabled)
212        #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
213        {
214            vfs_sync::with_global_storage(|gs| {
215                let mut storage_map = gs.borrow_mut();
216                if let Some(db_storage) = storage_map.get_mut(&storage.db_name) {
217                    db_storage.remove(&block_id);
218                }
219            });
220            
221            vfs_sync::with_global_allocation_map(|allocation_map| {
222                let mut allocation_map = allocation_map.borrow_mut();
223                if let Some(db_allocations) = allocation_map.get_mut(&storage.db_name) {
224                    db_allocations.remove(&block_id);
225                }
226            });
227
228            GLOBAL_METADATA_TEST.with(|meta| {
229                let mut meta_map = meta.borrow_mut();
230                if let Some(db_meta) = meta_map.get_mut(&storage.db_name) {
231                    db_meta.remove(&block_id);
232                }
233            });
234        }
235        
236        // Update next_block_id to reuse deallocated blocks
237        if block_id < storage.next_block_id {
238            storage.next_block_id = block_id;
239        }
240        
241        // Track deallocation in telemetry
242        #[cfg(feature = "telemetry")]
243        if let Some(ref metrics) = storage.metrics {
244            metrics.blocks_deallocated_total().inc();
245            // Update memory gauge: total allocated blocks × BLOCK_SIZE
246            let total_memory = (storage.allocated_blocks.len() as f64) * (super::block_storage::BLOCK_SIZE as f64);
247            metrics.memory_bytes().set(total_memory);
248        }
249        
250        log::info!("Deallocated block: {} (total allocated: {})", block_id, storage.allocated_blocks.len());
251        Ok(())
252    }