1#![warn(clippy::pedantic)]
2use serde_json::Value;
3use std::cell::RefCell;
4use std::fs::{self, File, OpenOptions};
5use std::io::{BufWriter, Error as IoError, ErrorKind, Write};
6use std::path::Path;
7use std::sync::Mutex; use std::thread;
10use tempfile::NamedTempFile;
11
12use crossbeam_channel::{Receiver, RecvError, SendError, Sender, bounded, select, unbounded};
14
/// Control messages sent from a `JsonMutexDB` handle to its background thread.
enum BackgroundCommand {
    /// Request a clone of the current in-memory document; the background
    /// loop sends the clone back on the enclosed channel.
    GetState(Sender<Value>),
    /// Request that the background loop stop; it performs a final save after
    /// leaving the loop.
    Shutdown,
}
24
/// Boxed one-shot closure that mutates the in-memory JSON document.
/// It is `Send` so that it can be handed over to the background thread.
type UpdateTask = Box<dyn FnOnce(&mut Value) + Send>;
27
/// Errors returned by [`JsonMutexDB`] operations.
#[derive(Debug)]
pub enum DbError {
    /// Filesystem failure, or a JSON parse/serialization failure that has
    /// been wrapped in an I/O error.
    Io(IoError),
    /// Synchronisation failure: a poisoned mutex, a closed channel to the
    /// background thread, or an inconsistent internal mode.
    Sync(String),
}

impl std::fmt::Display for DbError {
    /// Renders the same text that the `Serialize` implementation emits, so
    /// logs and serialized errors agree.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DbError::Io(err) => write!(f, "IoError: {err}"),
            DbError::Sync(msg) => write!(f, "SyncError: {msg}"),
        }
    }
}

impl std::error::Error for DbError {
    /// Exposes the wrapped I/O error (if any) so callers can walk the chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            DbError::Io(err) => Some(err),
            DbError::Sync(_) => None,
        }
    }
}
34
35impl serde::Serialize for DbError {
36 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
37 where
38 S: serde::Serializer,
39 {
40 match self {
41 DbError::Io(err) => serializer.serialize_str(&format!("IoError: {err}")),
42 DbError::Sync(msg) => serializer.serialize_str(&format!("SyncError: {msg}")),
43 }
44 }
45}
46
47impl From<IoError> for DbError {
48 fn from(e: IoError) -> Self {
49 DbError::Io(e)
50 }
51}
52
53impl<T> From<SendError<T>> for DbError {
55 fn from(e: SendError<T>) -> Self {
56 DbError::Sync(format!("Failed to send command to background thread: {e}"))
57 }
58}
59
60impl From<RecvError> for DbError {
62 fn from(e: RecvError) -> Self {
63 DbError::Sync(format!(
64 "Failed to receive response from background thread: {e}"
65 ))
66 }
67}
68
thread_local! {
    /// Per-thread scratch byte buffer, pre-sized to 64 KiB.
    ///
    /// NOTE(review): nothing in the visible part of this file reads or writes
    /// this buffer — confirm it is still needed.
    static SERIALIZE_BUF: RefCell<Vec<u8>> = RefCell::new(Vec::with_capacity(64 * 1024));
}
73
/// A JSON document kept in memory and persisted to a single file.
///
/// The handle operates in one of two modes chosen at construction time:
/// a synchronous mode where the document lives behind a `Mutex`, or an
/// asynchronous mode where a background thread owns the document and is
/// driven through channels.
pub struct JsonMutexDB {
    /// Path of the backing JSON file, as passed to `new`.
    path: String,
    /// When `true`, saves use `serde_json::to_writer_pretty`.
    pretty: bool,
    /// When `true` (and `pretty` is `false`), saves use `simd_json::to_writer`.
    fast_serialization: bool,

