absurder_sql/storage/io_operations.rs
1//! I/O operations for BlockStorage
2//! This module contains block reading and writing functionality
3
4#[cfg(not(target_arch = "wasm32"))]
5use std::sync::atomic::Ordering;
6use crate::types::DatabaseError;
7use super::block_storage::{BlockStorage, BLOCK_SIZE};
8
9#[cfg(any(target_arch = "wasm32", all(not(target_arch = "wasm32"), any(test, debug_assertions))))]
10use super::vfs_sync;
11
12#[cfg(target_arch = "wasm32")]
13use std::collections::HashMap;
14#[cfg(target_arch = "wasm32")]
15use super::metadata::{BlockMetadataPersist, ChecksumAlgorithm};
16
17#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
18use std::{fs, io::Read, path::PathBuf};
19
20#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
21use super::block_storage::GLOBAL_METADATA_TEST;
22
23/// Synchronous block read implementation
24pub fn read_block_sync_impl(storage: &mut BlockStorage, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
25 // Skip auto_sync check for reads - only writes trigger sync
26
27 // Check cache first (both native and WASM)
28 if let Some(data) = storage.cache.get(&block_id).cloned() {
29 // Record cache hit
30 #[cfg(feature = "telemetry")]
31 if let Some(ref metrics) = storage.metrics {
32 metrics.cache_hits().inc();
33 }
34 // Verify checksum even for cached data to catch corruption
35 // Skip block 0 as it's the SQLite header which can be modified by SQLite
36 if block_id != 0 {
37 if let Err(e) = storage.verify_against_stored_checksum(block_id, &data) {
38 return Err(e);
39 }
40 }
41 // Only update LRU when close to capacity to avoid O(n) overhead on every read
42 // This maintains correctness for eviction while optimizing hot-path performance
43 if storage.cache.len() > (storage.capacity * 4 / 5) {
44 storage.touch_lru(block_id);
45 }
46
47 return Ok(data);
48 }
49
50 // Record cache miss
51 #[cfg(feature = "telemetry")]
52 if let Some(ref metrics) = storage.metrics {
53 metrics.cache_misses().inc();
54 metrics.indexeddb_operations_total().inc();
55 }
56
57 // For WASM, check global storage for persistence across instances
58 #[cfg(target_arch = "wasm32")]
59 {
60 // Single combined lookup for commit marker, visibility, and data
61 let (data, is_visible) = vfs_sync::with_global_commit_marker(|cm| {
62 let committed = cm.borrow().get(&storage.db_name).copied().unwrap_or(0);
63
64 // Block 0 (database header) is always visible
65 if block_id == 0 {
66 let data = vfs_sync::with_global_storage(|gs| {
67 gs.borrow()
68 .get(&storage.db_name)
69 .and_then(|db_storage| db_storage.get(&block_id))
70 .cloned()
71 .unwrap_or_else(|| vec![0; BLOCK_SIZE])
72 });
73 return (data, true);
74 }
75
76 // For other blocks, check visibility and get data in one pass
77 vfs_sync::with_global_metadata(|meta| {
78 let has_metadata = meta.borrow()
79 .get(&storage.db_name)
80 .and_then(|db_meta| db_meta.get(&block_id))
81 .is_some();
82
83 if has_metadata {
84 // Has metadata - check if visible based on commit marker
85 let is_visible = meta.borrow()
86 .get(&storage.db_name)
87 .and_then(|db_meta| db_meta.get(&block_id))
88 .map(|m| (m.version as u64) <= committed)
89 .unwrap_or(false);
90
91 if is_visible {
92 // Visible - return actual data
93 let data = vfs_sync::with_global_storage(|gs| {
94 gs.borrow()
95 .get(&storage.db_name)
96 .and_then(|db_storage| db_storage.get(&block_id))
97 .cloned()
98 .unwrap_or_else(|| vec![0; BLOCK_SIZE])
99 });
100 (data, true)
101 } else {
102 // Not visible (version > commit marker) - return zeroed data for SQLite
103 (vec![0; BLOCK_SIZE], false)
104 }
105 } else {
106 // No metadata - check if data exists in global storage
107 let data = vfs_sync::with_global_storage(|gs| {
108 gs.borrow()
109 .get(&storage.db_name)
110 .and_then(|db_storage| db_storage.get(&block_id))
111 .cloned()
112 });
113
114 match data {
115 Some(data) => (data, true), // Old data before metadata tracking
116 None => (vec![0; BLOCK_SIZE], true) // Return zeros for RMW (read-modify-write)
117 }
118 }
119 })
120 });
121
122 // Verify checksum ONLY for visible blocks in WASM
123 // Skip block 0 as it's the SQLite header which can be modified by SQLite
124 if is_visible && block_id != 0 {
125 if let Err(e) = storage.verify_against_stored_checksum(block_id, &data) {
126 return Err(e);
127 }
128 }
129
130 // Cache for future reads (skip eviction check for performance)
131 storage.cache.insert(block_id, data.clone());
132 return Ok(data);
133 }
134
135 // For native fs_persist, read from filesystem if allocated
136 #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
137 {
138 let base: PathBuf = storage.base_dir.clone();
139 let mut dir = base.clone();
140 dir.push(&storage.db_name);
141 let mut blocks = dir.clone();
142 blocks.push("blocks");
143 let mut block_path = blocks.clone();
144 block_path.push(format!("block_{}.bin", block_id));
145 // If the block was explicitly deallocated (tombstoned), refuse reads
146 if storage.deallocated_blocks.contains(&block_id) {
147 return Err(DatabaseError::new(
148 "BLOCK_NOT_ALLOCATED",
149 &format!("Block {} is not allocated", block_id),
150 ));
151 }
152 if let Ok(mut f) = fs::File::open(&block_path) {
153 let mut data = vec![0u8; BLOCK_SIZE];
154 f.read_exact(&mut data).map_err(|e| DatabaseError::new("IO_ERROR", &format!("read block {} failed: {}", block_id, e)))?;
155 storage.cache.insert(block_id, data.clone());
156 if let Err(e) = storage.verify_against_stored_checksum(block_id, &data) { return Err(e); }
157 storage.touch_lru(block_id);
158 storage.evict_if_needed();
159 return Ok(data);
160 }
161 // If file missing, treat as zeroed data (compat). This covers never-written blocks
162 // and avoids depending on allocated_blocks for read behavior.
163 let data = vec![0; BLOCK_SIZE];
164 storage.cache.insert(block_id, data.clone());
165 if let Err(e) = storage.verify_against_stored_checksum(block_id, &data) { return Err(e); }
166 storage.touch_lru(block_id);
167 storage.evict_if_needed();
168 return Ok(data);
169 }
170
171 // For native tests, check test-global storage for persistence across instances (when fs_persist disabled)
172 #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
173 {
174 // Enforce commit gating in native test path as well
175 let committed: u64 = vfs_sync::with_global_commit_marker(|cm| {
176 let cm = cm.borrow();
177 cm.get(&storage.db_name).copied().unwrap_or(0)
178 });
179 let is_visible: bool = GLOBAL_METADATA_TEST.with(|meta| {
180 let meta_map = meta.borrow();
181 if let Some(db_meta) = meta_map.get(&storage.db_name) {
182 if let Some(m) = db_meta.get(&block_id) {
183 return (m.version as u64) <= committed;
184 }
185 }
186 false
187 });
188 let data = if is_visible {
189 vfs_sync::with_global_storage(|gs| {
190 let storage_map = gs.borrow();
191 if let Some(db_storage) = storage_map.get(&storage.db_name) {
192 if let Some(data) = db_storage.get(&block_id) {
193 log::debug!("[test] Block {} found in global storage (sync, committed visible)", block_id);
194 return data.clone();
195 }
196 }
197 vec![0; BLOCK_SIZE]
198 })
199 } else {
200 log::debug!(
201 "[test] Block {} not visible due to commit gating (committed={}, treating as zeroed)",
202 block_id,
203 committed
204 );
205 vec![0; BLOCK_SIZE]
206 };
207
208 // Check if block is actually allocated before returning zeroed data
209 if !storage.allocated_blocks.contains(&block_id) && !is_visible {
210 let error = DatabaseError::new(
211 "BLOCK_NOT_FOUND",
212 &format!("Block {} not found in storage", block_id)
213 );
214 // Record error for observability
215 storage.observability.record_error(&error);
216 return Err(error);
217 }
218
219 storage.cache.insert(block_id, data.clone());
220 log::debug!("[test] Block {} cached from global storage (sync)", block_id);
221 // Verify checksum only if the block is visible under the commit marker
222 if is_visible {
223 if let Err(e) = storage.verify_against_stored_checksum(block_id, &data) {
224 log::error!(
225 "[test] Checksum verification failed for block {} (test storage): {}",
226 block_id, e.message
227 );
228 storage.observability.record_error(&e);
229 return Err(e);
230 }
231 }
232 storage.touch_lru(block_id);
233 storage.evict_if_needed();
234 return Ok(data);
235 }
236 }
237
238/// Synchronous block write implementation
239pub fn write_block_sync_impl(storage: &mut BlockStorage, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
240 // Record IndexedDB write operation
241 #[cfg(feature = "telemetry")]
242 if let Some(ref metrics) = storage.metrics {
243 metrics.indexeddb_operations_total().inc();
244 }
245
246 storage.maybe_auto_sync();
247
248 // Check for backpressure conditions
249 let dirty_count = storage.get_dirty_count();
250 if dirty_count > 100 { // Threshold for backpressure
251 storage.observability.record_backpressure("high", "too_many_dirty_blocks");
252 }
253
254 if data.len() != BLOCK_SIZE {
255 return Err(DatabaseError::new(
256 "INVALID_BLOCK_SIZE",
257 &format!("Block size must be {} bytes, got {}", BLOCK_SIZE, data.len())
258 ));
259 }
260
261 // If requested by policy, verify existing data integrity BEFORE accepting the new write.
262 // This prevents overwriting a block whose prior contents no longer match the stored checksum.
263 let verify_before = storage
264 .policy
265 .as_ref()
266 .map(|p| p.verify_after_write)
267 .unwrap_or(false);
268 if verify_before {
269 #[cfg(not(target_arch = "wasm32"))]
270 {
271 if let Some(bytes) = storage.cache.get(&block_id).cloned() {
272 if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
273 log::error!(
274 "verify_after_write: pre-write checksum verification failed for block {}: {}",
275 block_id, e.message
276 );
277 return Err(e);
278 }
279 }
280 }
281 #[cfg(target_arch = "wasm32")]
282 {
283 if let Some(bytes) = storage.cache.get(&block_id).cloned() {
284 if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
285 log::error!(
286 "verify_after_write: pre-write checksum verification failed for block {}: {}",
287 block_id, e.message
288 );
289 return Err(e);
290 }
291 } else {
292 let maybe_bytes = vfs_sync::with_global_storage(|gs| {
293 let storage_map = gs.borrow();
294 storage_map
295 .get(&storage.db_name)
296 .and_then(|db| db.get(&block_id))
297 .cloned()
298 });
299 if let Some(bytes) = maybe_bytes {
300 if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
301 log::error!(
302 "verify_after_write: pre-write checksum verification failed for block {}: {}",
303 block_id, e.message
304 );
305 return Err(e);
306 }
307 }
308 }
309 }
310 #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions)))]
311 {
312 if let Some(bytes) = storage.cache.get(&block_id).cloned() {
313 if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
314 log::error!(
315 "[test] verify_after_write: pre-write checksum verification failed for block {}: {}",
316 block_id, e.message
317 );
318 return Err(e);
319 }
320 } else {
321 let maybe_bytes = vfs_sync::with_global_storage(|gs| {
322 let storage_map = gs.borrow();
323 storage_map
324 .get(&storage.db_name)
325 .and_then(|db| db.get(&block_id))
326 .cloned()
327 });
328 if let Some(bytes) = maybe_bytes {
329 if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
330 log::error!(
331 "[test] verify_after_write: pre-write checksum verification failed for block {}: {}",
332 block_id, e.message
333 );
334 return Err(e);
335 }
336 }
337 }
338 }
339 }
340
341 // For WASM, immediately persist to global storage FIRST for cross-instance visibility
342 #[cfg(target_arch = "wasm32")]
343 {
344 // Check if this block already exists in global storage with committed data
345 let existing_data = vfs_sync::with_global_storage(|gs| {
346 let storage_map = gs.borrow();
347 if let Some(db_storage) = storage_map.get(&storage.db_name) {
348 db_storage.get(&block_id).cloned()
349 } else {
350 None
351 }
352 });
353
354 // Check if there's existing metadata for this block
355 let has_committed_metadata = vfs_sync::with_global_metadata(|meta| {
356 let meta_map = meta.borrow();
357 if let Some(db_meta) = meta_map.get(&storage.db_name) {
358 if let Some(metadata) = db_meta.get(&block_id) {
359 // If version > 0, this block has been committed before
360 metadata.version > 0
361 } else {
362 false
363 }
364 } else {
365 false
366 }
367 });
368
369 // Only overwrite if there's no committed data or if this is a legitimate update
370 let should_write = if let Some(existing) = existing_data {
371 if has_committed_metadata {
372 // CRITICAL FIX: Always allow writes during transactions to ensure schema changes persist
373 true // Always allow writes when there's committed metadata
374 } else if existing.iter().zip(data.iter()).all(|(a, b)| a == b) {
375 // If the data is identical, skip the write
376 false
377 } else {
378 // Check if the new data is richer (has more non-zero bytes) than existing
379 let existing_non_zero = existing.iter().filter(|&&b| b != 0).count();
380 let new_non_zero = data.iter().filter(|&&b| b != 0).count();
381
382 if new_non_zero > existing_non_zero {
383 true
384 } else if new_non_zero < existing_non_zero {
385 false
386 } else {
387 true
388 }
389 }
390 } else {
391 // Check if there's committed data in global storage that we haven't seen yet
392 let has_global_committed_data = vfs_sync::with_global_metadata(|meta| {
393 let meta_map = meta.borrow();
394 if let Some(db_meta) = meta_map.get(&storage.db_name) {
395 if let Some(metadata) = db_meta.get(&block_id) {
396 metadata.version > 0
397 } else {
398 false
399 }
400 } else {
401 false
402 }
403 });
404
405 if has_global_committed_data {
406 true // Allow transactional writes even when committed data exists
407 } else {
408 // No existing data and no committed metadata, safe to write
409 true
410 }
411 };
412
413 if should_write {
414 vfs_sync::with_global_storage(|gs| {
415 let mut storage_map = gs.borrow_mut();
416 let db_storage = storage_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
417
418 // Log what we're about to write vs what exists
419 // Block overwrite (debug logging removed for performance)
420
421 db_storage.insert(block_id, data.clone());
422 });
423 }
424
425 // Always ensure metadata exists for the block, and UPDATE checksum if we wrote new data
426 vfs_sync::with_global_metadata(|meta| {
427 let mut meta_map = meta.borrow_mut();
428 let db_meta = meta_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
429
430 // Calculate checksum for the data that will be stored (either new or existing)
431 let stored_data = if should_write {
432 data.clone()
433 } else {
434 // Use existing data from global storage
435 vfs_sync::with_global_storage(|gs| {
436 let storage_map = gs.borrow();
437 if let Some(db_storage) = storage_map.get(&storage.db_name) {
438 if let Some(existing) = db_storage.get(&block_id) {
439 existing.clone()
440 } else {
441 data.clone() // Fallback to new data
442 }
443 } else {
444 data.clone() // Fallback to new data
445 }
446 })
447 };
448
449 let checksum = {
450 let mut hasher = crc32fast::Hasher::new();
451 hasher.update(&stored_data);
452 hasher.finalize() as u64
453 };
454
455 // If metadata exists, preserve the version number but update the checksum
456 let version = if let Some(existing_meta) = db_meta.get(&block_id) {
457 existing_meta.version
458 } else {
459 1 // Start at version 1 so uncommitted data is hidden (commit marker starts at 0)
460 };
461
462 db_meta.insert(block_id, BlockMetadataPersist {
463 checksum,
464 version,
465 last_modified_ms: 0, // Will be updated during sync
466 algo: ChecksumAlgorithm::CRC32,
467 });
468 });
469
470 // Also create/update metadata for native test path
471 #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
472 GLOBAL_METADATA_TEST.with(|meta| {
473 let mut meta_map = meta.borrow_mut();
474 let db_meta = meta_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
475
476 // Calculate checksum for the data that will be stored (either new or existing)
477 let stored_data = if should_write {
478 data.clone()
479 } else {
480 // Use existing data from global test storage
481 vfs_sync::with_global_storage(|gs| {
482 let storage_map = gs.borrow();
483 if let Some(db_storage) = storage_map.get(&storage.db_name) {
484 if let Some(existing) = db_storage.get(&block_id) {
485 existing.clone()
486 } else {
487 data.clone() // Fallback to new data
488 }
489 } else {
490 data.clone() // Fallback to new data
491 }
492 })
493 };
494
495 let checksum = {
496 let mut hasher = crc32fast::Hasher::new();
497 hasher.update(&stored_data);
498 hasher.finalize() as u64
499 };
500
501 // If metadata exists, preserve the version number but update the checksum
502 let version = if let Some(existing_meta) = db_meta.get(&block_id) {
503 existing_meta.version
504 } else {
505 1 // Start at version 1 so uncommitted data is hidden (commit marker starts at 0)
506 };
507
508 db_meta.insert(block_id, BlockMetadataPersist {
509 checksum,
510 version,
511 last_modified_ms: 0, // Will be updated during sync
512 algo: ChecksumAlgorithm::CRC32,
513 });
514 log::debug!("Updated test metadata for block {} with checksum {} (version {})", block_id, checksum, version);
515 });
516 }
517
518 // Update cache and mark as dirty
519 storage.cache.insert(block_id, data.clone());
520 {
521 let mut dirty = storage.dirty_blocks.lock();
522 dirty.insert(block_id, data);
523 }
524 // Update checksum metadata on write
525 if let Some(bytes) = storage.cache.get(&block_id) {
526 storage.checksum_manager.store_checksum(block_id, bytes);
527 }
528 // Record write time for debounce tracking (native)
529 #[cfg(not(target_arch = "wasm32"))]
530 {
531 storage.last_write_ms.store(BlockStorage::now_millis(), Ordering::SeqCst);
532 }
533
534 // Policy-based triggers: thresholds
535 let (max_dirty_opt, max_bytes_opt) = storage
536 .policy
537 .as_ref()
538 .map(|p| (p.max_dirty, p.max_dirty_bytes))
539 .unwrap_or((None, None));
540
541 let mut threshold_reached = false;
542 if let Some(max_dirty) = max_dirty_opt {
543 let cur = storage.dirty_blocks.lock().len();
544 if cur >= max_dirty { threshold_reached = true; }
545 }
546 if let Some(max_bytes) = max_bytes_opt {
547 let cur_bytes: usize = {
548 let m = storage.dirty_blocks.lock();
549 m.values().map(|v| v.len()).sum()
550 };
551 if cur_bytes >= max_bytes { threshold_reached = true; }
552 }
553
554 if threshold_reached {
555 let debounce_ms_opt = storage.policy.as_ref().and_then(|p| p.debounce_ms);
556 if let Some(_debounce) = debounce_ms_opt {
557 // Debounce enabled: mark threshold and let debounce thread flush after inactivity
558 #[cfg(not(target_arch = "wasm32"))]
559 {
560 storage.threshold_hit.store(true, Ordering::SeqCst);
561 }
562 } else {
563 // No debounce: flush immediately
564 let _ = storage.sync_now();
565 }
566 }
567
568 storage.touch_lru(block_id);
569 storage.evict_if_needed();
570
571 // Update storage and cache size gauges
572 #[cfg(feature = "telemetry")]
573 if let Some(ref metrics) = storage.metrics {
574 // Update storage bytes gauge
575 let total_bytes: usize = storage.cache.values().map(|v| v.len()).sum();
576 metrics.storage_bytes().set(total_bytes as f64);
577
578 // Update cache size bytes gauge
579 let cache_bytes: usize = storage.cache.len() * BLOCK_SIZE;
580 metrics.cache_size_bytes().set(cache_bytes as f64);
581 }
582
583 Ok(())
584}
585