// absurder_sql/storage/import.rs

//! Import functionality for SQLite databases
//!
//! This module handles importing SQLite .db files into the block-based storage system.

use super::block_storage::BLOCK_SIZE;
use super::export::validate_sqlite_file;
use crate::types::DatabaseError;

9/// Clear all storage data for a specific database
10///
11/// Removes all blocks, metadata, commit markers, and allocation maps for the specified
12/// database from global storage. This is a destructive operation used primarily before
13/// importing a new database.
14///
15/// # Arguments
16/// * `db_name` - Name of the database to clear
17///
18/// # Returns
19/// * `Ok(())` - Storage cleared successfully
20/// * `Err(DatabaseError)` - If clearing fails
21///
22/// # Safety
23/// This operation clears data from global storage but does not affect:
24/// - Open database connections (they may still reference cleared data)
25/// - IndexedDB persistence (for WASM, requires separate clearing)
26///
27/// # Example
28/// ```rust,no_run
29/// use absurder_sql::storage::import::clear_database_storage;
30///
31/// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
32/// // Clear all data for "mydb"
33/// clear_database_storage("mydb").await?;
34/// # Ok(())
35/// # }
36/// ```
37pub async fn clear_database_storage(db_name: &str) -> Result<(), DatabaseError> {
38    use super::vfs_sync::{
39        with_global_allocation_map, with_global_commit_marker, with_global_storage,
40    };
41
42    log::info!("Clearing storage for database: {}", db_name);
43
44    // CRITICAL: Remove from STORAGE_REGISTRY so a fresh BlockStorage is created on next open
45    // This prevents stale state (local cache, dirty_blocks) from being reused
46    #[cfg(target_arch = "wasm32")]
47    {
48        crate::vfs::indexeddb_vfs::remove_storage_from_registry(db_name);
49        log::debug!("Removed {} from STORAGE_REGISTRY", db_name);
50    }
51
52    // CRITICAL: Force close connection pool entry to reset SQLite internal state
53    // Without this, SQLite may use cached pages from the old database
54    #[cfg(target_arch = "wasm32")]
55    {
56        let pool_key = db_name.trim_end_matches(".db");
57        crate::connection_pool::force_close_connection(pool_key);
58        log::debug!("Force closed connection pool for {}", pool_key);
59    }
60
61    // Clear GLOBAL_STORAGE blocks
62    with_global_storage(|gs| {
63        let mut storage = gs.borrow_mut();
64        if let Some(blocks) = storage.get_mut(db_name) {
65            let count = blocks.len();
66            blocks.clear();
67            log::debug!(
68                "Cleared {} blocks from GLOBAL_STORAGE for {}",
69                count,
70                db_name
71            );
72        }
73        // Remove the database entry entirely
74        storage.remove(db_name);
75    });
76
77    // Clear metadata - platform specific
78    #[cfg(target_arch = "wasm32")]
79    {
80        use super::vfs_sync::with_global_metadata;
81        with_global_metadata(|gm| {
82            let mut metadata = gm.borrow_mut();
83            if let Some(meta) = metadata.get_mut(db_name) {
84                let count = meta.len();
85                meta.clear();
86                log::debug!("Cleared {} metadata entries for {} (WASM)", count, db_name);
87            }
88            metadata.remove(db_name);
89        });
90    }
91
92    #[cfg(all(
93        not(target_arch = "wasm32"),
94        any(test, debug_assertions),
95        not(feature = "fs_persist")
96    ))]
97    {
98        use super::block_storage::GLOBAL_METADATA_TEST;
99        GLOBAL_METADATA_TEST.with(|gm| {
100            #[cfg(target_arch = "wasm32")]
101            let metadata = gm;
102            #[cfg(not(target_arch = "wasm32"))]
103            let mut metadata = gm.lock();
104            if let Some(meta) = metadata.get_mut(db_name) {
105                let count = meta.len();
106                meta.clear();
107                log::debug!(
108                    "Cleared {} metadata entries from GLOBAL_METADATA_TEST for {} (native test)",
109                    count,
110                    db_name
111                );
112            }
113            metadata.remove(db_name);
114        });
115    }
116
117    // Reset GLOBAL_COMMIT_MARKER
118    with_global_commit_marker(|gcm| {
119        let mut markers = gcm.borrow_mut();
120        if markers.contains_key(db_name) {
121            markers.insert(db_name.to_string(), 0);
122            log::debug!("Reset commit marker for {}", db_name);
123        }
124        markers.remove(db_name);
125    });
126
127    // Clear GLOBAL_ALLOCATION_MAP
128    with_global_allocation_map(|gam| {
129        let mut alloc = gam.borrow_mut();
130        if let Some(ids) = alloc.get_mut(db_name) {
131            let count = ids.len();
132            ids.clear();
133            log::debug!("Cleared {} allocation IDs for {}", count, db_name);
134        }
135        alloc.remove(db_name);
136    });
137
138    // For WASM, also clear IndexedDB (if needed)
139    #[cfg(target_arch = "wasm32")]
140    {
141        // Note: IndexedDB clearing would be done via JavaScript
142        // The VFS layer will handle actual persistence clearing
143        log::debug!(
144            "WASM: In-memory storage cleared for {}. IndexedDB clearing requires VFS interaction.",
145            db_name
146        );
147    }
148
149    log::info!("Storage cleared successfully for: {}", db_name);
150    Ok(())
151}
/// Import SQLite database from bytes into BlockStorage
///
/// Takes a complete SQLite .db file and imports it into the block-based storage system.
/// This is the inverse of `export_database_to_bytes()`.
///
/// # Arguments
/// * `db_name` - Name of the database to import into
/// * `data` - Complete SQLite database file as bytes
///
/// # Returns
/// * `Ok(())` - Import successful
/// * `Err(DatabaseError)` - If validation or import fails
///
/// # Process
/// 1. Validate SQLite file format
/// 2. Clear existing in-memory storage for the database
/// 3. (WASM) Delete all old blocks from IndexedDB
/// 4. Split data into BLOCK_SIZE (4096-byte) chunks, zero-padding the last block
/// 5. Write all blocks to GLOBAL_STORAGE
/// 6. Update allocation map
/// 7. Create per-block metadata (platform-specific)
/// 8. Set the commit marker so imported blocks are immediately visible
/// 9. (WASM) Persist blocks + metadata to IndexedDB and await completion
///
/// # Example
/// ```rust,no_run
/// use absurder_sql::storage::import::import_database_from_bytes;
///
/// # async fn example() -> Result<(), absurder_sql::types::DatabaseError> {
/// let db_bytes = std::fs::read("mydb.db").unwrap();
/// import_database_from_bytes("mydb", db_bytes).await?;
/// # Ok(())
/// # }
/// ```
pub async fn import_database_from_bytes(db_name: &str, data: Vec<u8>) -> Result<(), DatabaseError> {
    use super::vfs_sync::{with_global_allocation_map, with_global_storage};
    use std::collections::{HashMap, HashSet};

    log::info!(
        "Starting database import for: {} ({} bytes)",
        db_name,
        data.len()
    );

    // Step 1: Validate SQLite file format (magic header etc.) before touching storage.
    validate_sqlite_file(&data)?;
    log::debug!("SQLite file validation passed");

    // Step 2: Clear existing storage from memory (this also does registry and connection pool cleanup)
    clear_database_storage(db_name).await?;
    log::debug!("Existing storage cleared from memory");

    // Step 3: Delete ALL old blocks from IndexedDB (without needing to know block IDs)
    // This is critical because close() clears GLOBAL_ALLOCATION_MAP, so we can't rely on it
    // to know which blocks exist. Instead, scan IndexedDB directly.
    #[cfg(target_arch = "wasm32")]
    {
        log::debug!(
            "Deleting all existing blocks from IndexedDB for: {}",
            db_name
        );
        super::wasm_indexeddb::delete_all_database_blocks_from_indexeddb(db_name).await?;
        log::debug!("All old blocks deleted from IndexedDB");
    }

    // Step 4: Split data into BLOCK_SIZE chunks.
    // div_ceil rounds up so a partial trailing chunk still gets its own block.
    let total_blocks = data.len().div_ceil(BLOCK_SIZE);
    log::debug!(
        "Splitting {} bytes into {} blocks of {} bytes",
        data.len(),
        total_blocks,
        BLOCK_SIZE
    );

    let mut blocks = HashMap::new();
    let mut allocated_ids = HashSet::new();

    for block_id in 0..total_blocks {
        let start = block_id * BLOCK_SIZE;
        let end = std::cmp::min(start + BLOCK_SIZE, data.len());

        let mut block_data = Vec::with_capacity(BLOCK_SIZE);
        block_data.extend_from_slice(&data[start..end]);

        // Pad the last (partial) block with zeros so every stored block is
        // exactly BLOCK_SIZE bytes.
        if block_data.len() < BLOCK_SIZE {
            let padding = BLOCK_SIZE - block_data.len();
            block_data.resize(BLOCK_SIZE, 0);
            log::debug!(
                "Block {} padded with {} zero bytes ({} -> {} bytes)",
                block_id,
                padding,
                end - start,
                BLOCK_SIZE
            );
        }

        blocks.insert(block_id as u64, block_data);
        allocated_ids.insert(block_id as u64);
    }

    log::debug!("Created {} blocks for import", blocks.len());

    // Step 5: Write blocks to GLOBAL_STORAGE
    // (cloned because `blocks` is still needed below for metadata checksums
    // and, under fs_persist, for writing block files)
    with_global_storage(|gs| {
        gs.borrow_mut().insert(db_name.to_string(), blocks.clone());
    });

    log::debug!("Blocks written to GLOBAL_STORAGE");

    // Step 6: Update allocation map
    with_global_allocation_map(|gam| {
        gam.borrow_mut()
            .insert(db_name.to_string(), allocated_ids.clone());
    });

    log::debug!("Allocation map updated");

    // Step 7: Set up metadata for imported blocks (for visibility tracking)
    // This ensures imported blocks are immediately visible when read

    // For WASM, set up metadata in global storage
    #[cfg(target_arch = "wasm32")]
    {
        use super::metadata::{BlockMetadataPersist, ChecksumAlgorithm, ChecksumManager};
        use super::vfs_sync::with_global_metadata;

        with_global_metadata(|gm| {
            let mut db_metadata = std::collections::HashMap::new();

            for block_id in allocated_ids.iter() {
                // Calculate checksum for each block using CRC32 (standard algorithm).
                // The `else` arm is defensive only: every allocated id was inserted
                // into `blocks` in the loop above.
                let checksum = if let Some(block_data) = blocks.get(block_id) {
                    ChecksumManager::compute_checksum_with(block_data, ChecksumAlgorithm::CRC32)
                } else {
                    0
                };

                db_metadata.insert(
                    *block_id,
                    BlockMetadataPersist {
                        version: 1, // All imported blocks start at version 1
                        checksum,
                        last_modified_ms: 0,
                        algo: ChecksumAlgorithm::CRC32,
                    },
                );
            }

            gm.borrow_mut().insert(db_name.to_string(), db_metadata);
        });

        log::debug!(
            "Metadata created for {} blocks in global storage (WASM)",
            allocated_ids.len()
        );
    }

    // For native tests (without fs_persist), use GLOBAL_METADATA_TEST directly
    #[cfg(all(
        not(target_arch = "wasm32"),
        any(test, debug_assertions),
        not(feature = "fs_persist")
    ))]
    {
        use super::block_storage::GLOBAL_METADATA_TEST;
        use super::metadata::{BlockMetadataPersist, ChecksumAlgorithm, ChecksumManager};

        GLOBAL_METADATA_TEST.with(|gm| {
            // NOTE(review): the wasm32 arm below is unreachable inside this
            // not(wasm32) cfg block; only the lock-based accessor ever compiles.
            #[cfg(target_arch = "wasm32")]
            let metadata = gm;
            #[cfg(not(target_arch = "wasm32"))]
            let mut metadata = gm.lock();
            let mut db_metadata = std::collections::HashMap::new();

            for block_id in allocated_ids.iter() {
                // Calculate checksum for each block using CRC32 (standard algorithm)
                let checksum = if let Some(block_data) = blocks.get(block_id) {
                    ChecksumManager::compute_checksum_with(block_data, ChecksumAlgorithm::CRC32)
                } else {
                    0
                };

                db_metadata.insert(
                    *block_id,
                    BlockMetadataPersist {
                        version: 1, // All imported blocks start at version 1
                        checksum,
                        last_modified_ms: 0,
                        algo: ChecksumAlgorithm::CRC32,
                    },
                );
            }

            metadata.insert(db_name.to_string(), db_metadata);
        });

        log::debug!(
            "Metadata created for {} blocks in GLOBAL_METADATA_TEST (native test)",
            allocated_ids.len()
        );
    }

    // For fs_persist (including tests), write metadata to filesystem
    #[cfg(all(not(target_arch = "wasm32"), feature = "fs_persist"))]
    {
        use super::metadata::{ChecksumAlgorithm, ChecksumManager};
        use std::path::PathBuf;

        // Base directory is configurable via env var; defaults to ./test_storage.
        let base_path =
            std::env::var("ABSURDERSQL_FS_BASE").unwrap_or_else(|_| "./test_storage".to_string());
        let mut meta_path = PathBuf::from(&base_path);
        meta_path.push(db_name);

        // Create directory if needed. Failures are logged but non-fatal:
        // fs persistence is best-effort; the in-memory import already succeeded.
        if let Err(e) = std::fs::create_dir_all(&meta_path) {
            log::warn!("Failed to create metadata directory during import: {}", e);
        }

        meta_path.push("metadata.json");

        // Build metadata structure
        let mut meta_entries = Vec::new();
        for block_id in allocated_ids.iter() {
            let checksum = if let Some(block_data) = blocks.get(block_id) {
                ChecksumManager::compute_checksum_with(block_data, ChecksumAlgorithm::CRC32)
            } else {
                0
            };

            meta_entries.push((
                *block_id,
                super::metadata::BlockMetadataPersist {
                    version: 1,
                    checksum,
                    last_modified_ms: 0,
                    algo: ChecksumAlgorithm::CRC32,
                },
            ));
        }

        let meta_json = serde_json::json!({
            "entries": meta_entries,
        });

        if let Err(e) = std::fs::write(
            &meta_path,
            serde_json::to_string_pretty(&meta_json).unwrap(),
        ) {
            log::warn!("Failed to write metadata during import: {}", e);
        } else {
            log::debug!(
                "Metadata written to filesystem for {} blocks",
                allocated_ids.len()
            );
        }

        // Write block data files to filesystem (one block_<id>.bin per block)
        let mut blocks_dir = PathBuf::from(&base_path);
        blocks_dir.push(db_name);
        blocks_dir.push("blocks");

        if let Err(e) = std::fs::create_dir_all(&blocks_dir) {
            log::warn!("Failed to create blocks directory during import: {}", e);
        }

        for (block_id, block_data) in blocks.iter() {
            let mut block_path = blocks_dir.clone();
            block_path.push(format!("block_{}.bin", block_id));

            if let Err(e) = std::fs::write(&block_path, block_data) {
                log::warn!("Failed to write block {} during import: {}", block_id, e);
            }
        }

        log::debug!("Wrote {} block files to filesystem", blocks.len());

        // Write allocations.json so the allocation map survives restart
        let mut alloc_path = PathBuf::from(&base_path);
        alloc_path.push(db_name);
        alloc_path.push("allocations.json");

        let alloc_json = serde_json::json!({
            "allocated": allocated_ids.iter().copied().collect::<Vec<_>>(),
        });

        if let Err(e) = std::fs::write(
            &alloc_path,
            serde_json::to_string_pretty(&alloc_json).unwrap(),
        ) {
            log::warn!("Failed to write allocations during import: {}", e);
        } else {
            log::debug!("Allocations written to filesystem");
        }
    }

    // Step 8: Set commit marker to 1 to make all imported blocks visible
    use super::vfs_sync::with_global_commit_marker;
    with_global_commit_marker(|gcm| {
        gcm.borrow_mut().insert(db_name.to_string(), 1);
    });

    log::debug!("Commit marker set to 1 for immediate visibility");

    // Step 9: For WASM, sync imported data to IndexedDB immediately and WAIT for it
    #[cfg(target_arch = "wasm32")]
    {
        log::debug!("Syncing imported data to IndexedDB for {}", db_name);

        // Advance commit marker (beyond the 1 set above) so the persisted
        // snapshot carries a strictly newer marker than the in-memory one.
        let next_commit = with_global_commit_marker(|cm| {
            let current = cm.borrow().get(db_name).copied().unwrap_or(0);
            let new_marker = current + 1;
            cm.borrow_mut().insert(db_name.to_string(), new_marker);
            new_marker
        });

        // Collect blocks and metadata to persist
        let (blocks_to_persist, metadata_to_persist) = {
            use super::vfs_sync::with_global_metadata;
            with_global_storage(|storage| {
                let blocks = if let Some(db_storage) = storage.borrow().get(db_name) {
                    db_storage
                        .iter()
                        .map(|(&id, data)| (id, data.clone()))
                        .collect::<Vec<_>>()
                } else {
                    Vec::new()
                };

                let metadata = with_global_metadata(|meta| {
                    if let Some(db_meta) = meta.borrow().get(db_name) {
                        db_meta
                            .iter()
                            .map(|(&id, metadata)| (id, metadata.checksum))
                            .collect::<Vec<_>>()
                    } else {
                        Vec::new()
                    }
                });

                (blocks, metadata)
            })
        };

        if !blocks_to_persist.is_empty() {
            log::debug!(
                "Persisting {} blocks to IndexedDB with commit marker {}",
                blocks_to_persist.len(),
                next_commit
            );

            // CRITICAL: AWAIT the persistence to complete BEFORE returning
            super::wasm_indexeddb::persist_to_indexeddb_event_based(
                db_name,
                blocks_to_persist,
                metadata_to_persist,
                next_commit,
                #[cfg(feature = "telemetry")]
                None,
                #[cfg(feature = "telemetry")]
                None,
            )
            .await
            .map_err(|e| {
                log::error!("Failed to persist imported data to IndexedDB: {}", e);
                DatabaseError::new(
                    "IMPORT_SYNC_FAILED",
                    &format!("Failed to persist imported data: {}", e),
                )
            })?;

            log::debug!("Import sync to IndexedDB complete for {}", db_name);
        } else {
            log::warn!("No blocks to persist to IndexedDB for {}", db_name);
        }
    }

    log::info!(
        "Database import complete: {} ({} blocks, {} bytes)",
        db_name,
        total_blocks,
        data.len()
    );

    // DEBUG: Log what blocks were actually imported (first 5 only)
    #[cfg(target_arch = "wasm32")]
    {
        use super::vfs_sync::with_global_storage;
        with_global_storage(|storage_map| {
            if let Some(db_storage) = storage_map.borrow().get(db_name) {
                web_sys::console::log_1(
                    &format!(
                        "[IMPORT] GLOBAL_STORAGE now has {} blocks for {}",
                        db_storage.len(),
                        db_name
                    )
                    .into(),
                );
                for (block_id, data) in db_storage.iter().take(5) {
                    web_sys::console::log_1(
                        &format!(
                            "[IMPORT] Block {} has {} bytes, first 16: {:02x?}",
                            block_id,
                            data.len(),
                            &data[..16.min(data.len())]
                        )
                        .into(),
                    );
                }
            }
        });
    }

    Ok(())
}
567/// Invalidate BlockStorage caches for a specific database
568///
569/// This function removes the BlockStorage from the registry, forcing a fresh
570/// instance to be created on next open. This ensures no stale cached data
571/// is read after an import operation.
572///
573/// # Arguments
574/// * `db_name` - Name of the database whose caches should be invalidated
575///
576/// # Example
577/// ```rust
578/// use absurder_sql::storage::import::invalidate_block_storage_caches;
579///
580/// // After importing a database, clear caches
581/// invalidate_block_storage_caches("mydb");
582/// ```
583pub fn invalidate_block_storage_caches(db_name: &str) {
584    #[cfg(target_arch = "wasm32")]
585    {
586        crate::vfs::indexeddb_vfs::remove_storage_from_registry(db_name);
587        log::info!("Removed BlockStorage from registry for: {}", db_name);
588    }
589
590    #[cfg(not(target_arch = "wasm32"))]
591    {
592        log::info!(
593            "Cache invalidation for native not yet implemented for: {}",
594            db_name
595        );
596    }
597}