json_mutex_db/
lib.rs

1#![warn(clippy::pedantic)]
2use serde_json::Value;
3use std::cell::RefCell;
4use std::fs::{self, File, OpenOptions};
5use std::io::{BufWriter, Error as IoError, ErrorKind, Write};
6use std::path::Path;
7// Use std::sync::Mutex for RefUnwindSafe compatibility if needed, or stick to parking_lot otherwise
8use std::sync::Mutex; // Switched back in previous user code
9use std::thread;
10use tempfile::NamedTempFile;
11
12// Use crossbeam_channel for select! and unbounded/bounded channels
13use crossbeam_channel::{Receiver, RecvError, SendError, Sender, bounded, select, unbounded};
14
15// --- Communication types for async mode ---
16
/// Control message sent from a `JsonMutexDB` handle to its background thread
/// over the command channel.
enum BackgroundCommand {
    /// Request the current state; the worker replies with a clone of its
    /// document on the enclosed sender.
    GetState(Sender<Value>),
    /// Ask the background thread to leave its event loop.
    Shutdown,
}
24
/// A boxed, one-shot mutation of the JSON document. Tasks travel on their own
/// channel, separate from `BackgroundCommand`, and must be `Send` because they
/// are executed on the background thread.
type UpdateTask = Box<dyn FnOnce(&mut Value) + Send>;
27
/// Error type returned by every fallible `JsonMutexDB` operation.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// printed with `{}`, boxed as `Box<dyn Error>`, and chained via `source()`.
#[derive(Debug)]
pub enum DbError {
    /// A file-system failure, or a (de)serialisation failure wrapped in an
    /// `std::io::Error`.
    Io(IoError),
    /// A failure communicating with the background thread, a poisoned mutex,
    /// or an inconsistent internal mode.
    Sync(String),
}

impl std::fmt::Display for DbError {
    /// Renders the error as `IoError: <detail>` or `SyncError: <detail>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DbError::Io(err) => write!(f, "IoError: {err}"),
            DbError::Sync(msg) => write!(f, "SyncError: {msg}"),
        }
    }
}

impl std::error::Error for DbError {
    /// Exposes the wrapped I/O error as the cause; `Sync` errors have none.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            DbError::Io(err) => Some(err),
            DbError::Sync(_) => None,
        }
    }
}
34
35impl serde::Serialize for DbError {
36    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
37    where
38        S: serde::Serializer,
39    {
40        match self {
41            DbError::Io(err) => serializer.serialize_str(&format!("IoError: {err}")),
42            DbError::Sync(msg) => serializer.serialize_str(&format!("SyncError: {msg}")),
43        }
44    }
45}
46
47impl From<IoError> for DbError {
48    fn from(e: IoError) -> Self {
49        DbError::Io(e)
50    }
51}
52
53// Helper for channel send errors
54impl<T> From<SendError<T>> for DbError {
55    fn from(e: SendError<T>) -> Self {
56        DbError::Sync(format!("Failed to send command to background thread: {e}"))
57    }
58}
59
60// Helper for channel receive errors
61impl From<RecvError> for DbError {
62    fn from(e: RecvError) -> Self {
63        DbError::Sync(format!(
64            "Failed to receive response from background thread: {e}"
65        ))
66    }
67}
68
// Per-thread scratch byte buffer, created empty with 64 KiB of capacity on
// first access. NOTE(review): nothing in the visible part of this file reads
// or writes it — confirm whether it is still needed.
thread_local! {
    static SERIALIZE_BUF: RefCell<Vec<u8>> = RefCell::new(Vec::with_capacity(64 << 10));
}
73
/// A JSON document backed by a file on disk.
///
/// The constructor selects one of two modes: in sync mode the document sits
/// behind a `Mutex` inside this struct; in async mode it is owned by a
/// background thread and reached through channels. `JsonMutexDB::new`
/// populates exactly one of `sync_data` / `async_comm`.
pub struct JsonMutexDB {
    // Shared state for both sync and async modes
    /// Path of the JSON file that saves are written to.
    path: String,
    /// Write indented output when saving.
    pretty: bool,
    /// Use `simd_json` for compact output (only consulted when `pretty` is false).
    fast_serialization: bool,

