db_core/system/
instance.rs

1//! Interface to the DBMS.
2//!
3
4
5use crate::common::errors::Error;
6use crate::common::defs::Sequence;
7use crate::common::defs::ObjectId;
8use crate::common::defs::SeekFrom;
9use crate::common::defs::SharedSequences;
10use crate::tran_mgr::tran_mgr::TranMgr;
11use crate::log_mgr::log_mgr::RecType;
12use crate::log_mgr::log_mgr::LogMgr;
13use crate::log_mgr::log_mgr::LogReader;
14use crate::system::config::ConfigMt;
15use crate::system::checkpointer::Checkpointer;
16use crate::storage::driver::StorageDriver;
17use crate::storage::driver::Handle;
18use crate::storage::driver::StorageDriverSharedState;
19use crate::storage::datastore::FileType;
20use crate::storage::datastore::FileDesc;
21use crate::storage::datastore::DataStore;
22use std::collections::HashMap;
23use std::sync::Arc;
24use std::sync::atomic::AtomicU64;
25use std::sync::atomic::Ordering;
26
27
28/// Instance provides interface for the client to interact with the system: initiate and complete
29/// transaction, write and read data, etc.
30pub struct Instance {
31    conf:               ConfigMt,
32    csns:               SharedSequences,
33    tran_mgr:           TranMgr,
34    log_mgr:            LogMgr,
35    storage_driver:     StorageDriver,
36    checkpointer:       Arc<Checkpointer>,
37}
38
39impl Instance {
40
41    /// Create a new instance with given configuration.
42    pub fn new(conf: ConfigMt) -> Result<Instance, Error> {
43
44        let tran_mgr =          TranMgr::new(conf.clone())?;
45        let log_mgr =           LogMgr::new(conf.clone())?;
46
47        let csn =               Sequence::new(log_mgr.starting_csn());
48        let latest_commit_csn = Arc::new(AtomicU64::new(log_mgr.latest_commit_csn()));
49        let checkpoint_csn =    Sequence::new(1);
50        let csns = SharedSequences {
51                csn,
52                latest_commit_csn,
53                checkpoint_csn,
54            };
55
56        let storage_driver =    StorageDriver::new(conf.clone(), csns.clone())?;
57
58        let checkpointer =      Arc::new(Checkpointer::new(log_mgr.clone(), 
59                                                           csns.clone(),
60                                                           conf.clone(),
61                                                           tran_mgr.clone())?);
62
63        let ret = Instance {
64            conf,
65            csns,
66            tran_mgr,
67            log_mgr,
68            storage_driver,
69            checkpointer,
70        };
71
72        ret.restore_state()?;
73
74        Ok(ret)
75    }
76
77    /// Initialize datastore: create data, checkpoint, versioning store files, and lock file.
78    pub fn initialize_datastore(path: &str, block_size: usize, desc_set: &[FileDesc]) -> Result<(), Error> {
79        DataStore::initialize_datastore(path, block_size, desc_set)
80    }
81
82    /// Add a new file to datastore and return it's file_id.
83    pub fn add_datafile(&self, file_type: FileType, extent_size: u16, extent_num: u16, max_extent_num: u16) -> Result<u16, Error> {
84        self.storage_driver.add_datafile(file_type, extent_size, extent_num, max_extent_num)
85    }
86
87    /// Begin a new transaction.
88    pub fn begin_transaction(&self) -> Result<Transaction, Error> {
89        let csn = self.csns.csn.get_cur();
90        let tsn = self.tran_mgr.start_tran();
91
92        Ok(Transaction {
93            instance:   &self,
94            tsn,
95            start_csn:  csn,
96            last_write_csn:  csn,
97            read_committed_csn: self.csns.latest_commit_csn.load(Ordering::Relaxed),
98        })
99    }
100
101    /// Commit transaction.
102    pub fn commit(&self, t: Transaction) -> Result<(), Error> {
103        if t.last_write_csn > t.start_csn {
104            let commit_csn = self.csns.csn.get_next();
105            self.log_mgr.write_commit(commit_csn, t.tsn)?;
106            self.update_latest_commit_csn(commit_csn);
107        }
108        self.storage_driver.finish_tran(t.tsn);
109        self.tran_mgr.delete_tran(t.tsn);
110        Ok(())
111    }
112
113    /// Rollback transaction.
114    pub fn rollback(&self, t: Transaction) -> Result<(), Error> {
115        if t.last_write_csn > t.start_csn {
116            self.log_mgr.write_rollback(t.last_write_csn, t.tsn)?;
117            self.storage_driver.rollback_transaction(t.tsn)?;
118        }
119        self.storage_driver.finish_tran(t.tsn);
120        self.tran_mgr.delete_tran(t.tsn);
121        Ok(())
122    }
123
124    /// Open an existing object for read.
125    /// After object is opened it is possible to read and seek through object's data.
126    pub fn open_read(&self, obj_id: &ObjectId, t: &Transaction) -> Result<Object, Error> {
127        let handle = self.storage_driver.begin_read(obj_id, t.tsn, t.read_committed_csn)?;
128        Ok(Object {
129            id: *obj_id,
130            instance: &self,
131            handle,
132            cur_pos: 0,
133        })
134    }
135
136    /// Open an existing object for modification by its id.
137    /// After object is opened it is possible to read, write and seek through object data.
138    /// This operation puts lock on the object which is released after transaction commit or rollback.
139    /// If timeout is -1 then wait indefinitely, otherwise wait for requested time in ms before
140    /// returning error, or until transaction holding lock on the object has finished.
141    pub fn open_write<'a>(&'a self, obj_id: &ObjectId, t: &'a mut Transaction, timeout: i64) -> Result<ObjectWrite, Error> {
142
143        let guard = self.tran_mgr.lock_object(t.tsn, obj_id);
144
145        if let Some(txn) = self.storage_driver.is_locked(obj_id)? {
146            if txn != t.tsn {
147                if !self.tran_mgr.wait_for(txn, timeout) {
148                    return Err(Error::timeout());
149                }
150            }
151        }
152
153        t.last_write_csn = self.csns.csn.get_next();
154
155        let handle = self.storage_driver.begin_write(obj_id, t.tsn, t.last_write_csn)?;
156
157        drop(guard);
158
159        Ok(ObjectWrite {
160            obj: Object {
161                id: *obj_id,
162                instance: &self,
163                handle,
164                cur_pos: 0,
165            },
166            txn: t,
167        })
168    }
169
170    /// Create a new object and open it for write.
171    /// After object is opened it is possible to read, write and seek object data.
172    /// This operation puts lock on the object which is released after transaction commit or rollback.
173    pub fn open_create<'a>(&'a self, file_id: u16, t: &'a mut Transaction, initial_size: usize) -> Result<ObjectWrite, Error> {
174
175        t.last_write_csn = self.csns.csn.get_next();
176        let (id, handle) = self.storage_driver.create(file_id, t.tsn, t.last_write_csn, initial_size)?;
177
178        Ok(ObjectWrite {
179            obj: Object {
180                id,
181                instance: &self,
182                handle,
183                cur_pos: 0,
184            },
185            txn: t,
186        })
187    }
188
189    /// Delete an object. If object is in use, timeout can be specified, and current transaction
190    /// will wait given time until transaction holding the lock on the object has finished.
191    pub fn delete(&self, obj_id: &ObjectId, t: &mut Transaction, timeout: i64) -> Result<(), Error> {
192
193        let guard = self.tran_mgr.lock_object(t.tsn, obj_id);
194
195        if let Some(txn) = self.storage_driver.is_locked(obj_id)? {
196            if txn != t.tsn {
197                if !self.tran_mgr.wait_for(txn, timeout) {
198                    return Err(Error::timeout());
199                }
200            }
201        }
202
203        t.last_write_csn = self.csns.csn.get_next();
204
205        let checkpoint_csn = self.storage_driver.delete(obj_id, t.tsn, t.last_write_csn)?;
206
207        drop(guard);
208
209        self.log_mgr.write_delete(t.last_write_csn, checkpoint_csn, t.tsn, &obj_id)
210    }
211
212    fn update_latest_commit_csn(&self, csn: u64) {
213        let cur_csn = self.csns.latest_commit_csn.load(Ordering::Relaxed);
214        while let Err(cur_csn)  = self.csns.latest_commit_csn.compare_exchange(cur_csn, csn, Ordering::Relaxed, Ordering::Relaxed) {
215            if cur_csn >= csn {
216                return
217            }
218        }
219    }
220
221    fn restore_state(&self) -> Result<(), Error> {
222
223        let mut log_reader = self.log_mgr.get_reader()?;
224        if let Some((checkpoint_csn, tsn)) = log_reader.seek_to_latest_checkpoint()? {
225
226            self.tran_mgr.set_tsn(tsn);
227            self.csns.checkpoint_csn.set(checkpoint_csn);
228            self.storage_driver.restore_checkpoint(checkpoint_csn)?;
229        } else {
230            log_reader = self.log_mgr.get_reader()?;
231        }
232
233        let tsn = self.replay_changes(log_reader)?;
234        self.tran_mgr.set_tsn(tsn);
235
236        Ok(())
237    }
238
239    fn replay_changes(&self, mut log_reader: LogReader) -> Result<u64, Error> {
240        let mut trn_set = HashMap::new();
241        let mut max_tsn = 1;
242
243        while let Some(lrh) = log_reader.read_next()? {
244
245            let tsn = lrh.tsn;
246            let csn = lrh.csn;
247
248            if tsn > max_tsn {
249                max_tsn = tsn;
250            }
251
252            match lrh.rec_type {
253                RecType::Commit => {
254                    trn_set.remove(&tsn);
255                    self.update_latest_commit_csn(csn);
256                    self.storage_driver.finish_tran(tsn);
257                },
258                RecType::Rollback => {
259                    self.storage_driver.rollback_transaction(lrh.tsn)?;
260                    trn_set.remove(&tsn);
261                },
262                RecType::Data => {
263                    if !trn_set.contains_key(&tsn) {
264                        trn_set.insert(tsn, HashMap::new());
265                    }
266
267                    let obj_set = trn_set.get_mut(&tsn).unwrap();
268
269                    let obj = log_reader.get_object_id();
270                    if !obj_set.contains_key(&obj) {
271                        let vec = log_reader.get_vector();
272                        let rh = self.storage_driver.begin_replay(&vec.obj_id(), vec.entry_pos(), tsn, csn);
273                        obj_set.insert(obj, rh);
274                    }
275
276                    let rh = obj_set.get_mut(&obj).unwrap();
277                    let vec = log_reader.get_vector();
278                    rh.update(&vec, tsn, csn);
279                    self.storage_driver.replay(rh, log_reader.get_data())?;
280                },
281                RecType::Delete => {
282                    if !trn_set.contains_key(&tsn) {
283                        trn_set.insert(tsn, HashMap::new());
284                    }
285
286                    self.storage_driver.delete(&log_reader.get_object_id(), tsn, csn)?;
287                },
288                RecType::CheckpointBegin => {
289                    return Err(Error::unexpected_checkpoint());
290                },
291                RecType::CheckpointCompleted => {
292                    return Err(Error::unexpected_checkpoint());
293                },
294                RecType::Unspecified => {
295                },
296            }
297        }
298
299        for tsn in trn_set.keys() {
300            self.storage_driver.rollback_transaction(*tsn)?;
301        }
302
303        Ok(max_tsn)
304    }
305
306    /// Build instance using shared state.
307    pub fn from_shared_state(ss: InstanceSharedState) -> Result<Self, Error> {
308
309        let InstanceSharedState {
310            conf,
311            csns,
312            tran_mgr,
313            log_mgr,
314            checkpointer,
315            storage_ss,
316        } = ss;
317
318        let storage_driver =    StorageDriver::from_shared_state(storage_ss)?;
319
320        Ok(Instance {
321            conf,
322            csns,
323            tran_mgr,
324            log_mgr,
325            storage_driver,
326            checkpointer,
327        })
328    }
329
330    /// Return shared state that can be shared between threads.
331    pub fn get_shared_state(&self) -> Result<InstanceSharedState, Error> {
332        Ok(InstanceSharedState {
333            conf:           self.conf.clone(),
334            csns:           self.csns.clone(),
335            tran_mgr:       self.tran_mgr.clone(),
336            log_mgr:        self.log_mgr.clone(),
337            checkpointer:   self.checkpointer.clone(),
338            storage_ss:     self.storage_driver.get_shared_state()?,
339        })
340    }
341
342    /// Terminate the instance. If several instances are running and sharing state then function
343    /// has no effect for any of them but the last one.
344    /// It is up to client to make sure all transactions are finished, otherwise unfinished
345    /// transactions will be rolled back on the next start.
346    pub fn terminate(self) {
347        let Instance {
348            conf: _,
349            csns: _,
350            tran_mgr: _,
351            log_mgr,
352            storage_driver,
353            checkpointer
354        } = self;
355
356        storage_driver.terminate();
357
358        log_mgr.terminate();
359
360        if let Ok(checkpointer) = Arc::try_unwrap(checkpointer) {
361            checkpointer.terminate();
362        }
363    }
364
365}
366
367/// Transaction.
368pub struct Transaction<'a> {
369    instance: &'a Instance,
370    tsn: u64,
371    start_csn: u64,
372    last_write_csn: u64,
373    read_committed_csn: u64,
374}
375
376impl<'a> Transaction<'a> {
377
378    /// Update transaction's change seqence number to the latest commit in the system.
379    /// By default csn of a transaction is set just once when the transaction is created. And any
380    /// changes committed by other transactions after that csn are not visible to the current transaction. 
381    /// This allows current transaction to have a consistent view of data as of certain point in time. 
382    /// This function updates csn to the latest commit, making the latest changes in the system visible 
383    /// to the current transaction.
384    pub fn update_read_csn(&mut self) {
385        self.read_committed_csn = self.instance.csns.latest_commit_csn.load(Ordering::Relaxed);
386    }
387}
388
389/// Object for reading operations.
390pub struct Object<'a> {
391    id:         ObjectId,
392    instance:   &'a Instance,
393    handle:     Handle,
394    cur_pos:    u64,
395}
396
397
398impl<'a> Object<'a> {
399
400    /// seek to certain position in an opened object.
401    fn seek(&mut self, from: SeekFrom, pos: u64) -> Result<u64, Error> {
402        let res = self.instance.storage_driver.seek(&mut self.handle, from, pos, &self.id)?;
403        self.cur_pos += res;
404        Ok(res)
405    }
406
407    /// get chunk of data for reading
408    /// after sucessfull call slice of the data bytes is returned
409    /// if no data available, slice is empty
410    fn read_next(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
411        self.instance.storage_driver.read(&mut self.handle, buf)
412    }
413
414    fn get_id(&self) -> ObjectId {
415        self.id
416    }
417
418}
419
420
421/// Object for reading and writing.
422pub struct ObjectWrite<'a> {
423    obj: Object<'a>,
424    txn: &'a Transaction<'a>,
425}
426
427impl<'a> ObjectWrite<'a> {
428
429    fn write_next(&mut self, data: &[u8]) -> Result<(), Error> {
430
431        let mut written = 0;
432
433        while written < data.len() {
434            let (mut vector, w, checkpoint_csn) = self.obj.instance.storage_driver.write(&mut self.obj.handle, &data[written..])?;
435            let written_data = &data[written..written+w];
436            self.obj.instance.log_mgr.write_data(self.txn.last_write_csn, checkpoint_csn, self.txn.tsn, &self.obj.id, &mut vector, written_data)?;
437            self.obj.instance.checkpointer.register_processed_data_size(written_data.len() as u64);
438            written += w;
439        }
440
441        self.obj.cur_pos += data.len() as u64;
442
443        Ok(())
444    }
445}
446
447/// Reading operations.
448pub trait Read {
449
450    /// Returns object id.
451    fn get_id(&self) -> ObjectId;
452
453    /// Seek to a certain position starting from the beginnging or from the current position.
454    fn seek(&mut self, from: SeekFrom, pos: u64) -> Result<u64, Error>;
455
456    /// Read next portion of data.
457    fn read_next(&mut self, buf: &mut [u8]) -> Result<usize, Error>;
458}
459
460/// Writing operations.
461pub trait Write {
462
463    /// Write next portion of data.
464    fn write_next(&mut self, data: &[u8]) -> Result<(), Error>;
465}
466
467
468impl<'a> Write for ObjectWrite<'a> {
469
470    fn write_next(&mut self, data: &[u8]) -> Result<(), Error> {
471        self.write_next(data)
472    }
473}
474
475impl<'a> Read for ObjectWrite<'a> {
476
477    fn seek(&mut self, from: SeekFrom, pos: u64) -> Result<u64, Error> {
478        self.obj.seek(from, pos)
479    }
480
481    fn read_next(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
482        self.obj.read_next(buf)
483    }
484
485    fn get_id(&self) -> ObjectId {
486        self.obj.get_id()
487    }
488}
489
490impl<'a> Read for Object<'a> {
491
492    fn seek(&mut self, from: SeekFrom, pos: u64) -> Result<u64, Error> {
493        self.seek(from, pos)
494    }
495
496    fn read_next(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
497        self.read_next(buf)
498    }
499
500    fn get_id(&self) -> ObjectId {
501        self.get_id()
502    }
503}
504
505
506/// Parts of instance that are shared between threads.
507pub struct InstanceSharedState {
508    conf:           ConfigMt,
509    csns:           SharedSequences,
510    tran_mgr:       TranMgr,
511    log_mgr:        LogMgr,
512    checkpointer:   Arc<Checkpointer>,
513    storage_ss:     StorageDriverSharedState, 
514}
515
516
#[cfg(test)]
mod tests {

