absurder_sql/storage/
allocation.rs1#[cfg(any(target_arch = "wasm32", all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist"))))]
5use std::collections::HashSet;
6use crate::types::DatabaseError;
7use super::block_storage::BlockStorage;
8
9#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
10use std::{fs, io::{Read, Write}, path::PathBuf};
11
12#[cfg(target_arch = "wasm32")]
13use super::vfs_sync;
14
15#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
16use super::vfs_sync;
17
18#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
19use super::block_storage::GLOBAL_METADATA_TEST;
20
/// Serialized form of the per-database `allocations.json` file.
///
/// Only compiled for native builds with the `fs_persist` feature enabled.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Serialize, serde::Deserialize)]
// NOTE(review): presumably silences dead-code lints in configurations where
// the persistence paths are not exercised — confirm it is still needed.
#[allow(dead_code)]
struct FsAlloc {
    /// Ids of the blocks currently recorded as allocated.
    allocated: Vec<u64>,
}
26
/// Serialized form of the per-database `deallocated.json` file.
///
/// Only compiled for native builds with the `fs_persist` feature enabled.
#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
#[derive(Default, serde::Serialize, serde::Deserialize)]
// NOTE(review): presumably silences dead-code lints in configurations where
// the persistence paths are not exercised — confirm it is still needed.
#[allow(dead_code)]
struct FsDealloc {
    /// Ids of blocks that have been deallocated ("tombstones").
    tombstones: Vec<u64>,
}
31
32pub async fn allocate_block_impl(storage: &mut BlockStorage) -> Result<u64, DatabaseError> {
34
35 let block_id = storage.next_block_id;
37
38 storage.allocated_blocks.insert(block_id);
40 storage.next_block_id += 1;
41
42 #[cfg(target_arch = "wasm32")]
44 {
45 vfs_sync::with_global_allocation_map(|allocation_map| {
46 let mut allocation_map = allocation_map.borrow_mut();
47 let db_allocations = allocation_map.entry(storage.db_name.clone()).or_insert_with(HashSet::new);
48 db_allocations.insert(block_id);
49 });
50 }
51
52 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
54 {
55 let base: PathBuf = storage.base_dir.clone();
56 let mut db_dir = base.clone();
57 db_dir.push(&storage.db_name);
58 let _ = fs::create_dir_all(&db_dir);
59 let mut blocks_dir = db_dir.clone();
61 blocks_dir.push("blocks");
62 let _ = fs::create_dir_all(&blocks_dir);
63 let mut alloc_path = db_dir.clone();
64 alloc_path.push("allocations.json");
65 let mut alloc = FsAlloc::default();
67 if let Ok(mut f) = fs::File::open(&alloc_path) {
68 let mut s = String::new();
69 if f.read_to_string(&mut s).is_ok() { let _ = serde_json::from_str::<FsAlloc>(&s).map(|a| { alloc = a; }); }
70 }
71 if !alloc.allocated.contains(&block_id) { alloc.allocated.push(block_id); }
72 if let Ok(mut f) = fs::File::create(&alloc_path) { let _ = f.write_all(serde_json::to_string(&alloc).unwrap_or_else(|_| "{}".into()).as_bytes()); }
73
74 let mut dealloc_path = db_dir.clone();
76 dealloc_path.push("deallocated.json");
77 storage.deallocated_blocks.remove(&block_id);
78 let mut dealloc = FsDealloc::default();
79 if let Ok(mut f) = fs::File::open(&dealloc_path) {
81 let mut s = String::new();
82 if f.read_to_string(&mut s).is_ok() { let _ = serde_json::from_str::<FsDealloc>(&s).map(|d| { dealloc = d; }); }
83 }
84 dealloc.tombstones = storage.deallocated_blocks.iter().cloned().collect();
85 dealloc.tombstones.sort_unstable();
86 if let Ok(mut f) = fs::File::create(&dealloc_path) { let _ = f.write_all(serde_json::to_string(&dealloc).unwrap_or_else(|_| "{}".into()).as_bytes()); }
87 }
88
89 #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
91 {
92 vfs_sync::with_global_allocation_map(|allocation_map| {
93 let mut allocation_map = allocation_map.borrow_mut();
94 let db_allocations = allocation_map.entry(storage.db_name.clone()).or_insert_with(HashSet::new);
95 db_allocations.insert(block_id);
96 });
97 }
98
99 #[cfg(feature = "telemetry")]
101 if let Some(ref metrics) = storage.metrics {
102 metrics.blocks_allocated_total().inc();
103 let total_memory = (storage.allocated_blocks.len() as f64) * (super::block_storage::BLOCK_SIZE as f64);
105 metrics.memory_bytes().set(total_memory);
106 }
107
108 log::info!("Allocated block: {} (total allocated: {})", block_id, storage.allocated_blocks.len());
109 Ok(block_id)
110 }
111
112pub async fn deallocate_block_impl(storage: &mut BlockStorage, block_id: u64) -> Result<(), DatabaseError> {
114
115 if !storage.allocated_blocks.contains(&block_id) {
117 return Err(DatabaseError::new(
118 "BLOCK_NOT_ALLOCATED",
119 &format!("Block {} is not allocated", block_id)
120 ));
121 }
122
123 storage.allocated_blocks.remove(&block_id);
125
126 storage.cache.remove(&block_id);
128 storage.dirty_blocks.lock().remove(&block_id);
129 storage.checksum_manager.remove_checksum(block_id);
131
132 #[cfg(target_arch = "wasm32")]
134 {
135 vfs_sync::with_global_storage(|gs| {
136 let mut storage_map = gs.borrow_mut();
137 if let Some(db_storage) = storage_map.get_mut(&storage.db_name) {
138 db_storage.remove(&block_id);
139 }
140 });
141
142 vfs_sync::with_global_allocation_map(|allocation_map| {
143 let mut allocation_map = allocation_map.borrow_mut();
144 if let Some(db_allocations) = allocation_map.get_mut(&storage.db_name) {
145 db_allocations.remove(&block_id);
146 }
147 });
148
149 vfs_sync::with_global_metadata(|meta| {
151 let mut meta_map = meta.borrow_mut();
152 if let Some(db_meta) = meta_map.get_mut(&storage.db_name) {
153 db_meta.remove(&block_id);
154 }
155 });
156 }
157
158 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
160 {
161 let base: PathBuf = storage.base_dir.clone();
162 let mut db_dir = base.clone();
163 db_dir.push(&storage.db_name);
164 let mut blocks_dir = db_dir.clone();
165 blocks_dir.push("blocks");
166 let mut block_path = blocks_dir.clone();
167 block_path.push(format!("block_{}.bin", block_id));
168 let _ = fs::remove_file(&block_path);
169
170 let mut alloc_path = db_dir.clone();
172 alloc_path.push("allocations.json");
173 let mut alloc = FsAlloc::default();
174 if let Ok(mut f) = fs::File::open(&alloc_path) { let mut s = String::new(); if f.read_to_string(&mut s).is_ok() { let _ = serde_json::from_str::<FsAlloc>(&s).map(|a| { alloc = a; }); } }
175 alloc.allocated.retain(|&id| id != block_id);
176 if let Ok(mut f) = fs::File::create(&alloc_path) { let _ = f.write_all(serde_json::to_string(&alloc).unwrap_or_else(|_| "{}".into()).as_bytes()); }
177
178 let mut meta_path = db_dir.clone();
180 meta_path.push("metadata.json");
181 let mut meta_val: serde_json::Value = serde_json::json!({"entries": []});
183 if let Ok(mut f) = fs::File::open(&meta_path) {
184 let mut s = String::new();
185 if f.read_to_string(&mut s).is_ok() { if let Ok(v) = serde_json::from_str::<serde_json::Value>(&s) { meta_val = v; } }
186 }
187 if !meta_val.is_object() { meta_val = serde_json::json!({"entries": []}); }
188 if let Some(entries) = meta_val.get_mut("entries").and_then(|v| v.as_array_mut()) {
189 entries.retain(|ent| {
190 ent.as_array()
191 .and_then(|arr| arr.get(0))
192 .and_then(|v| v.as_u64())
193 .map(|bid| bid != block_id)
194 .unwrap_or(true)
195 });
196 }
197 let meta_string = serde_json::to_string(&meta_val).unwrap_or_else(|_| "{}".into());
198 if let Ok(mut f) = fs::File::create(&meta_path) { let _ = f.write_all(meta_string.as_bytes()); }
199
200 let mut dealloc_path = db_dir.clone();
202 dealloc_path.push("deallocated.json");
203 storage.deallocated_blocks.insert(block_id);
204 let mut dealloc = FsDealloc::default();
205 if let Ok(mut f) = fs::File::open(&dealloc_path) { let mut s = String::new(); if f.read_to_string(&mut s).is_ok() { let _ = serde_json::from_str::<FsDealloc>(&s).map(|d| { dealloc = d; }); } }
206 dealloc.tombstones = storage.deallocated_blocks.iter().cloned().collect();
207 dealloc.tombstones.sort_unstable();
208 if let Ok(mut f) = fs::File::create(&dealloc_path) { let _ = f.write_all(serde_json::to_string(&dealloc).unwrap_or_else(|_| "{}".into()).as_bytes()); }
209 }
210
211 #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
213 {
214 vfs_sync::with_global_storage(|gs| {
215 let mut storage_map = gs.borrow_mut();
216 if let Some(db_storage) = storage_map.get_mut(&storage.db_name) {
217 db_storage.remove(&block_id);
218 }
219 });
220
221 vfs_sync::with_global_allocation_map(|allocation_map| {
222 let mut allocation_map = allocation_map.borrow_mut();
223 if let Some(db_allocations) = allocation_map.get_mut(&storage.db_name) {
224 db_allocations.remove(&block_id);
225 }
226 });
227
228 GLOBAL_METADATA_TEST.with(|meta| {
229 let mut meta_map = meta.borrow_mut();
230 if let Some(db_meta) = meta_map.get_mut(&storage.db_name) {
231 db_meta.remove(&block_id);
232 }
233 });
234 }
235
236 if block_id < storage.next_block_id {
238 storage.next_block_id = block_id;
239 }
240
241 #[cfg(feature = "telemetry")]
243 if let Some(ref metrics) = storage.metrics {
244 metrics.blocks_deallocated_total().inc();
245 let total_memory = (storage.allocated_blocks.len() as f64) * (super::block_storage::BLOCK_SIZE as f64);
247 metrics.memory_bytes().set(total_memory);
248 }
249
250 log::info!("Deallocated block: {} (total allocated: {})", block_id, storage.allocated_blocks.len());
251 Ok(())
252 }