    // Mode-specific state
    sync_data: Option<Mutex<Value>>, // Only used when async_updates is false
    async_comm: Option<AsyncCommunicator>, // Only used when async_updates is true
}
84
/// Channels and thread handle that connect a `JsonMutexDB` in async mode to
/// its background thread.
struct AsyncCommunicator {
    /// Carries `GetState` / `Shutdown` requests to the background thread.
    command_tx: Sender<BackgroundCommand>,
    /// Carries update closures to the background thread.
    update_tx: Sender<UpdateTask>,
    /// Join handle of the background thread; wrapped in `Option` so it can be
    /// taken out and joined during drop.
    update_handle: Option<thread::JoinHandle<()>>, // Option to allow taking during drop
}
91
// Manually assert that `JsonMutexDB` may cross a `catch_unwind` boundary, both
// by value and by shared reference. These are marker traits with no methods;
// the compiler does not verify the assertion.
impl std::panic::UnwindSafe for JsonMutexDB {}
impl std::panic::RefUnwindSafe for JsonMutexDB {}
95
96impl JsonMutexDB {
97    /// Creates a new instance of `JsonMutexDB`.
98    ///
99    /// # Errors
100    ///
101    /// This function will return an error if the specified file cannot be read,
102    /// if the JSON data is invalid, or if there are issues with file I/O operations.
103    pub fn new(
104        path: &str,
105        pretty: bool,
106        async_updates: bool,
107        fast_serialization: bool,
108    ) -> Result<Self, DbError> {
109        // Load initial JSON data (common logic)
110        let initial_json = match fs::read_to_string(path) {
111            Ok(content) if content.trim().is_empty() => Value::Object(serde_json::Map::new()),
112            Ok(mut content) => unsafe {
113                simd_json::from_str::<Value>(content.as_mut_str()).map_err(|e| {
114                    DbError::Io(IoError::new(
115                        ErrorKind::InvalidData,
116                        format!("Invalid JSON (simd_json): {e}"),
117                    ))
118                })?
119            },
120            Err(err) if err.kind() == ErrorKind::NotFound => Value::Object(serde_json::Map::new()),
121            Err(err) => return Err(DbError::Io(err)),
122        };
123
124        let mut sync_data = None;
125        let mut async_comm = None;
126
127        if async_updates {
128            // --- Async Mode Setup ---
129            let (command_tx, command_rx) = unbounded::<BackgroundCommand>();
130            let (update_tx, update_rx) = unbounded::<UpdateTask>();
131
132            // Clone necessary info for the background thread
133            let bg_initial_data = initial_json; // No Mutex needed here
134            let bg_path = path.to_string();
135            let bg_pretty = pretty;
136            let bg_fast_serialization = fast_serialization;
137
138            let update_handle = thread::spawn(move || {
139                background_thread_loop(
140                    bg_initial_data,
141                    &bg_path,
142                    bg_pretty,
143                    bg_fast_serialization,
144                    &command_rx,
145                    &update_rx,
146                );
147            });
148
149            async_comm = Some(AsyncCommunicator {
150                command_tx,
151                update_tx,
152                update_handle: Some(update_handle),
153            });
154        } else {
155            // --- Sync Mode Setup ---
156            sync_data = Some(Mutex::new(initial_json));
157        }
158
159        Ok(JsonMutexDB {
160            path: path.to_string(),
161            pretty,
162            fast_serialization,
163            sync_data,
164            async_comm,
165        })
166    }
167
168    /// Returns a clone of the JSON data.
169    /// If `async_updates` is enabled, fetches the latest state from the background thread.
170    ///
171    /// # Errors
172    ///
173    /// This function will return an error if:
174    /// - The background thread fails to send or receive data in async mode.
175    /// - The mutex is poisoned in sync mode.
176    /// - The database is in an invalid state (neither sync nor async mode).
177    pub fn get(&self) -> Result<Value, DbError> {
178        if let Some(comm) = &self.async_comm {
179            // --- Async Mode: Request state from background ---
180            // Create a temporary channel for the response
181            let (response_tx, response_rx) = bounded(1); // Size 1 for one-shot behavior
182            comm.command_tx
183                .send(BackgroundCommand::GetState(response_tx))?;
184            // Block waiting for the response
185            let value = response_rx.recv()?;
186            Ok(value)
187        } else if let Some(mutex) = &self.sync_data {
188            // --- Sync Mode: Lock local mutex ---
189            let guard = mutex
190                .lock()
191                .map_err(|_| DbError::Sync("Mutex poisoned".to_string()))?;
192            Ok(guard.clone())
193            // Handle potential poisoning if using std::sync::Mutex
194            // match mutex.lock() {
195            //     Ok(guard) => Ok(guard.clone()),
196            //     Err(poisoned) => Ok(poisoned.into_inner().clone()), // Or return DbError::Sync
197            // }
198        } else {
199            Err(DbError::Sync(
200                "DB is in an invalid state (neither sync nor async)".to_string(),
201            ))
202        }
203    }
204
205    /// Updates the JSON data.
206    /// If `async_updates` is enabled, sends the update closure to the background thread.
207    /// Otherwise, applies the update synchronously.
208    /// Updates the JSON data.
209    ///
210    /// If `async_updates` is enabled, sends the update closure to the background thread.
211    /// Otherwise, applies the update synchronously.
212    ///
213    /// # Errors
214    ///
215    /// This function will return an error if:
216    /// - The background thread fails to send the update task in async mode.
217    /// - The mutex is poisoned in sync mode.
218    /// - The database is in an invalid state (neither sync nor async mode).
219    pub fn update<F>(&self, update_fn: F) -> Result<(), DbError>
220    where
221        F: FnOnce(&mut Value) + Send + 'static,
222    {
223        if let Some(comm) = &self.async_comm {
224            // --- Async Mode: Send update task ---
225            comm.update_tx.send(Box::new(update_fn))?;
226            Ok(())
227        } else if let Some(mutex) = &self.sync_data {
228            // --- Sync Mode: Lock and apply ---
229            let mut guard = mutex
230                .lock()
231                .map_err(|_| DbError::Sync("Mutex poisoned".to_string()))?;
232            update_fn(&mut guard);
233            Ok(())
234            // Handle potential poisoning
235            // match mutex.lock() {
236            //     Ok(mut guard) => {
237            //         update_fn(&mut guard);
238            //         Ok(())
239            //     }
240            //     Err(_) => Err(DbError::Sync("Mutex poisoned during update".to_string())),
241            // }
242        } else {
243            Err(DbError::Sync(
244                "DB is in an invalid state (neither sync nor async)".to_string(),
245            ))
246        }
247    }
248
249    /// Synchronously saves the current JSON data to disk atomically.
250    /// If `async_updates` is enabled, fetches the latest state before saving.
251    ///
252    /// # Errors
253    ///
254    /// This function will return an error if:
255    /// - The current state cannot be fetched (e.g., due to background thread issues in async mode).
256    /// - There are file I/O errors during the save operation.
257    pub fn save_sync(&self) -> Result<(), DbError> {
258        // 1. Get the current state (fetches from background if async)
259        let data_to_save = self.get()?;
260
261        // 2. Use the optimized internal save function with atomic=true
262        Self::save_data_to_disk(
263            &self.path,
264            &data_to_save,
265            self.pretty,
266            self.fast_serialization,
267            true, // Atomic save
268        )
269    }
270
271    /// Asynchronously saves the current JSON data atomically.
272    /// If `async_updates` is enabled, fetches the latest state first, then spawns the save thread.
273    /// Asynchronously saves the current JSON data atomically.
274    ///
275    /// If `async_updates` is enabled, fetches the latest state first, then spawns the save thread.
276    ///
277    /// # Errors
278    ///
279    /// This function will return an error if:
280    /// - The current state cannot be fetched (e.g., due to background thread issues in async mode).
281    /// - There are file I/O errors during the save operation.
282    pub fn save_async(&self) -> Result<(), DbError> {
283        // 1. Get the current state (fetches from background if async)
284        // This block ensures we get the state *before* spawning the thread.
285        let data_clone = self.get()?;
286
287        // Clone necessary data for the background thread
288        let path_clone = self.path.clone();
289        let pretty_clone = self.pretty;
290        let fast_serial_clone = self.fast_serialization;
291
292        // 2. Spawn a thread to perform the save using the optimized helper
293        thread::spawn(move || {
294            if let Err(e) = Self::save_data_to_disk(
295                &path_clone,
296                &data_clone,
297                pretty_clone,
298                fast_serial_clone,
299                true, // Perform atomic save in background thread
300            ) {
301                // Consider more robust error reporting than just stderr
302                eprintln!("Async save failed: {e:?}");
303            }
304        });
305        Ok(())
306    }
307
308    // --- Internal helper remains largely the same ---
309    fn save_data_to_disk(
310        path_str: &str,
311        data_to_save: &Value,
312        pretty: bool,
313        fast_serialization: bool,
314        atomic: bool,
315    ) -> Result<(), DbError> {
316        // Changed return type to DbError
317        let path = Path::new(path_str);
318        let final_path = path.to_path_buf();
319
320        let write_logic = |writer: Box<dyn Write>| -> Result<(), DbError> {
321            let mut buffered_writer = BufWriter::new(writer);
322            if pretty {
323                serde_json::to_writer_pretty(&mut buffered_writer, data_to_save)
324                    .map_err(|e| DbError::Io(IoError::other(e.to_string())))?;
325            } else if fast_serialization {
326                simd_json::to_writer(&mut buffered_writer, data_to_save)
327                    .map_err(|e| DbError::Io(IoError::other(format!("{e:?}"))))?;
328            } else {
329                serde_json::to_writer(&mut buffered_writer, data_to_save)
330                    .map_err(|e| DbError::Io(IoError::other(e.to_string())))?;
331            }
332            buffered_writer.flush()?; // Ensure buffer is flushed
333            Ok(())
334        };
335
336        if atomic {
337            let parent_dir = path.parent().ok_or_else(|| {
338                DbError::Io(IoError::new(
339                    ErrorKind::InvalidInput,
340                    "Invalid path: cannot determine parent directory",
341                ))
342            })?;
343            fs::create_dir_all(parent_dir)?; // Ensure parent dir exists
344
345            // Create temp file
346            let temp_file = NamedTempFile::new_in(parent_dir)?;
347            let temp_path = temp_file.path().to_path_buf(); // Keep path
348
349            // Write to temp file (using explicit file handle for Box<dyn Write>)
350            let file = OpenOptions::new().write(true).open(&temp_path)?;
351            write_logic(Box::new(file))?; // Write happens here
352
353            // Persist atomically (consumes temp_file)
354            temp_file.persist(&final_path).map_err(|e| {
355                DbError::Io(IoError::other(format!(
356                    "Failed to atomically rename temp file: {}",
357                    e.error
358                )))
359            })?;
360        } else {
361            // Non-atomic: Create/truncate target file directly
362            let file = File::create(path)?;
363            write_logic(Box::new(file))?;
364        }
365
366        Ok(())
367    }
368}
369
370// --- Background Thread Logic ---
371fn background_thread_loop(
372    mut local_data: Value,
373    path: &str,
374    pretty: bool,
375    fast_serialization: bool,
376    command_rx: &Receiver<BackgroundCommand>,
377    update_rx: &Receiver<UpdateTask>,
378) {
379    println!("Background thread started.");
380    loop {
381        select! {
382            // Received an update task
383            recv(update_rx) -> msg => {
384                if let Ok(update_fn) = msg {
385                    // Apply the update to the local state
386                    update_fn(&mut local_data);
387                    // Optional: log update application
388                } else {
389                    // Update channel closed, probably shutting down.
390                    println!("Update channel closed.");
391                    break; // Exit loop
392                }
393            },
394
395            // Received a command (GetState or Shutdown)
396            recv(command_rx) -> msg => {
397                match msg {
398                    Ok(BackgroundCommand::GetState(response_tx)) => {
399                        // Clone current state and send it back
400                        let _ = response_tx.send(local_data.clone()); // Ignore error if main thread hung up
401                    }
402                     Ok(BackgroundCommand::Shutdown) => {
403                         println!("Received Shutdown command.");
404                         break; // Exit loop
405                     }
406                    Err(_) => {
407                        // Command channel closed, main thread likely dropped.
408                        println!("Command channel closed.");
409                        break; // Exit loop
410                    }
411                }
412            }
413        }
414    }
415
416    // --- Shutdown sequence ---
417    println!("Background thread shutting down. Performing final save...");
418    // Perform a final non-atomic save of the last known state
419    if let Err(e) = JsonMutexDB::save_data_to_disk(
420        path,
421        &local_data,
422        pretty,
423        fast_serialization,
424        false, // Non-atomic during this final shutdown save
425    ) {
426        eprintln!("Error during final background save: {e:?}");
427    }
428    println!("Background thread finished.");
429}
430
431impl Drop for JsonMutexDB {
432    fn drop(&mut self) {
433        if let Some(mut comm) = self.async_comm.take() {
434            println!("Dropping JsonMutexDB (async)...");
435            // 1. Signal background thread to shut down (optional, closing channels might suffice)
436            let _ = comm.command_tx.send(BackgroundCommand::Shutdown);
437
438            // 2. Drop senders - this will cause receivers in background to error/terminate select! loop
439            drop(comm.command_tx);
440            drop(comm.update_tx);
441
442            // 3. Wait for the background thread to finish
443            if let Some(handle) = comm.update_handle.take() {
444                match handle.join() {
445                    Ok(()) => println!("Background thread joined cleanly."),
446                    Err(e) => eprintln!("Background thread panicked: {e:?}"),
447                }
448            }
449        } else {
450            println!("Dropping JsonMutexDB (sync)...");
451            // Optional: Save synchronously if in sync mode and desired
452            // if let Some(mutex) = &self.sync_data {
453            //     match mutex.lock() {
454            //         Ok(guard) => {
455            //             if let Err(e) = Self::save_data_to_disk(&self.path, &guard, self.pretty, self.fast_serialization, true) {
456            //                  eprintln!("Error during final sync save on drop: {:?}", e);
457            //             }
458            //         },
459            //         Err(_) => eprintln!("Mutex poisoned during drop, could not save."),
460            //     }
461            // }
462        }
463    }
464}
465
466#[cfg(test)]
467mod tests {
468    use super::*;
469    use serde_json::json;
470    use std::fs;
471    use std::sync::Arc;
472    use std::thread;
473    use std::time::{Duration, Instant}; // Added Duration
474
475    // Helper to remove test file quietly
476    fn cleanup_file(path: &str) {
477        let _ = fs::remove_file(path);
478    }
479
480    #[test]
481    fn test_jsonmutexdb_new_and_get() {
482        let tmp_path = "test_db_new_get.json";
483        cleanup_file(tmp_path);
484        let db =
485            JsonMutexDB::new(tmp_path, false, false, false).expect("Failed to create JsonMutexDB");
486        assert_eq!(db.get().unwrap(), json!({}));
487        cleanup_file(tmp_path);
488
489        // Test loading existing valid JSON
490        let initial_json = json!({"hello": "world"});
491        fs::write(tmp_path, initial_json.to_string()).unwrap();
492        let db = JsonMutexDB::new(tmp_path, false, false, false)
493            .expect("Failed to load existing JsonMutexDB");
494        assert_eq!(db.get().unwrap(), initial_json);
495        cleanup_file(tmp_path);
496    }
497
498    #[test]
499    fn test_jsonmutexdb_update_and_save_sync_compact_fast() {
500        let tmp_path = "test_db_set_save_compact_fast.json";
501        cleanup_file(tmp_path);
502        // Enable fast serialization
503        let db =
504            JsonMutexDB::new(tmp_path, false, false, true).expect("Failed to create JsonMutexDB");
505        let new_data = json!({"key": "value", "numbers": [1, 2, 3], "nested": {"a": true}});
506        let new_data_clone = new_data.clone();
507        if let Err(e) = db.update(move |d| *d = new_data_clone) {
508            eprintln!("Failed to update database: {e:?}");
509        }
510        db.save_sync()
511            .expect("Failed to save JSON data sync (compact/fast)");
512
513        let file_content = fs::read_to_string(tmp_path).expect("Failed to read file");
514        let file_json: Value = unsafe {
515            simd_json::from_str(&mut file_content.clone())
516                .expect("Invalid JSON in file (simd_json)")
517        };
518        assert_eq!(file_json, new_data);
519        // Check it's compact (no newlines besides maybe one at EOF)
520        assert!(
521            !file_content.trim().contains('\n'),
522            "JSON file should be compact"
523        );
524        cleanup_file(tmp_path);
525    }
526
527    #[test]
528    fn test_jsonmutexdb_update_and_save_sync_compact_standard() {
529        let tmp_path = "test_db_set_save_compact_std.json";
530        cleanup_file(tmp_path);
531        // Disable fast serialization
532        let db =
533            JsonMutexDB::new(tmp_path, false, false, false).expect("Failed to create JsonMutexDB");
534        let new_data = json!({"key": "value", "numbers": [1, 2, 3], "nested": {"a": true}});
535        let new_data_clone = new_data.clone();
536        if let Err(e) = db.update(move |d| *d = new_data_clone) {
537            eprintln!("Failed to update database: {e:?}");
538        }
539        db.save_sync()
540            .expect("Failed to save JSON data sync (compact/standard)");
541
542        let file_content = fs::read_to_string(tmp_path).expect("Failed to read file");
543        let file_json: Value =
544            serde_json::from_str(&file_content).expect("Invalid JSON in file (serde_json)");
545        assert_eq!(file_json, new_data);
546        assert!(
547            !file_content.trim().contains('\n'),
548            "JSON file should be compact"
549        );
550        cleanup_file(tmp_path);
551    }
552
553    #[test]
554    fn test_jsonmutexdb_update_and_save_sync_pretty() {
555        let tmp_path = "test_db_set_save_pretty.json";
556        cleanup_file(tmp_path);
557        // Enable pretty printing
558        let db =
559            JsonMutexDB::new(tmp_path, true, false, false).expect("Failed to create JsonMutexDB");
560        let new_data = json!({"key": "value", "numbers": [1, 2, 3], "nested": {"a": true}});
561        let new_data_clone = new_data.clone();
562        if let Err(e) = db.update(move |d| *d = new_data_clone) {
563            eprintln!("Failed to update database: {e:?}");
564        }
565        db.save_sync()
566            .expect("Failed to save JSON data sync (pretty)");
567
568        let file_content = fs::read_to_string(tmp_path).expect("Failed to read file");
569        let file_json: Value = serde_json::from_str(&file_content).expect("Invalid JSON in file");
570        assert_eq!(file_json, new_data);
571        // Basic check for pretty printing (contains newlines and spaces for indentation)
572        assert!(
573            file_content.contains('\n'),
574            "JSON file not pretty printed (no newlines)"
575        );
576        assert!(
577            file_content.contains("  "),
578            "JSON file not pretty printed (no indentation)"
579        );
580        cleanup_file(tmp_path);
581    }
582
583    #[test]
584    fn test_save_async_works() {
585        let tmp_path = "test_db_save_async.json";
586        cleanup_file(tmp_path);
587        let db =
588            JsonMutexDB::new(tmp_path, false, false, true).expect("Failed to create JsonMutexDB");
589        let new_data = json!({"async_key": "async_value", "id": 123});
590        let new_data_clone = new_data.clone();
591        if let Err(e) = db.update(move |d| *d = new_data_clone) {
592            eprintln!("Failed to update database: {e:?}");
593        }
594
595        if let Err(e) = db.save_async() {
596            eprintln!("Failed to save asynchronously: {e:?}");
597        } // Call the async save
598
599        // Wait for the async save to likely complete. This is brittle in tests!
600        // In a real app, you might need a callback or future.
601        thread::sleep(Duration::from_millis(150));
602
603        // Verify the file content
604        let file_content =
605            fs::read_to_string(tmp_path).expect("Failed to read file after async save");
606        let file_json: Value = unsafe {
607            simd_json::from_str(&mut file_content.clone())
608                .expect("Invalid JSON in file (simd_json)")
609        };
610        assert_eq!(file_json, new_data);
611        cleanup_file(tmp_path);
612    }
613
614    #[test]
615    fn test_atomic_save_prevents_corruption() {
616        let tmp_path = "test_db_atomic.json";
617        cleanup_file(tmp_path);
618
619        // 1. Create an initial valid file
620        let initial_data = json!({"initial": "data"});
621        // Use sync mode for simpler setup in this specific test
622        let db_initial = JsonMutexDB::new(tmp_path, false, false, false).unwrap();
623        let initial_data_clone = initial_data.clone();
624        db_initial.update(|d| *d = initial_data_clone).unwrap();
625        db_initial.save_sync().unwrap(); // Save initial state
626
627        // 2. Setup data that would be written if crash didn't happen
628        let db_corrupting = JsonMutexDB::new(tmp_path, false, false, false).unwrap();
629        let large_bad_data = json!({"corrupted": "data".repeat(1000)}); // Data to write
630        db_corrupting.update(|d| *d = large_bad_data).unwrap();
631        let data_to_write = db_corrupting.get().unwrap(); // Get data to write
632
633        // Manually simulate the atomic save process up to the write phase
634        let path = Path::new(tmp_path);
635        let final_path = path.to_path_buf(); // Target path
636        let parent_dir = path.parent().unwrap();
637        fs::create_dir_all(parent_dir).unwrap();
638        let temp_file_res = NamedTempFile::new_in(parent_dir);
639
640        assert!(temp_file_res.is_ok(), "Failed to create NamedTempFile");
641        let temp_file = temp_file_res.unwrap(); // temp_file lives outside catch_unwind
642        let temp_path = temp_file.path().to_path_buf(); // Get path needed *after* close/drop
643
644        // Simulate write failure (panic) before rename/persist
645        // Use AssertUnwindSafe because we are borrowing temp_file across unwind boundary
646        let write_attempt = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
647            // --- Write directly to the NamedTempFile using BufWriter ---
648            let mut buffered_writer = BufWriter::new(&temp_file);
649
650            serde_json::to_writer(&mut buffered_writer, &data_to_write)
651                .expect("Write to temp file failed during simulation");
652
653            buffered_writer
654                .flush()
655                .expect("Flush failed during simulation");
656            // Explicitly drop writer before panic to release borrow
657            drop(buffered_writer);
658
659            // *** Simulate crash DURING or AFTER write/flush but BEFORE persist ***
660            panic!("Simulated crash during write!");
661        }));
662
663        // Assert that the write attempt panicked as expected
664        assert!(
665            write_attempt.is_err(),
666            "Write process did not panic as expected"
667        );
668
669        // IMPORTANT: Check the original file content hasn't changed
670        let file_content_after_crash =
671            fs::read_to_string(&final_path) // Read the original target path
672                .expect("Failed to read original file after simulated crash");
673        let file_json_after_crash: Value =
674            serde_json::from_str(&file_content_after_crash).expect("Invalid JSON in original file");
675
676        assert_eq!(
677            file_json_after_crash, initial_data,
678            "Original file was modified despite simulated crash during atomic save"
679        );
680
681        // --- Explicit Cleanup Attempt ---
682        // Explicitly try to close (which deletes if not persisted).
683        // This consumes temp_file.
684        match temp_file.close() {
685            Ok(()) => println!("NamedTempFile closed and deleted successfully."),
686            // PersistError contains the tempfile, allowing retry or manual cleanup
687            Err(persist_error) => {
688                eprintln!("NamedTempFile close failed: {persist_error}. Attempting manual delete.");
689                // If close fails, the Drop impl might also fail, so try manual delete
690                if let Err(remove_err) = fs::remove_file(&temp_path) {
691                    eprintln!("Manual deletion of temp file failed: {remove_err}");
692                    // Don't panic here, let the assertion below handle the final state check.
693                }
694            }
695        }
696
697        // Cleanup the target file
698        cleanup_file(tmp_path); // Remove the original test file (final_path)
699
700        // Assert that the temp file path no longer exists.
701        // Give the OS a tiny bit of time in case deletion has slight delay (optional)
702        // std::thread::sleep(std::time::Duration::from_millis(20));
703        assert!(
704            !temp_path.exists(),
705            "Temporary file was not cleaned up after explicit close/delete attempt"
706        );
707    }
708
709    // --- Async Update Tests (Require careful handling of state synchronization) ---
710
711    // Helper to wait for async updates to likely propagate (use with caution in real tests)
712    fn wait_for_async(db: &Arc<JsonMutexDB>, expected_key: &str, expected_value: &Value) {
713        let start = Instant::now();
714        let timeout = Duration::from_secs(2); // Adjust timeout as needed
715        loop {
716            // To check the *actual* state including async updates, we need a way
717            // to query the background thread or ensure it flushes to the main state.
718            // This current test structure only checks the main thread's view via `get()`,
719            // which IS NOT guaranteed to be up-to-date in async mode immediately after `update`.
720            //
721            // WORKAROUND for testing: Send a no-op update to potentially cycle the event loop,
722            // then check the file *after* a save triggered externally.
723            if let Err(e) = db.update(|_| {}) {
724                eprintln!("Failed to perform no-op update: {e:?}");
725            } // No-op to potentially push queue
726            thread::sleep(Duration::from_millis(20)); // Small delay
727
728            // Let's save the *main* state and check the file. This still doesn't
729            // guarantee the async update landed *before* the save, demonstrating the challenge.
730            db.save_sync().expect("Intermediate save failed");
731            if let Ok(content) = fs::read_to_string(&db.path) {
732                // Use simd_json for parsing if used for saving
733                if let Ok(current_val) =
734                    unsafe { simd_json::from_str::<Value>(&mut content.clone()) }
735                {
736                    if current_val.get(expected_key) == Some(expected_value) {
737                        return; // Found the expected state
738                    }
739                } else if let Ok(current_val) = serde_json::from_str::<Value>(&content) {
740                    // Fallback if not using simd_json or it failed
741                    if current_val.get(expected_key) == Some(expected_value) {
742                        return; // Found the expected state
743                    }
744                }
745            }
746
747            assert!(
748                (start.elapsed() <= timeout),
749                "Timeout waiting for async update to reflect for key '{expected_key}'"
750            );
751        }
752    }
753
754    // Test marked ignore because the interaction between main state and async state
755    // in this simple implementation makes reliable testing difficult without
756    // more complex synchronization/query mechanisms.
757    #[test]
758    #[ignore]
759    fn test_async_updates_basic_propagation() {
760        let tmp_path = "test_db_async_prop.json";
761        cleanup_file(tmp_path);
762
763        // Enable async updates
764        let db = Arc::new(JsonMutexDB::new(tmp_path, false, true, true).unwrap());
765
766        let key = "async_key_1";
767        let value = json!("async_value_1");
768
769        // Perform an asynchronous update
770        let db_clone = Arc::clone(&db);
771        let value_clone = value.clone();
772        thread::spawn(move || {
773            if let Err(e) = db_clone.update(move |data| {
774                data.as_object_mut()
775                    .unwrap()
776                    .insert(key.to_string(), value_clone);
777            }) {
778                eprintln!("Failed to update database: {e:?}");
779            }
780            println!("Async update sent for {key}");
781        })
782        .join()
783        .unwrap();
784
785        // Wait for the update to likely be processed and reflected (using helper)
786        // This relies on the background thread processing and potentially saving.
787        wait_for_async(&db, key, &value);
788
789        // Final check via get() - MAY STILL BE STALE depending on implementation details
790        // let final_data = db.get();
791        // assert_eq!(final_data[key], value);
792        // Instead, check the file content as wait_for_async does implicitly
793        let final_content = fs::read_to_string(tmp_path).unwrap();
794        let final_json: Value = unsafe { simd_json::from_str(&mut final_content.clone()).unwrap() };
795        assert_eq!(final_json[key], value);
796
797        // Test multiple async updates
798        let key2 = "async_key_2";
799        let value2 = json!(999);
800        let db_clone2 = Arc::clone(&db);
801        let value_clone2 = value2.clone();
802        thread::spawn(move || {
803            if let Err(e) = db_clone2.update(move |data| {
804                data.as_object_mut()
805                    .unwrap()
806                    .insert(key2.to_string(), value_clone2);
807            }) {
808                eprintln!("Failed to update database: {e:?}");
809            }
810            println!("Async update sent for {key2}");
811        })
812        .join()
813        .unwrap();
814
815        wait_for_async(&db, key2, &value2);
816
817        let final_content2 = fs::read_to_string(tmp_path).unwrap();
818        let final_json2: Value =
819            unsafe { simd_json::from_str(&mut final_content2.clone()).unwrap() };
820        assert_eq!(final_json2[key], value); // Check previous value still exists
821        assert_eq!(final_json2[key2], value2);
822
823        // Drop the DB explicitly to trigger shutdown and potential final save
824        drop(db);
825        thread::sleep(Duration::from_millis(50)); // Allow Drop time
826
827        cleanup_file(tmp_path);
828    }
829
830    // --- Performance Benchmarks (Ignored by default) ---
831
832    #[test]
833    #[ignore] // Performance sensitive, run explicitly
834    #[allow(clippy::cast_sign_loss)]
835    fn benchmark_save_sync_compact_fast_optimized() {
836        let tmp_path = "test_db_perf_save_sync_fast.json";
837        cleanup_file(tmp_path);
838        let db = JsonMutexDB::new(tmp_path, false, false, true).unwrap(); // Compact, fast
839        let mut large_obj = serde_json::Map::new();
840        for i in 0..1000 {
841            // 1000 key-value pairs
842            large_obj.insert(format!("key{i}"), json!(i));
843        }
844        let large_obj_clone = large_obj.clone();
845        if let Err(e) = db.update(move |d| *d = json!(large_obj_clone)) {
846            eprintln!("Failed to update database: {e:?}");
847        }
848
849        let iterations = 500; // Fewer iterations needed maybe
850        let start = Instant::now();
851        for _ in 0..iterations {
852            db.save_sync().expect("Save failed during benchmark");
853        }
854        let elapsed = start.elapsed();
855        println!(
856            "[Optimized] Elapsed time for {iterations} atomic sync saves (compact/fast): {elapsed:?}"
857        );
858        let avg_micros = elapsed.as_micros() / iterations as u128;
859        println!("[Optimized] Average time per save: {avg_micros} microseconds");
860        // Adjust assertion based on expected performance on target machine
861        assert!(
862            avg_micros < 500,
863            "Average save time too slow: {avg_micros} micros"
864        );
865
866        cleanup_file(tmp_path);
867    }
868
869    // Benchmark for async updates (measures enqueue/processing time)
870    // Still potentially ignores final persistence time.
871    #[test]
872    #[ignore]
873    fn benchmark_multithread_update_async_optimized() {
874        let tmp_path = "test_db_perf_async_update.json";
875        cleanup_file(tmp_path);
876        let db = Arc::new(JsonMutexDB::new(tmp_path, false, true, false).unwrap()); // Async enabled
877        let num_threads = 8;
878        let updates_per_thread = 5000;
879        let total_updates = num_threads * updates_per_thread;
880
881        let start = Instant::now();
882        let mut handles = vec![];
883        for thread_id in 0..num_threads {
884            let db_clone = Arc::clone(&db);
885            let handle = thread::spawn(move || {
886                for i in 0..updates_per_thread {
887                    let key = format!("thread{thread_id}_key{i}");
888                    let value = json!(i);
889                    if let Err(e) = db_clone.update(move |json| {
890                        json.as_object_mut().unwrap().insert(key, value);
891                    }) {
892                        eprintln!("Failed to update database: {e:?}");
893                    }
894                }
895            });
896            handles.push(handle);
897        }
898
899        for handle in handles {
900            handle.join().expect("Thread panicked");
901        }
902        let elapsed_enqueue = start.elapsed();
903        println!(
904            "[Optimized] Time to enqueue {total_updates} updates from {num_threads} threads: {elapsed_enqueue:?}"
905        );
906
907        // IMPORTANT: Now wait for the background thread to likely process these.
908        // Drop the Arc reference held by the main thread. The background thread
909        // holds the last one. Drop it to signal shutdown.
910        drop(db);
911
912        // Wait a bit for the background thread to potentially finish and drop.
913        // This is NOT a guarantee it processed everything or saved finally.
914        thread::sleep(Duration::from_millis(200)); // Adjust as needed
915
916        let elapsed_total = start.elapsed();
917        println!(
918            "[Optimized] Total time (enqueue + potential processing/shutdown): {elapsed_total:?}"
919        );
920
921        // Optional: Verify final file state IF the Drop implementation guarantees a final save
922        // let content = fs::read_to_string(tmp_path).unwrap();
923        // let final_json: Value = serde_json::from_str(&content).unwrap();
924        // assert_eq!(final_json.as_object().unwrap().len(), total_updates);
925
926        cleanup_file(tmp_path);
927    }
928
929    // Add other tests as needed: error handling, concurrent reads/writes etc.
930    #[test]
931    fn test_concurrent_read_write_sync() {
932        let tmp_path = "test_db_concurrent_sync.json";
933        cleanup_file(tmp_path);
934        let db = Arc::new(JsonMutexDB::new(tmp_path, false, false, false).unwrap());
935        let num_writers = 4;
936        let num_readers = 4;
937        let writes_per_thread = 50;
938        let reads_per_thread = 200;
939
940        let mut handles = vec![];
941
942        // Writers
943        for i in 0..num_writers {
944            let db_clone = Arc::clone(&db);
945            handles.push(thread::spawn(move || {
946                for j in 0..writes_per_thread {
947                    let key = format!("writer{i}_key{j}");
948                    let value = json!(j);
949                    if let Err(e) = db_clone.update(move |d| {
950                        d.as_object_mut().unwrap().insert(key, value);
951                    }) {
952                        eprintln!("Failed to update database: {e:?}");
953                    }
954                    // Small yield to increase chance of interleaving
955                    thread::yield_now();
956                }
957            }));
958        }
959
960        // Readers
961        for _ in 0..num_readers {
962            let db_clone = Arc::clone(&db);
963            handles.push(thread::spawn(move || {
964                for _ in 0..reads_per_thread {
965                    let _data = db_clone.get(); // Perform read
966                    // Optional: Add assertions on data consistency if needed,
967                    // but could make test flaky depending on timing.
968                    thread::yield_now();
969                }
970            }));
971        }
972
973        for handle in handles {
974            handle.join().expect("Thread panicked");
975        }
976
977        // Verify final state
978        let binding = db.get().unwrap();
979        let final_data_unwrapped = binding.as_object().unwrap();
980        assert_eq!(final_data_unwrapped.len(), num_writers * writes_per_thread);
981        // Check one key per writer to be reasonably sure
982        assert_eq!(final_data_unwrapped["writer0_key49"], 49);
983        assert_eq!(final_data_unwrapped["writer1_key49"], 49);
984
985        cleanup_file(tmp_path);
986    }
987}