    use super::*;
    use crate::storage::datastore::FileState;
    use crate::storage::datastore::FileType;
    use crate::storage::datastore::FileDesc;
    use std::path::Path;


    /// Remove `dir` if it already exists and create it again, empty.
    fn recreate_dir(dir: &str) {
        if Path::new(dir).exists() {
            std::fs::remove_dir_all(dir).expect("Failed to delete test dir on cleanup");
        }
        std::fs::create_dir(dir).expect("Failed to create test dir");
    }

    /// Create a fresh datastore at `dspath` with one data file, one versioning store file and
    /// one checkpoint store file, and return their descriptors.
    fn init_datastore(dspath: &str, block_size: usize) -> Vec<FileDesc> {

        recreate_dir(dspath);

        let fdset = vec![
            FileDesc {
                state:          FileState::InUse,
                file_id:        3,
                extent_size:    16,
                extent_num:     3,
                max_extent_num: 65500,
                file_type:      FileType::DataStoreFile,
            },
            FileDesc {
                state:          FileState::InUse,
                file_id:        4,
                extent_size:    10,
                extent_num:     3,
                max_extent_num: 65500,
                file_type:      FileType::VersioningStoreFile,
            },
            FileDesc {
                state:          FileState::InUse,
                file_id:        5,
                extent_size:    10,
                extent_num:     3,
                max_extent_num: 65500,
                file_type:      FileType::CheckpointStoreFile,
            },
        ];

        Instance::initialize_datastore(dspath, block_size, &fdset).expect("Failed to init datastore");
        fdset
    }


