absurder_sql/storage/
io_operations.rs

1//! I/O operations for BlockStorage
2//! This module contains block reading and writing functionality
3
4#[cfg(not(target_arch = "wasm32"))]
5use std::sync::atomic::Ordering;
6use crate::types::DatabaseError;
7use super::block_storage::{BlockStorage, BLOCK_SIZE};
8
9#[cfg(any(target_arch = "wasm32", all(not(target_arch = "wasm32"), any(test, debug_assertions))))]
10use super::vfs_sync;
11
12#[cfg(target_arch = "wasm32")]
13use std::collections::HashMap;
14#[cfg(target_arch = "wasm32")]
15use super::metadata::{BlockMetadataPersist, ChecksumAlgorithm};
16
17#[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
18use std::{fs, io::Read, path::PathBuf};
19
20#[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
21use super::block_storage::GLOBAL_METADATA_TEST;
22
23/// Synchronous block read implementation
24pub fn read_block_sync_impl(storage: &mut BlockStorage, block_id: u64) -> Result<Vec<u8>, DatabaseError> {
25        // Skip auto_sync check for reads - only writes trigger sync
26        
27        // Check cache first (both native and WASM)
28        if let Some(data) = storage.cache.get(&block_id).cloned() {
29            // Record cache hit
30            #[cfg(feature = "telemetry")]
31            if let Some(ref metrics) = storage.metrics {
32                metrics.cache_hits().inc();
33            }
34            // Verify checksum even for cached data to catch corruption
35            // Skip block 0 as it's the SQLite header which can be modified by SQLite
36            if block_id != 0 {
37                if let Err(e) = storage.verify_against_stored_checksum(block_id, &data) {
38                    return Err(e);
39                }
40            }
41            // Only update LRU when close to capacity to avoid O(n) overhead on every read
42            // This maintains correctness for eviction while optimizing hot-path performance
43            if storage.cache.len() > (storage.capacity * 4 / 5) {
44                storage.touch_lru(block_id);
45            }
46            
47            return Ok(data);
48        }
49        
50        // Record cache miss
51        #[cfg(feature = "telemetry")]
52        if let Some(ref metrics) = storage.metrics {
53            metrics.cache_misses().inc();
54            metrics.indexeddb_operations_total().inc();
55        }
56
57        // For WASM, check global storage for persistence across instances
58        #[cfg(target_arch = "wasm32")]
59        {
60            // Single combined lookup for commit marker, visibility, and data
61            let (data, is_visible) = vfs_sync::with_global_commit_marker(|cm| {
62                let committed = cm.borrow().get(&storage.db_name).copied().unwrap_or(0);
63                
64                // Block 0 (database header) is always visible
65                if block_id == 0 {
66                    let data = vfs_sync::with_global_storage(|gs| {
67                        gs.borrow()
68                            .get(&storage.db_name)
69                            .and_then(|db_storage| db_storage.get(&block_id))
70                            .cloned()
71                            .unwrap_or_else(|| vec![0; BLOCK_SIZE])
72                    });
73                    return (data, true);
74                }
75                
76                // For other blocks, check visibility and get data in one pass
77                vfs_sync::with_global_metadata(|meta| {
78                    let has_metadata = meta.borrow()
79                        .get(&storage.db_name)
80                        .and_then(|db_meta| db_meta.get(&block_id))
81                        .is_some();
82                    
83                    if has_metadata {
84                        // Has metadata - check if visible based on commit marker
85                        let is_visible = meta.borrow()
86                            .get(&storage.db_name)
87                            .and_then(|db_meta| db_meta.get(&block_id))
88                            .map(|m| (m.version as u64) <= committed)
89                            .unwrap_or(false);
90                        
91                        if is_visible {
92                            // Visible - return actual data
93                            let data = vfs_sync::with_global_storage(|gs| {
94                                gs.borrow()
95                                    .get(&storage.db_name)
96                                    .and_then(|db_storage| db_storage.get(&block_id))
97                                    .cloned()
98                                    .unwrap_or_else(|| vec![0; BLOCK_SIZE])
99                            });
100                            (data, true)
101                        } else {
102                            // Not visible (version > commit marker) - return zeroed data for SQLite
103                            (vec![0; BLOCK_SIZE], false)
104                        }
105                    } else {
106                        // No metadata - check if data exists in global storage
107                        let data = vfs_sync::with_global_storage(|gs| {
108                            gs.borrow()
109                                .get(&storage.db_name)
110                                .and_then(|db_storage| db_storage.get(&block_id))
111                                .cloned()
112                        });
113                        
114                        match data {
115                            Some(data) => (data, true), // Old data before metadata tracking
116                            None => (vec![0; BLOCK_SIZE], true) // Return zeros for RMW (read-modify-write)
117                        }
118                    }
119                })
120            });
121            
122            // Verify checksum ONLY for visible blocks in WASM
123            // Skip block 0 as it's the SQLite header which can be modified by SQLite
124            if is_visible && block_id != 0 {
125                if let Err(e) = storage.verify_against_stored_checksum(block_id, &data) {
126                    return Err(e);
127                }
128            }
129            
130            // Cache for future reads (skip eviction check for performance)
131            storage.cache.insert(block_id, data.clone());
132            return Ok(data);
133        }
134
135        // For native fs_persist, read from filesystem if allocated
136        #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
137        {
138            let base: PathBuf = storage.base_dir.clone();
139            let mut dir = base.clone();
140            dir.push(&storage.db_name);
141            let mut blocks = dir.clone();
142            blocks.push("blocks");
143            let mut block_path = blocks.clone();
144            block_path.push(format!("block_{}.bin", block_id));
145            // If the block was explicitly deallocated (tombstoned), refuse reads
146            if storage.deallocated_blocks.contains(&block_id) {
147                return Err(DatabaseError::new(
148                    "BLOCK_NOT_ALLOCATED",
149                    &format!("Block {} is not allocated", block_id),
150                ));
151            }
152            if let Ok(mut f) = fs::File::open(&block_path) {
153                let mut data = vec![0u8; BLOCK_SIZE];
154                f.read_exact(&mut data).map_err(|e| DatabaseError::new("IO_ERROR", &format!("read block {} failed: {}", block_id, e)))?;
155                storage.cache.insert(block_id, data.clone());
156                if let Err(e) = storage.verify_against_stored_checksum(block_id, &data) { return Err(e); }
157                storage.touch_lru(block_id);
158                storage.evict_if_needed();
159                return Ok(data);
160            }
161            // If file missing, treat as zeroed data (compat). This covers never-written blocks
162            // and avoids depending on allocated_blocks for read behavior.
163            let data = vec![0; BLOCK_SIZE];
164            storage.cache.insert(block_id, data.clone());
165            if let Err(e) = storage.verify_against_stored_checksum(block_id, &data) { return Err(e); }
166            storage.touch_lru(block_id);
167            storage.evict_if_needed();
168            return Ok(data);
169        }
170
171        // For native tests, check test-global storage for persistence across instances (when fs_persist disabled)
172        #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
173        {
174            // Enforce commit gating in native test path as well
175            let committed: u64 = vfs_sync::with_global_commit_marker(|cm| {
176                let cm = cm.borrow();
177                cm.get(&storage.db_name).copied().unwrap_or(0)
178            });
179            let is_visible: bool = GLOBAL_METADATA_TEST.with(|meta| {
180                let meta_map = meta.borrow();
181                if let Some(db_meta) = meta_map.get(&storage.db_name) {
182                    if let Some(m) = db_meta.get(&block_id) {
183                        return (m.version as u64) <= committed;
184                    }
185                }
186                false
187            });
188            let data = if is_visible {
189                vfs_sync::with_global_storage(|gs| {
190                    let storage_map = gs.borrow();
191                    if let Some(db_storage) = storage_map.get(&storage.db_name) {
192                        if let Some(data) = db_storage.get(&block_id) {
193                            log::debug!("[test] Block {} found in global storage (sync, committed visible)", block_id);
194                            return data.clone();
195                        }
196                    }
197                    vec![0; BLOCK_SIZE]
198                })
199            } else {
200                log::debug!(
201                    "[test] Block {} not visible due to commit gating (committed={}, treating as zeroed)",
202                    block_id,
203                    committed
204                );
205                vec![0; BLOCK_SIZE]
206            };
207
208            // Check if block is actually allocated before returning zeroed data
209            if !storage.allocated_blocks.contains(&block_id) && !is_visible {
210                let error = DatabaseError::new(
211                    "BLOCK_NOT_FOUND",
212                    &format!("Block {} not found in storage", block_id)
213                );
214                // Record error for observability
215                storage.observability.record_error(&error);
216                return Err(error);
217            }
218            
219            storage.cache.insert(block_id, data.clone());
220            log::debug!("[test] Block {} cached from global storage (sync)", block_id);
221            // Verify checksum only if the block is visible under the commit marker
222            if is_visible {
223                if let Err(e) = storage.verify_against_stored_checksum(block_id, &data) {
224                    log::error!(
225                        "[test] Checksum verification failed for block {} (test storage): {}",
226                        block_id, e.message
227                    );
228                    storage.observability.record_error(&e);
229                    return Err(e);
230                }
231            }
232            storage.touch_lru(block_id);
233            storage.evict_if_needed();
234            return Ok(data);
235        }
236    }
237
238/// Synchronous block write implementation
239pub fn write_block_sync_impl(storage: &mut BlockStorage, block_id: u64, data: Vec<u8>) -> Result<(), DatabaseError> {
240    // Record IndexedDB write operation
241    #[cfg(feature = "telemetry")]
242    if let Some(ref metrics) = storage.metrics {
243        metrics.indexeddb_operations_total().inc();
244    }
245    
246    storage.maybe_auto_sync();
247    
248    // Check for backpressure conditions
249    let dirty_count = storage.get_dirty_count();
250    if dirty_count > 100 { // Threshold for backpressure
251        storage.observability.record_backpressure("high", "too_many_dirty_blocks");
252    }
253            
254            if data.len() != BLOCK_SIZE {
255                return Err(DatabaseError::new(
256                    "INVALID_BLOCK_SIZE", 
257                    &format!("Block size must be {} bytes, got {}", BLOCK_SIZE, data.len())
258                ));
259            }
260    
261            // If requested by policy, verify existing data integrity BEFORE accepting the new write.
262            // This prevents overwriting a block whose prior contents no longer match the stored checksum.
263            let verify_before = storage
264                .policy
265                .as_ref()
266                .map(|p| p.verify_after_write)
267                .unwrap_or(false);
268            if verify_before {
269                #[cfg(not(target_arch = "wasm32"))]
270                {
271                    if let Some(bytes) = storage.cache.get(&block_id).cloned() {
272                        if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
273                            log::error!(
274                                "verify_after_write: pre-write checksum verification failed for block {}: {}",
275                                block_id, e.message
276                            );
277                            return Err(e);
278                        }
279                    }
280                }
281                #[cfg(target_arch = "wasm32")]
282                {
283                    if let Some(bytes) = storage.cache.get(&block_id).cloned() {
284                        if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
285                            log::error!(
286                                "verify_after_write: pre-write checksum verification failed for block {}: {}",
287                                block_id, e.message
288                            );
289                            return Err(e);
290                        }
291                    } else {
292                        let maybe_bytes = vfs_sync::with_global_storage(|gs| {
293                            let storage_map = gs.borrow();
294                            storage_map
295                                .get(&storage.db_name)
296                                .and_then(|db| db.get(&block_id))
297                                .cloned()
298                        });
299                        if let Some(bytes) = maybe_bytes {
300                            if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
301                                log::error!(
302                                    "verify_after_write: pre-write checksum verification failed for block {}: {}",
303                                    block_id, e.message
304                                );
305                                return Err(e);
306                            }
307                        }
308                    }
309                }
310                #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions)))]
311                {
312                    if let Some(bytes) = storage.cache.get(&block_id).cloned() {
313                        if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
314                            log::error!(
315                                "[test] verify_after_write: pre-write checksum verification failed for block {}: {}",
316                                block_id, e.message
317                            );
318                            return Err(e);
319                        }
320                    } else {
321                        let maybe_bytes = vfs_sync::with_global_storage(|gs| {
322                            let storage_map = gs.borrow();
323                            storage_map
324                                .get(&storage.db_name)
325                                .and_then(|db| db.get(&block_id))
326                                .cloned()
327                        });
328                        if let Some(bytes) = maybe_bytes {
329                            if let Err(e) = storage.verify_against_stored_checksum(block_id, &bytes) {
330                                log::error!(
331                                    "[test] verify_after_write: pre-write checksum verification failed for block {}: {}",
332                                    block_id, e.message
333                                );
334                                return Err(e);
335                            }
336                        }
337                    }
338                }
339            }
340    
341            // For WASM, immediately persist to global storage FIRST for cross-instance visibility
342            #[cfg(target_arch = "wasm32")]
343            {
344                // Check if this block already exists in global storage with committed data
345                let existing_data = vfs_sync::with_global_storage(|gs| {
346                    let storage_map = gs.borrow();
347                    if let Some(db_storage) = storage_map.get(&storage.db_name) {
348                        db_storage.get(&block_id).cloned()
349                    } else {
350                        None
351                    }
352                });
353                
354                // Check if there's existing metadata for this block
355                let has_committed_metadata = vfs_sync::with_global_metadata(|meta| {
356                    let meta_map = meta.borrow();
357                    if let Some(db_meta) = meta_map.get(&storage.db_name) {
358                        if let Some(metadata) = db_meta.get(&block_id) {
359                            // If version > 0, this block has been committed before
360                            metadata.version > 0
361                        } else {
362                            false
363                        }
364                    } else {
365                        false
366                    }
367                });
368                
369                // Only overwrite if there's no committed data or if this is a legitimate update
370                let should_write = if let Some(existing) = existing_data {
371                    if has_committed_metadata {
372                        // CRITICAL FIX: Always allow writes during transactions to ensure schema changes persist
373                        true  // Always allow writes when there's committed metadata
374                    } else if existing.iter().zip(data.iter()).all(|(a, b)| a == b) {
375                        // If the data is identical, skip the write
376                        false
377                    } else {
378                        // Check if the new data is richer (has more non-zero bytes) than existing
379                        let existing_non_zero = existing.iter().filter(|&&b| b != 0).count();
380                        let new_non_zero = data.iter().filter(|&&b| b != 0).count();
381                        
382                        if new_non_zero > existing_non_zero {
383                            true
384                        } else if new_non_zero < existing_non_zero {
385                            false
386                        } else {
387                            true
388                        }
389                    }
390                } else {
391                    // Check if there's committed data in global storage that we haven't seen yet
392                    let has_global_committed_data = vfs_sync::with_global_metadata(|meta| {
393                        let meta_map = meta.borrow();
394                        if let Some(db_meta) = meta_map.get(&storage.db_name) {
395                            if let Some(metadata) = db_meta.get(&block_id) {
396                                metadata.version > 0
397                            } else {
398                                false
399                            }
400                        } else {
401                            false
402                        }
403                    });
404                    
405                    if has_global_committed_data {
406                        true  // Allow transactional writes even when committed data exists
407                    } else {
408                        // No existing data and no committed metadata, safe to write
409                        true
410                    }
411                };
412                
413                if should_write {
414                    vfs_sync::with_global_storage(|gs| {
415                        let mut storage_map = gs.borrow_mut();
416                        let db_storage = storage_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
417                        
418                        // Log what we're about to write vs what exists
419                        // Block overwrite (debug logging removed for performance)
420                        
421                        db_storage.insert(block_id, data.clone());
422                    });
423                }
424                
425                // Always ensure metadata exists for the block, and UPDATE checksum if we wrote new data
426                vfs_sync::with_global_metadata(|meta| {
427                    let mut meta_map = meta.borrow_mut();
428                    let db_meta = meta_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
429                    
430                    // Calculate checksum for the data that will be stored (either new or existing)
431                    let stored_data = if should_write {
432                        data.clone()
433                    } else {
434                        // Use existing data from global storage
435                        vfs_sync::with_global_storage(|gs| {
436                            let storage_map = gs.borrow();
437                            if let Some(db_storage) = storage_map.get(&storage.db_name) {
438                                if let Some(existing) = db_storage.get(&block_id) {
439                                    existing.clone()
440                                } else {
441                                    data.clone() // Fallback to new data
442                                }
443                            } else {
444                                data.clone() // Fallback to new data
445                            }
446                        })
447                    };
448                    
449                    let checksum = {
450                        let mut hasher = crc32fast::Hasher::new();
451                        hasher.update(&stored_data);
452                        hasher.finalize() as u64
453                    };
454                    
455                    // If metadata exists, preserve the version number but update the checksum
456                    let version = if let Some(existing_meta) = db_meta.get(&block_id) {
457                        existing_meta.version
458                    } else {
459                        1  // Start at version 1 so uncommitted data is hidden (commit marker starts at 0)
460                    };
461                    
462                    db_meta.insert(block_id, BlockMetadataPersist {
463                        checksum,
464                        version,
465                        last_modified_ms: 0, // Will be updated during sync
466                        algo: ChecksumAlgorithm::CRC32,
467                    });
468                });
469                
470                // Also create/update metadata for native test path
471                #[cfg(all(not(target_arch = "wasm32"), any(test, debug_assertions), not(feature = "fs_persist")))]
472                GLOBAL_METADATA_TEST.with(|meta| {
473                    let mut meta_map = meta.borrow_mut();
474                    let db_meta = meta_map.entry(storage.db_name.clone()).or_insert_with(HashMap::new);
475                    
476                    // Calculate checksum for the data that will be stored (either new or existing)
477                    let stored_data = if should_write {
478                        data.clone()
479                    } else {
480                        // Use existing data from global test storage
481                        vfs_sync::with_global_storage(|gs| {
482                            let storage_map = gs.borrow();
483                            if let Some(db_storage) = storage_map.get(&storage.db_name) {
484                                if let Some(existing) = db_storage.get(&block_id) {
485                                    existing.clone()
486                                } else {
487                                    data.clone() // Fallback to new data
488                                }
489                            } else {
490                                data.clone() // Fallback to new data
491                            }
492                        })
493                    };
494                    
495                    let checksum = {
496                        let mut hasher = crc32fast::Hasher::new();
497                        hasher.update(&stored_data);
498                        hasher.finalize() as u64
499                    };
500                    
501                    // If metadata exists, preserve the version number but update the checksum
502                    let version = if let Some(existing_meta) = db_meta.get(&block_id) {
503                        existing_meta.version
504                    } else {
505                        1  // Start at version 1 so uncommitted data is hidden (commit marker starts at 0)
506                    };
507                    
508                    db_meta.insert(block_id, BlockMetadataPersist {
509                        checksum,
510                        version,
511                        last_modified_ms: 0, // Will be updated during sync
512                        algo: ChecksumAlgorithm::CRC32,
513                    });
514                    log::debug!("Updated test metadata for block {} with checksum {} (version {})", block_id, checksum, version);
515                });
516            }
517            
518            // Update cache and mark as dirty
519            storage.cache.insert(block_id, data.clone());
520            {
521                let mut dirty = storage.dirty_blocks.lock();
522                dirty.insert(block_id, data);
523            }
524            // Update checksum metadata on write
525            if let Some(bytes) = storage.cache.get(&block_id) {
526                storage.checksum_manager.store_checksum(block_id, bytes);
527            }
528            // Record write time for debounce tracking (native)
529            #[cfg(not(target_arch = "wasm32"))]
530            {
531                storage.last_write_ms.store(BlockStorage::now_millis(), Ordering::SeqCst);
532            }
533    
534            // Policy-based triggers: thresholds
535            let (max_dirty_opt, max_bytes_opt) = storage
536                .policy
537                .as_ref()
538                .map(|p| (p.max_dirty, p.max_dirty_bytes))
539                .unwrap_or((None, None));
540    
541            let mut threshold_reached = false;
542            if let Some(max_dirty) = max_dirty_opt {
543                let cur = storage.dirty_blocks.lock().len();
544                if cur >= max_dirty { threshold_reached = true; }
545            }
546            if let Some(max_bytes) = max_bytes_opt {
547                let cur_bytes: usize = {
548                    let m = storage.dirty_blocks.lock();
549                    m.values().map(|v| v.len()).sum()
550                };
551                if cur_bytes >= max_bytes { threshold_reached = true; }
552            }
553    
554            if threshold_reached {
555                let debounce_ms_opt = storage.policy.as_ref().and_then(|p| p.debounce_ms);
556                if let Some(_debounce) = debounce_ms_opt {
557                    // Debounce enabled: mark threshold and let debounce thread flush after inactivity
558                    #[cfg(not(target_arch = "wasm32"))]
559                    {
560                        storage.threshold_hit.store(true, Ordering::SeqCst);
561                    }
562                } else {
563                    // No debounce: flush immediately
564                    let _ = storage.sync_now();
565                }
566            }
567            
568            storage.touch_lru(block_id);
569            storage.evict_if_needed();
570            
571            // Update storage and cache size gauges
572            #[cfg(feature = "telemetry")]
573            if let Some(ref metrics) = storage.metrics {
574                // Update storage bytes gauge
575                let total_bytes: usize = storage.cache.values().map(|v| v.len()).sum();
576                metrics.storage_bytes().set(total_bytes as f64);
577                
578                // Update cache size bytes gauge
579                let cache_bytes: usize = storage.cache.len() * BLOCK_SIZE;
580                metrics.cache_size_bytes().set(cache_bytes as f64);
581            }
582            
583            Ok(())
584}
585