    /// The in-memory document in synchronous mode; `None` in asynchronous mode.
    sync_data: Option<Mutex<Value>>,
    /// Channels and join handle of the background thread in asynchronous
    /// mode; `None` in synchronous mode.
    async_comm: Option<AsyncCommunicator>,
}
84
/// Everything a `JsonMutexDB` handle needs to talk to its background thread.
struct AsyncCommunicator {
    /// Sends control commands (`GetState`, `Shutdown`) to the background loop.
    command_tx: Sender<BackgroundCommand>,
    /// Sends update closures to be applied to the background thread's document.
    update_tx: Sender<UpdateTask>,
    /// Join handle of the background thread; taken (set to `None`) when the
    /// thread is joined during drop.
    update_handle: Option<thread::JoinHandle<()>>,
}
91
// Explicitly mark the handle as unwind-safe so it can be used across
// `std::panic::catch_unwind` boundaries without `AssertUnwindSafe`.
// NOTE(review): these are assertions by the author; nothing in this file
// verifies that the document stays consistent after a panic — confirm intent.
impl std::panic::UnwindSafe for JsonMutexDB {}
impl std::panic::RefUnwindSafe for JsonMutexDB {}
95
96impl JsonMutexDB {
97 pub fn new(
104 path: &str,
105 pretty: bool,
106 async_updates: bool,
107 fast_serialization: bool,
108 ) -> Result<Self, DbError> {
109 let initial_json = match fs::read_to_string(path) {
111 Ok(content) if content.trim().is_empty() => Value::Object(serde_json::Map::new()),
112 Ok(mut content) => unsafe {
113 simd_json::from_str::<Value>(content.as_mut_str()).map_err(|e| {
114 DbError::Io(IoError::new(
115 ErrorKind::InvalidData,
116 format!("Invalid JSON (simd_json): {e}"),
117 ))
118 })?
119 },
120 Err(err) if err.kind() == ErrorKind::NotFound => Value::Object(serde_json::Map::new()),
121 Err(err) => return Err(DbError::Io(err)),
122 };
123
124 let mut sync_data = None;
125 let mut async_comm = None;
126
127 if async_updates {
128 let (command_tx, command_rx) = unbounded::<BackgroundCommand>();
130 let (update_tx, update_rx) = unbounded::<UpdateTask>();
131
132 let bg_initial_data = initial_json; let bg_path = path.to_string();
135 let bg_pretty = pretty;
136 let bg_fast_serialization = fast_serialization;
137
138 let update_handle = thread::spawn(move || {
139 background_thread_loop(
140 bg_initial_data,
141 &bg_path,
142 bg_pretty,
143 bg_fast_serialization,
144 &command_rx,
145 &update_rx,
146 );
147 });
148
149 async_comm = Some(AsyncCommunicator {
150 command_tx,
151 update_tx,
152 update_handle: Some(update_handle),
153 });
154 } else {
155 sync_data = Some(Mutex::new(initial_json));
157 }
158
159 Ok(JsonMutexDB {
160 path: path.to_string(),
161 pretty,
162 fast_serialization,
163 sync_data,
164 async_comm,
165 })
166 }
167
168 pub fn get(&self) -> Result<Value, DbError> {
178 if let Some(comm) = &self.async_comm {
179 let (response_tx, response_rx) = bounded(1); comm.command_tx
183 .send(BackgroundCommand::GetState(response_tx))?;
184 let value = response_rx.recv()?;
186 Ok(value)
187 } else if let Some(mutex) = &self.sync_data {
188 let guard = mutex
190 .lock()
191 .map_err(|_| DbError::Sync("Mutex poisoned".to_string()))?;
192 Ok(guard.clone())
193 } else {
199 Err(DbError::Sync(
200 "DB is in an invalid state (neither sync nor async)".to_string(),
201 ))
202 }
203 }
204
205 pub fn update<F>(&self, update_fn: F) -> Result<(), DbError>
220 where
221 F: FnOnce(&mut Value) + Send + 'static,
222 {
223 if let Some(comm) = &self.async_comm {
224 comm.update_tx.send(Box::new(update_fn))?;
226 Ok(())
227 } else if let Some(mutex) = &self.sync_data {
228 let mut guard = mutex
230 .lock()
231 .map_err(|_| DbError::Sync("Mutex poisoned".to_string()))?;
232 update_fn(&mut guard);
233 Ok(())
234 } else {
243 Err(DbError::Sync(
244 "DB is in an invalid state (neither sync nor async)".to_string(),
245 ))
246 }
247 }
248
249 pub fn save_sync(&self) -> Result<(), DbError> {
258 let data_to_save = self.get()?;
260
261 Self::save_data_to_disk(
263 &self.path,
264 &data_to_save,
265 self.pretty,
266 self.fast_serialization,
267 true, )
269 }
270
271 pub fn save_async(&self) -> Result<(), DbError> {
283 let data_clone = self.get()?;
286
287 let path_clone = self.path.clone();
289 let pretty_clone = self.pretty;
290 let fast_serial_clone = self.fast_serialization;
291
292 thread::spawn(move || {
294 if let Err(e) = Self::save_data_to_disk(
295 &path_clone,
296 &data_clone,
297 pretty_clone,
298 fast_serial_clone,
299 true, ) {
301 eprintln!("Async save failed: {e:?}");
303 }
304 });
305 Ok(())
306 }
307
308 fn save_data_to_disk(
310 path_str: &str,
311 data_to_save: &Value,
312 pretty: bool,
313 fast_serialization: bool,
314 atomic: bool,
315 ) -> Result<(), DbError> {
316 let path = Path::new(path_str);
318 let final_path = path.to_path_buf();
319
320 let write_logic = |writer: Box<dyn Write>| -> Result<(), DbError> {
321 let mut buffered_writer = BufWriter::new(writer);
322 if pretty {
323 serde_json::to_writer_pretty(&mut buffered_writer, data_to_save)
324 .map_err(|e| DbError::Io(IoError::other(e.to_string())))?;
325 } else if fast_serialization {
326 simd_json::to_writer(&mut buffered_writer, data_to_save)
327 .map_err(|e| DbError::Io(IoError::other(format!("{e:?}"))))?;
328 } else {
329 serde_json::to_writer(&mut buffered_writer, data_to_save)
330 .map_err(|e| DbError::Io(IoError::other(e.to_string())))?;
331 }
332 buffered_writer.flush()?; Ok(())
334 };
335
336 if atomic {
337 let parent_dir = path.parent().ok_or_else(|| {
338 DbError::Io(IoError::new(
339 ErrorKind::InvalidInput,
340 "Invalid path: cannot determine parent directory",
341 ))
342 })?;
343 fs::create_dir_all(parent_dir)?; let temp_file = NamedTempFile::new_in(parent_dir)?;
347 let temp_path = temp_file.path().to_path_buf(); let file = OpenOptions::new().write(true).open(&temp_path)?;
351 write_logic(Box::new(file))?; temp_file.persist(&final_path).map_err(|e| {
355 DbError::Io(IoError::other(format!(
356 "Failed to atomically rename temp file: {}",
357 e.error
358 )))
359 })?;
360 } else {
361 let file = File::create(path)?;
363 write_logic(Box::new(file))?;
364 }
365
366 Ok(())
367 }
368}
369
370fn background_thread_loop(
372 mut local_data: Value,
373 path: &str,
374 pretty: bool,
375 fast_serialization: bool,
376 command_rx: &Receiver<BackgroundCommand>,
377 update_rx: &Receiver<UpdateTask>,
378) {
379 println!("Background thread started.");
380 loop {
381 select! {
382 recv(update_rx) -> msg => {
384 if let Ok(update_fn) = msg {
385 update_fn(&mut local_data);
387 } else {
389 println!("Update channel closed.");
391 break; }
393 },
394
395 recv(command_rx) -> msg => {
397 match msg {
398 Ok(BackgroundCommand::GetState(response_tx)) => {
399 let _ = response_tx.send(local_data.clone()); }
402 Ok(BackgroundCommand::Shutdown) => {
403 println!("Received Shutdown command.");
404 break; }
406 Err(_) => {
407 println!("Command channel closed.");
409 break; }
411 }
412 }
413 }
414 }
415
416 println!("Background thread shutting down. Performing final save...");
418 if let Err(e) = JsonMutexDB::save_data_to_disk(
420 path,
421 &local_data,
422 pretty,
423 fast_serialization,
424 false, ) {
426 eprintln!("Error during final background save: {e:?}");
427 }
428 println!("Background thread finished.");
429}
430
431impl Drop for JsonMutexDB {
432 fn drop(&mut self) {
433 if let Some(mut comm) = self.async_comm.take() {
434 println!("Dropping JsonMutexDB (async)...");
435 let _ = comm.command_tx.send(BackgroundCommand::Shutdown);
437
438 drop(comm.command_tx);
440 drop(comm.update_tx);
441
442 if let Some(handle) = comm.update_handle.take() {
444 match handle.join() {
445 Ok(()) => println!("Background thread joined cleanly."),
446 Err(e) => eprintln!("Background thread panicked: {e:?}"),
447 }
448 }
449 } else {
450 println!("Dropping JsonMutexDB (sync)...");
451 }
463 }
464}
465
// Unit tests and (ignored) benchmarks for `JsonMutexDB`.
//
// Every test uses its own file name, relative to the current working
// directory, and removes that file before and after running.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::fs;
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};

    /// Removes `path`, ignoring any error (e.g. the file does not exist).
    fn cleanup_file(path: &str) {
        let _ = fs::remove_file(path);
    }

    /// A missing file yields an empty object; an existing file is loaded.
    #[test]
    fn test_jsonmutexdb_new_and_get() {
        let tmp_path = "test_db_new_get.json";
        cleanup_file(tmp_path);
        let db =
            JsonMutexDB::new(tmp_path, false, false, false).expect("Failed to create JsonMutexDB");
        assert_eq!(db.get().unwrap(), json!({}));
        cleanup_file(tmp_path);

        // Second half: pre-populate the file and check that it is parsed on open.
        let initial_json = json!({"hello": "world"});
        fs::write(tmp_path, initial_json.to_string()).unwrap();
        let db = JsonMutexDB::new(tmp_path, false, false, false)
            .expect("Failed to load existing JsonMutexDB");
        assert_eq!(db.get().unwrap(), initial_json);
        cleanup_file(tmp_path);
    }

    /// Sync mode, compact output through the `simd_json` writer.
    #[test]
    fn test_jsonmutexdb_update_and_save_sync_compact_fast() {
        let tmp_path = "test_db_set_save_compact_fast.json";
        cleanup_file(tmp_path);
        let db =
            JsonMutexDB::new(tmp_path, false, false, true).expect("Failed to create JsonMutexDB");
        let new_data = json!({"key": "value", "numbers": [1, 2, 3], "nested": {"a": true}});
        let new_data_clone = new_data.clone();
        // NOTE(review): an update failure is only printed here; the
        // assertions below would then fail with a less direct message.
        if let Err(e) = db.update(move |d| *d = new_data_clone) {
            eprintln!("Failed to update database: {e:?}");
        }
        db.save_sync()
            .expect("Failed to save JSON data sync (compact/fast)");

        let file_content = fs::read_to_string(tmp_path).expect("Failed to read file");
        // The parser gets a throwaway clone, so `file_content` itself is
        // never modified by the in-place parse.
        let file_json: Value = unsafe {
            simd_json::from_str(&mut file_content.clone())
                .expect("Invalid JSON in file (simd_json)")
        };
        assert_eq!(file_json, new_data);
        assert!(
            !file_content.trim().contains('\n'),
            "JSON file should be compact"
        );
        cleanup_file(tmp_path);
    }

    /// Sync mode, compact output through the `serde_json` writer.
    #[test]
    fn test_jsonmutexdb_update_and_save_sync_compact_standard() {
        let tmp_path = "test_db_set_save_compact_std.json";
        cleanup_file(tmp_path);
        let db =
            JsonMutexDB::new(tmp_path, false, false, false).expect("Failed to create JsonMutexDB");
        let new_data = json!({"key": "value", "numbers": [1, 2, 3], "nested": {"a": true}});
        let new_data_clone = new_data.clone();
        if let Err(e) = db.update(move |d| *d = new_data_clone) {
            eprintln!("Failed to update database: {e:?}");
        }
        db.save_sync()
            .expect("Failed to save JSON data sync (compact/standard)");

        let file_content = fs::read_to_string(tmp_path).expect("Failed to read file");
        let file_json: Value =
            serde_json::from_str(&file_content).expect("Invalid JSON in file (serde_json)");
        assert_eq!(file_json, new_data);
        assert!(
            !file_content.trim().contains('\n'),
            "JSON file should be compact"
        );
        cleanup_file(tmp_path);
    }

    /// Sync mode, pretty-printed output: expects newlines and spaces.
    #[test]
    fn test_jsonmutexdb_update_and_save_sync_pretty() {
        let tmp_path = "test_db_set_save_pretty.json";
        cleanup_file(tmp_path);
        let db =
            JsonMutexDB::new(tmp_path, true, false, false).expect("Failed to create JsonMutexDB");
        let new_data = json!({"key": "value", "numbers": [1, 2, 3], "nested": {"a": true}});
        let new_data_clone = new_data.clone();
        if let Err(e) = db.update(move |d| *d = new_data_clone) {
            eprintln!("Failed to update database: {e:?}");
        }
        db.save_sync()
            .expect("Failed to save JSON data sync (pretty)");

        let file_content = fs::read_to_string(tmp_path).expect("Failed to read file");
        let file_json: Value = serde_json::from_str(&file_content).expect("Invalid JSON in file");
        assert_eq!(file_json, new_data);
        assert!(
            file_content.contains('\n'),
            "JSON file not pretty printed (no newlines)"
        );
        assert!(
            file_content.contains(" "),
            "JSON file not pretty printed (no indentation)"
        );
        cleanup_file(tmp_path);
    }

    /// `save_async` on a sync-mode database eventually writes the file.
    #[test]
    fn test_save_async_works() {
        let tmp_path = "test_db_save_async.json";
        cleanup_file(tmp_path);
        let db =
            JsonMutexDB::new(tmp_path, false, false, true).expect("Failed to create JsonMutexDB");
        let new_data = json!({"async_key": "async_value", "id": 123});
        let new_data_clone = new_data.clone();
        if let Err(e) = db.update(move |d| *d = new_data_clone) {
            eprintln!("Failed to update database: {e:?}");
        }

        if let Err(e) = db.save_async() {
            eprintln!("Failed to save asynchronously: {e:?}");
        }
        // NOTE(review): a fixed sleep is the only synchronisation with the
        // detached writer thread, so this test is timing-dependent.
        thread::sleep(Duration::from_millis(150));

        let file_content =
            fs::read_to_string(tmp_path).expect("Failed to read file after async save");
        let file_json: Value = unsafe {
            simd_json::from_str(&mut file_content.clone())
                .expect("Invalid JSON in file (simd_json)")
        };
        assert_eq!(file_json, new_data);
        cleanup_file(tmp_path);
    }

    /// Simulates a crash after the temp file is written but before it is
    /// renamed, and checks that the original file is untouched.
    ///
    /// The write/rename steps are re-created by hand here rather than going
    /// through `save_sync`, so the panic can be injected between them.
    #[test]
    fn test_atomic_save_prevents_corruption() {
        let tmp_path = "test_db_atomic.json";
        cleanup_file(tmp_path);

        // Step 1: put known-good content on disk.
        let initial_data = json!({"initial": "data"});
        let db_initial = JsonMutexDB::new(tmp_path, false, false, false).unwrap();
        let initial_data_clone = initial_data.clone();
        db_initial.update(|d| *d = initial_data_clone).unwrap();
        db_initial.save_sync().unwrap();

        // Step 2: prepare different in-memory content that is never saved
        // through the database itself.
        let db_corrupting = JsonMutexDB::new(tmp_path, false, false, false).unwrap();
        let large_bad_data = json!({"corrupted": "data".repeat(1000)});
        db_corrupting.update(|d| *d = large_bad_data).unwrap();
        let data_to_write = db_corrupting.get().unwrap();

        // Step 3: create a temp file next to the target, as the atomic save does.
        let path = Path::new(tmp_path);
        let final_path = path.to_path_buf();
        let parent_dir = path.parent().unwrap();
        fs::create_dir_all(parent_dir).unwrap();
        let temp_file_res = NamedTempFile::new_in(parent_dir);

        assert!(temp_file_res.is_ok(), "Failed to create NamedTempFile");
        let temp_file = temp_file_res.unwrap();
        let temp_path = temp_file.path().to_path_buf();

        // Step 4: write the temp file completely, then panic before any rename.
        let write_attempt = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let mut buffered_writer = BufWriter::new(&temp_file);

            serde_json::to_writer(&mut buffered_writer, &data_to_write)
                .expect("Write to temp file failed during simulation");

            buffered_writer
                .flush()
                .expect("Flush failed during simulation");
            drop(buffered_writer);

            panic!("Simulated crash during write!");
        }));

        assert!(
            write_attempt.is_err(),
            "Write process did not panic as expected"
        );

        // Step 5: the target file must still hold the initial content.
        let file_content_after_crash =
            fs::read_to_string(&final_path)
                .expect("Failed to read original file after simulated crash");
        let file_json_after_crash: Value =
            serde_json::from_str(&file_content_after_crash).expect("Invalid JSON in original file");

        assert_eq!(
            file_json_after_crash, initial_data,
            "Original file was modified despite simulated crash during atomic save"
        );

        // Step 6: close (and thereby delete) the temp file, falling back to
        // a manual delete if closing reports an error.
        match temp_file.close() {
            Ok(()) => println!("NamedTempFile closed and deleted successfully."),
            Err(persist_error) => {
                eprintln!("NamedTempFile close failed: {persist_error}. Attempting manual delete.");
                if let Err(remove_err) = fs::remove_file(&temp_path) {
                    eprintln!("Manual deletion of temp file failed: {remove_err}");
                }
            }
        }

        cleanup_file(tmp_path);
        assert!(
            !temp_path.exists(),
            "Temporary file was not cleaned up after explicit close/delete attempt"
        );
    }

    /// Polls until the file on disk contains `expected_key == expected_value`.
    ///
    /// Each iteration enqueues a no-op update, sleeps 20 ms, forces a
    /// synchronous save and re-reads the file. Panics once more than two
    /// seconds have elapsed without a match.
    fn wait_for_async(db: &Arc<JsonMutexDB>, expected_key: &str, expected_value: &Value) {
        let start = Instant::now();
        let timeout = Duration::from_secs(2);
        loop {
            if let Err(e) = db.update(|_| {}) {
                eprintln!("Failed to perform no-op update: {e:?}");
            }
            thread::sleep(Duration::from_millis(20));
            db.save_sync().expect("Intermediate save failed");
            if let Ok(content) = fs::read_to_string(&db.path) {
                // Try simd_json first; fall back to serde_json if it rejects
                // the content.
                if let Ok(current_val) =
                    unsafe { simd_json::from_str::<Value>(&mut content.clone()) }
                {
                    if current_val.get(expected_key) == Some(expected_value) {
                        return;
                    }
                } else if let Ok(current_val) = serde_json::from_str::<Value>(&content) {
                    if current_val.get(expected_key) == Some(expected_value) {
                        return;
                    }
                }
            }

            assert!(
                (start.elapsed() <= timeout),
                "Timeout waiting for async update to reflect for key '{expected_key}'"
            );
        }
    }

    /// Async mode: updates sent from other threads become visible on disk.
    /// Ignored by default.
    #[test]
    #[ignore]
    fn test_async_updates_basic_propagation() {
        let tmp_path = "test_db_async_prop.json";
        cleanup_file(tmp_path);

        let db = Arc::new(JsonMutexDB::new(tmp_path, false, true, true).unwrap());

        let key = "async_key_1";
        let value = json!("async_value_1");

        // First update, sent from a helper thread that is joined immediately.
        let db_clone = Arc::clone(&db);
        let value_clone = value.clone();
        thread::spawn(move || {
            if let Err(e) = db_clone.update(move |data| {
                data.as_object_mut()
                    .unwrap()
                    .insert(key.to_string(), value_clone);
            }) {
                eprintln!("Failed to update database: {e:?}");
            }
            println!("Async update sent for {key}");
        })
        .join()
        .unwrap();

        wait_for_async(&db, key, &value);

        let final_content = fs::read_to_string(tmp_path).unwrap();
        let final_json: Value = unsafe { simd_json::from_str(&mut final_content.clone()).unwrap() };
        assert_eq!(final_json[key], value);

        // Second update: the first key must survive alongside the new one.
        let key2 = "async_key_2";
        let value2 = json!(999);
        let db_clone2 = Arc::clone(&db);
        let value_clone2 = value2.clone();
        thread::spawn(move || {
            if let Err(e) = db_clone2.update(move |data| {
                data.as_object_mut()
                    .unwrap()
                    .insert(key2.to_string(), value_clone2);
            }) {
                eprintln!("Failed to update database: {e:?}");
            }
            println!("Async update sent for {key2}");
        })
        .join()
        .unwrap();

        wait_for_async(&db, key2, &value2);

        let final_content2 = fs::read_to_string(tmp_path).unwrap();
        let final_json2: Value =
            unsafe { simd_json::from_str(&mut final_content2.clone()).unwrap() };
        assert_eq!(final_json2[key], value);
        assert_eq!(final_json2[key2], value2);

        // Dropping the last handle joins the background thread, whose final
        // save re-creates the file; clean up afterwards.
        drop(db);
        thread::sleep(Duration::from_millis(50));
        cleanup_file(tmp_path);
    }

    /// Benchmark: 500 atomic synchronous saves of a 1000-key object.
    /// Ignored by default.
    ///
    /// NOTE(review): the 500 µs threshold depends on hardware and filesystem.
    #[test]
    #[ignore]
    #[allow(clippy::cast_sign_loss)]
    fn benchmark_save_sync_compact_fast_optimized() {
        let tmp_path = "test_db_perf_save_sync_fast.json";
        cleanup_file(tmp_path);
        let db = JsonMutexDB::new(tmp_path, false, false, true).unwrap();
        let mut large_obj = serde_json::Map::new();
        for i in 0..1000 {
            large_obj.insert(format!("key{i}"), json!(i));
        }
        let large_obj_clone = large_obj.clone();
        if let Err(e) = db.update(move |d| *d = json!(large_obj_clone)) {
            eprintln!("Failed to update database: {e:?}");
        }

        let iterations = 500;
        let start = Instant::now();
        for _ in 0..iterations {
            db.save_sync().expect("Save failed during benchmark");
        }
        let elapsed = start.elapsed();
        println!(
            "[Optimized] Elapsed time for {iterations} atomic sync saves (compact/fast): {elapsed:?}"
        );
        let avg_micros = elapsed.as_micros() / iterations as u128;
        println!("[Optimized] Average time per save: {avg_micros} microseconds");
        assert!(
            avg_micros < 500,
            "Average save time too slow: {avg_micros} micros"
        );

        cleanup_file(tmp_path);
    }

    /// Benchmark: 8 threads each enqueue 5000 updates on an async-mode
    /// database. Ignored by default. It only prints timings; nothing is
    /// asserted about the resulting document.
    #[test]
    #[ignore]
    fn benchmark_multithread_update_async_optimized() {
        let tmp_path = "test_db_perf_async_update.json";
        cleanup_file(tmp_path);
        let db = Arc::new(JsonMutexDB::new(tmp_path, false, true, false).unwrap());
        let num_threads = 8;
        let updates_per_thread = 5000;
        let total_updates = num_threads * updates_per_thread;

        let start = Instant::now();
        let mut handles = vec![];
        for thread_id in 0..num_threads {
            let db_clone = Arc::clone(&db);
            let handle = thread::spawn(move || {
                for i in 0..updates_per_thread {
                    let key = format!("thread{thread_id}_key{i}");
                    let value = json!(i);
                    if let Err(e) = db_clone.update(move |json| {
                        json.as_object_mut().unwrap().insert(key, value);
                    }) {
                        eprintln!("Failed to update database: {e:?}");
                    }
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().expect("Thread panicked");
        }
        let elapsed_enqueue = start.elapsed();
        println!(
            "[Optimized] Time to enqueue {total_updates} updates from {num_threads} threads: {elapsed_enqueue:?}"
        );

        // Dropping the last handle blocks until the background thread has
        // been joined.
        drop(db);

        // NOTE(review): this sleep is included in `elapsed_total`.
        thread::sleep(Duration::from_millis(200));
        let elapsed_total = start.elapsed();
        println!(
            "[Optimized] Total time (enqueue + potential processing/shutdown): {elapsed_total:?}"
        );

        cleanup_file(tmp_path);
    }

    /// Sync mode: concurrent writers and readers; every write must be
    /// present in the final document.
    #[test]
    fn test_concurrent_read_write_sync() {
        let tmp_path = "test_db_concurrent_sync.json";
        cleanup_file(tmp_path);
        let db = Arc::new(JsonMutexDB::new(tmp_path, false, false, false).unwrap());
        let num_writers = 4;
        let num_readers = 4;
        let writes_per_thread = 50;
        let reads_per_thread = 200;

        let mut handles = vec![];

        // Writers: each inserts `writes_per_thread` distinct keys.
        for i in 0..num_writers {
            let db_clone = Arc::clone(&db);
            handles.push(thread::spawn(move || {
                for j in 0..writes_per_thread {
                    let key = format!("writer{i}_key{j}");
                    let value = json!(j);
                    if let Err(e) = db_clone.update(move |d| {
                        d.as_object_mut().unwrap().insert(key, value);
                    }) {
                        eprintln!("Failed to update database: {e:?}");
                    }
                    thread::yield_now();
                }
            }));
        }

        // Readers: only exercise `get` concurrently; results are discarded.
        for _ in 0..num_readers {
            let db_clone = Arc::clone(&db);
            handles.push(thread::spawn(move || {
                for _ in 0..reads_per_thread {
                    let _data = db_clone.get();
                    thread::yield_now();
                }
            }));
        }

        for handle in handles {
            handle.join().expect("Thread panicked");
        }

        let binding = db.get().unwrap();
        let final_data_unwrapped = binding.as_object().unwrap();
        assert_eq!(final_data_unwrapped.len(), num_writers * writes_per_thread);
        assert_eq!(final_data_unwrapped["writer0_key49"], 49);
        assert_eq!(final_data_unwrapped["writer1_key49"], 49);

        cleanup_file(tmp_path);
    }
}