    /// Produce `len` random bytes.
    fn create_data(len: usize) -> Vec<u8> {
        (0..len).map(|_| rand::random::<u8>()).collect()
    }

    /// Write the whole of `data`, panicking on failure.
    fn write_full(obj: &mut ObjectWrite, data: &[u8]) {
        obj.write_next(data).expect("Failed to write data");
    }

    /// Read until `read_buf` is full, and assert that it has actually been filled.
    fn read_full<T: Read>(obj: &mut T, read_buf: &mut [u8]) {
        let total = read_buf.len();
        let mut filled = 0;
        while filled < total {
            match obj.read_next(&mut read_buf[filled..total]).expect("Failed to read") {
                0 => break,
                n => filled += n,
            }
        }
        assert_eq!(filled, total);
    }

    /// Read `data.len()` bytes from `obj` and assert they are equal to `data`.
    fn read_and_check<T: Read>(obj: &mut T, data: &[u8]) {
        let mut actual = vec![0u8; data.len()];
        read_full(obj, &mut actual);
        assert_eq!(actual, data);
    }


    #[test]
    fn test_instance() {
        let dspath = "/tmp/test_instance_098123";
        let log_dir = "/tmp/test_instance_56445";
        let block_size = 8192;
        let file_id1 = 3;

        let _init_fdesc = init_datastore(dspath, block_size);

        recreate_dir(log_dir);

        let conf = ConfigMt::new();
        {
            let mut settings = conf.get_conf();
            settings.set_log_dir(log_dir.to_owned());
            settings.set_datastore_path(dspath.to_owned());
        }

        let instance = Instance::new(conf.clone()).expect("Failed to create instance");


        // Build a second instance from shared state and run it on another thread.

        let ss = instance.get_shared_state().expect("Failed to get shared state");

        let worker = std::thread::spawn(move || {
            let instance2 = Instance::from_shared_state(ss).expect("Failed to create instance");
            let trn = instance2.begin_transaction().expect("Failed to begin transaction 1");
            instance2.rollback(trn).expect("Failed to rollback_transaction");
            instance2.terminate();
        });

        let trn = instance.begin_transaction().expect("Failed to begin transaction 2");
        instance.rollback(trn).expect("Failed to rollback_transaction");

        worker.join().unwrap();


        // Add a data file.

        let file_id2 = instance.add_datafile(FileType::DataStoreFile, 1000, 10, 1000).expect("Failed to add data file");


        // Read, write & delete.

        let data = create_data(block_size * 3);

        let mut trn = instance.begin_transaction().expect("Failed to begin transaction");

        let obj_id = {
            let mut created = instance.open_create(file_id1, &mut trn, 200).expect("Failed to create object");
            let id = created.get_id();
            write_full(&mut created, &data);
            id
        };

        {
            let mut reader = instance.open_read(&obj_id, &trn).expect("Failed to open for reading");
            assert_eq!(reader.get_id(), obj_id);
            read_and_check(&mut reader, &data);
        }

        let data2 = create_data(block_size);
        {
            let mut writer = instance.open_write(&obj_id, &mut trn, 1000).expect("Failed to open for writing");
            assert_eq!(writer.get_id(), obj_id);
            write_full(&mut writer, &data2);
            writer.seek(SeekFrom::Current, (block_size + block_size/2) as u64).expect("Failed to seek");
            write_full(&mut writer, &data2);
        }

        // Expected content: original data, with data2 at the start and again at offset 2.5 blocks.
        let mut new_data = vec![0; block_size * 3 + block_size / 2];
        new_data[0..data.len()].copy_from_slice(&data);
        new_data[0..data2.len()].copy_from_slice(&data2);
        let offset = block_size * 2 + block_size / 2;
        new_data[offset..offset + data2.len()].copy_from_slice(&data2);

        {
            let mut reader = instance.open_read(&obj_id, &trn).expect("Failed to open for reading");
            assert_eq!(reader.get_id(), obj_id);
            read_and_check(&mut reader, &new_data);
        }

        instance.delete(&obj_id, &mut trn, 1000).expect("Failed to delete object");
        instance.commit(trn).expect("Failed to commit transaction");

        // The deleted object must not be readable from a new transaction.
        let mut trn = instance.begin_transaction().expect("Failed to begin transaction");
        assert!(instance.open_read(&obj_id, &trn).is_err());

        let obj_id = {
            let mut created = instance.open_create(file_id2, &mut trn, 300).expect("Failed to create object");
            let id = created.get_id();
            write_full(&mut created, &data);
            id
        };
        instance.commit(trn).expect("Failed to commit transaction");

        instance.terminate();


        // Open the existing database.

        let instance = Instance::new(conf.clone()).expect("Failed to create instance");

        let trn = instance.begin_transaction().expect("Failed to begin transaction");

        {
            let mut reader = instance.open_read(&obj_id, &trn).expect("Failed to open for reading");
            read_and_check(&mut reader, &data);
        }

        instance.rollback(trn).expect("Failed to rollback transaction");


        // Concurrent writes & reads: create a 4-byte counter object and increment it from
        // several threads.

        let mut trn = instance.begin_transaction().expect("Failed to begin transaction");
        let obj_id = {
            let mut created = instance.open_create(file_id1, &mut trn, 4).expect("Failed to create object");
            let id = created.get_id();
            write_full(&mut created, &[0u8,0u8,0u8,0u8]);
            id
        };
        instance.commit(trn).expect("Failed to commit transaction");

        let instance_num = 4;
        let iterations = 100;
        let mut threads = vec![];
        for _instn in 0..instance_num {

            let ss = instance.get_shared_state().expect("Failed to get shared state");

            let handle = std::thread::spawn(move || {
                let mut counter = [0u8;4];
                let instance2 = Instance::from_shared_state(ss).expect("Failed to create instance");
                for _itrn in 0..iterations {
                    let mut trn = instance2.begin_transaction().expect("Failed to begin transaction 1");
                    {
                        let mut writer = instance2.open_write(&obj_id, &mut trn, -1).expect("Failed to create object");
                        read_full(&mut writer, &mut counter);
                        let next = u32::from_ne_bytes(counter) + 1;
                        writer.seek(SeekFrom::Start, 0).expect("Failed to seek");
                        write_full(&mut writer, &next.to_ne_bytes());
                    }
                    instance2.commit(trn).expect("Failed to commit transaction");
                }
                instance2.terminate();
            });

            threads.push(handle);
        }

        for handle in threads.drain(..) {
            handle.join().unwrap();
        }

        // Every increment from every thread must be reflected in the final value.
        let mut counter = [0u8;4];
        let mut trn = instance.begin_transaction().expect("Failed to begin transaction");
        let mut reader = instance.open_read(&obj_id, &mut trn).expect("Failed to open object");
        read_full(&mut reader, &mut counter);
        assert_eq!(iterations * instance_num, u32::from_ne_bytes(counter));
        instance.commit(trn).expect("Failed to commit transaction");

        instance.terminate();
    }

}