1use crate::common::errors::Error;
6use crate::common::defs::Sequence;
7use crate::common::defs::ObjectId;
8use crate::common::defs::SeekFrom;
9use crate::common::defs::SharedSequences;
10use crate::tran_mgr::tran_mgr::TranMgr;
11use crate::log_mgr::log_mgr::RecType;
12use crate::log_mgr::log_mgr::LogMgr;
13use crate::log_mgr::log_mgr::LogReader;
14use crate::system::config::ConfigMt;
15use crate::system::checkpointer::Checkpointer;
16use crate::storage::driver::StorageDriver;
17use crate::storage::driver::Handle;
18use crate::storage::driver::StorageDriverSharedState;
19use crate::storage::datastore::FileType;
20use crate::storage::datastore::FileDesc;
21use crate::storage::datastore::DataStore;
22use std::collections::HashMap;
23use std::sync::Arc;
24use std::sync::atomic::AtomicU64;
25use std::sync::atomic::Ordering;
26
27
/// A database instance: bundles the configuration, the shared CSN sequences, and the
/// transaction manager, log manager, storage driver and checkpointer that work on them.
pub struct Instance {
    /// Configuration; a clone is handed to each subsystem when the instance is built.
    conf: ConfigMt,
    /// Shared sequences: the running CSN, the latest committed CSN and the checkpoint CSN.
    csns: SharedSequences,
    /// Hands out transaction sequence numbers and provides object locking / waiting.
    tran_mgr: TranMgr,
    /// Writes commit, rollback, data and delete records and provides log readers.
    log_mgr: LogMgr,
    /// Performs the actual object reads, writes, creation, deletion and replay.
    storage_driver: StorageDriver,
    /// Checkpointer shared (via `Arc`) between instances built from the same shared state.
    checkpointer: Arc<Checkpointer>,
}
38
39impl Instance {
40
41 pub fn new(conf: ConfigMt) -> Result<Instance, Error> {
43
44 let tran_mgr = TranMgr::new(conf.clone())?;
45 let log_mgr = LogMgr::new(conf.clone())?;
46
47 let csn = Sequence::new(log_mgr.starting_csn());
48 let latest_commit_csn = Arc::new(AtomicU64::new(log_mgr.latest_commit_csn()));
49 let checkpoint_csn = Sequence::new(1);
50 let csns = SharedSequences {
51 csn,
52 latest_commit_csn,
53 checkpoint_csn,
54 };
55
56 let storage_driver = StorageDriver::new(conf.clone(), csns.clone())?;
57
58 let checkpointer = Arc::new(Checkpointer::new(log_mgr.clone(),
59 csns.clone(),
60 conf.clone(),
61 tran_mgr.clone())?);
62
63 let ret = Instance {
64 conf,
65 csns,
66 tran_mgr,
67 log_mgr,
68 storage_driver,
69 checkpointer,
70 };
71
72 ret.restore_state()?;
73
74 Ok(ret)
75 }
76
77 pub fn initialize_datastore(path: &str, block_size: usize, desc_set: &[FileDesc]) -> Result<(), Error> {
79 DataStore::initialize_datastore(path, block_size, desc_set)
80 }
81
82 pub fn add_datafile(&self, file_type: FileType, extent_size: u16, extent_num: u16, max_extent_num: u16) -> Result<u16, Error> {
84 self.storage_driver.add_datafile(file_type, extent_size, extent_num, max_extent_num)
85 }
86
87 pub fn begin_transaction(&self) -> Result<Transaction, Error> {
89 let csn = self.csns.csn.get_cur();
90 let tsn = self.tran_mgr.start_tran();
91
92 Ok(Transaction {
93 instance: &self,
94 tsn,
95 start_csn: csn,
96 last_write_csn: csn,
97 read_committed_csn: self.csns.latest_commit_csn.load(Ordering::Relaxed),
98 })
99 }
100
101 pub fn commit(&self, t: Transaction) -> Result<(), Error> {
103 if t.last_write_csn > t.start_csn {
104 let commit_csn = self.csns.csn.get_next();
105 self.log_mgr.write_commit(commit_csn, t.tsn)?;
106 self.update_latest_commit_csn(commit_csn);
107 }
108 self.storage_driver.finish_tran(t.tsn);
109 self.tran_mgr.delete_tran(t.tsn);
110 Ok(())
111 }
112
113 pub fn rollback(&self, t: Transaction) -> Result<(), Error> {
115 if t.last_write_csn > t.start_csn {
116 self.log_mgr.write_rollback(t.last_write_csn, t.tsn)?;
117 self.storage_driver.rollback_transaction(t.tsn)?;
118 }
119 self.storage_driver.finish_tran(t.tsn);
120 self.tran_mgr.delete_tran(t.tsn);
121 Ok(())
122 }
123
124 pub fn open_read(&self, obj_id: &ObjectId, t: &Transaction) -> Result<Object, Error> {
127 let handle = self.storage_driver.begin_read(obj_id, t.tsn, t.read_committed_csn)?;
128 Ok(Object {
129 id: *obj_id,
130 instance: &self,
131 handle,
132 cur_pos: 0,
133 })
134 }
135
136 pub fn open_write<'a>(&'a self, obj_id: &ObjectId, t: &'a mut Transaction, timeout: i64) -> Result<ObjectWrite, Error> {
142
143 let guard = self.tran_mgr.lock_object(t.tsn, obj_id);
144
145 if let Some(txn) = self.storage_driver.is_locked(obj_id)? {
146 if txn != t.tsn {
147 if !self.tran_mgr.wait_for(txn, timeout) {
148 return Err(Error::timeout());
149 }
150 }
151 }
152
153 t.last_write_csn = self.csns.csn.get_next();
154
155 let handle = self.storage_driver.begin_write(obj_id, t.tsn, t.last_write_csn)?;
156
157 drop(guard);
158
159 Ok(ObjectWrite {
160 obj: Object {
161 id: *obj_id,
162 instance: &self,
163 handle,
164 cur_pos: 0,
165 },
166 txn: t,
167 })
168 }
169
170 pub fn open_create<'a>(&'a self, file_id: u16, t: &'a mut Transaction, initial_size: usize) -> Result<ObjectWrite, Error> {
174
175 t.last_write_csn = self.csns.csn.get_next();
176 let (id, handle) = self.storage_driver.create(file_id, t.tsn, t.last_write_csn, initial_size)?;
177
178 Ok(ObjectWrite {
179 obj: Object {
180 id,
181 instance: &self,
182 handle,
183 cur_pos: 0,
184 },
185 txn: t,
186 })
187 }
188
189 pub fn delete(&self, obj_id: &ObjectId, t: &mut Transaction, timeout: i64) -> Result<(), Error> {
192
193 let guard = self.tran_mgr.lock_object(t.tsn, obj_id);
194
195 if let Some(txn) = self.storage_driver.is_locked(obj_id)? {
196 if txn != t.tsn {
197 if !self.tran_mgr.wait_for(txn, timeout) {
198 return Err(Error::timeout());
199 }
200 }
201 }
202
203 t.last_write_csn = self.csns.csn.get_next();
204
205 let checkpoint_csn = self.storage_driver.delete(obj_id, t.tsn, t.last_write_csn)?;
206
207 drop(guard);
208
209 self.log_mgr.write_delete(t.last_write_csn, checkpoint_csn, t.tsn, &obj_id)
210 }
211
212 fn update_latest_commit_csn(&self, csn: u64) {
213 let cur_csn = self.csns.latest_commit_csn.load(Ordering::Relaxed);
214 while let Err(cur_csn) = self.csns.latest_commit_csn.compare_exchange(cur_csn, csn, Ordering::Relaxed, Ordering::Relaxed) {
215 if cur_csn >= csn {
216 return
217 }
218 }
219 }
220
221 fn restore_state(&self) -> Result<(), Error> {
222
223 let mut log_reader = self.log_mgr.get_reader()?;
224 if let Some((checkpoint_csn, tsn)) = log_reader.seek_to_latest_checkpoint()? {
225
226 self.tran_mgr.set_tsn(tsn);
227 self.csns.checkpoint_csn.set(checkpoint_csn);
228 self.storage_driver.restore_checkpoint(checkpoint_csn)?;
229 } else {
230 log_reader = self.log_mgr.get_reader()?;
231 }
232
233 let tsn = self.replay_changes(log_reader)?;
234 self.tran_mgr.set_tsn(tsn);
235
236 Ok(())
237 }
238
239 fn replay_changes(&self, mut log_reader: LogReader) -> Result<u64, Error> {
240 let mut trn_set = HashMap::new();
241 let mut max_tsn = 1;
242
243 while let Some(lrh) = log_reader.read_next()? {
244
245 let tsn = lrh.tsn;
246 let csn = lrh.csn;
247
248 if tsn > max_tsn {
249 max_tsn = tsn;
250 }
251
252 match lrh.rec_type {
253 RecType::Commit => {
254 trn_set.remove(&tsn);
255 self.update_latest_commit_csn(csn);
256 self.storage_driver.finish_tran(tsn);
257 },
258 RecType::Rollback => {
259 self.storage_driver.rollback_transaction(lrh.tsn)?;
260 trn_set.remove(&tsn);
261 },
262 RecType::Data => {
263 if !trn_set.contains_key(&tsn) {
264 trn_set.insert(tsn, HashMap::new());
265 }
266
267 let obj_set = trn_set.get_mut(&tsn).unwrap();
268
269 let obj = log_reader.get_object_id();
270 if !obj_set.contains_key(&obj) {
271 let vec = log_reader.get_vector();
272 let rh = self.storage_driver.begin_replay(&vec.obj_id(), vec.entry_pos(), tsn, csn);
273 obj_set.insert(obj, rh);
274 }
275
276 let rh = obj_set.get_mut(&obj).unwrap();
277 let vec = log_reader.get_vector();
278 rh.update(&vec, tsn, csn);
279 self.storage_driver.replay(rh, log_reader.get_data())?;
280 },
281 RecType::Delete => {
282 if !trn_set.contains_key(&tsn) {
283 trn_set.insert(tsn, HashMap::new());
284 }
285
286 self.storage_driver.delete(&log_reader.get_object_id(), tsn, csn)?;
287 },
288 RecType::CheckpointBegin => {
289 return Err(Error::unexpected_checkpoint());
290 },
291 RecType::CheckpointCompleted => {
292 return Err(Error::unexpected_checkpoint());
293 },
294 RecType::Unspecified => {
295 },
296 }
297 }
298
299 for tsn in trn_set.keys() {
300 self.storage_driver.rollback_transaction(*tsn)?;
301 }
302
303 Ok(max_tsn)
304 }
305
306 pub fn from_shared_state(ss: InstanceSharedState) -> Result<Self, Error> {
308
309 let InstanceSharedState {
310 conf,
311 csns,
312 tran_mgr,
313 log_mgr,
314 checkpointer,
315 storage_ss,
316 } = ss;
317
318 let storage_driver = StorageDriver::from_shared_state(storage_ss)?;
319
320 Ok(Instance {
321 conf,
322 csns,
323 tran_mgr,
324 log_mgr,
325 storage_driver,
326 checkpointer,
327 })
328 }
329
330 pub fn get_shared_state(&self) -> Result<InstanceSharedState, Error> {
332 Ok(InstanceSharedState {
333 conf: self.conf.clone(),
334 csns: self.csns.clone(),
335 tran_mgr: self.tran_mgr.clone(),
336 log_mgr: self.log_mgr.clone(),
337 checkpointer: self.checkpointer.clone(),
338 storage_ss: self.storage_driver.get_shared_state()?,
339 })
340 }
341
342 pub fn terminate(self) {
347 let Instance {
348 conf: _,
349 csns: _,
350 tran_mgr: _,
351 log_mgr,
352 storage_driver,
353 checkpointer
354 } = self;
355
356 storage_driver.terminate();
357
358 log_mgr.terminate();
359
360 if let Ok(checkpointer) = Arc::try_unwrap(checkpointer) {
361 checkpointer.terminate();
362 }
363 }
364
365}
366
/// State of one transaction started with `Instance::begin_transaction`.
pub struct Transaction<'a> {
    /// Instance the transaction was started on.
    instance: &'a Instance,
    /// Transaction sequence number returned by the transaction manager.
    tsn: u64,
    /// Value of the CSN sequence when the transaction began.
    start_csn: u64,
    /// CSN of the most recent write/create/delete; equals `start_csn` until the first one.
    last_write_csn: u64,
    /// Latest commit CSN captured at begin or at the last `update_read_csn` call;
    /// passed to the storage driver when opening objects for reading.
    read_committed_csn: u64,
}
375
376impl<'a> Transaction<'a> {
377
378 pub fn update_read_csn(&mut self) {
385 self.read_committed_csn = self.instance.csns.latest_commit_csn.load(Ordering::Relaxed);
386 }
387}
388
/// An opened object, readable through the `Read` trait.
pub struct Object<'a> {
    /// Identifier of the opened object.
    id: ObjectId,
    /// Instance whose storage driver serves the operations on this object.
    instance: &'a Instance,
    /// Storage driver handle obtained when the object was opened.
    handle: Handle,
    /// Position counter, starts at 0; adjusted by seeks and writes.
    cur_pos: u64,
}
396
397
398impl<'a> Object<'a> {
399
400 fn seek(&mut self, from: SeekFrom, pos: u64) -> Result<u64, Error> {
402 let res = self.instance.storage_driver.seek(&mut self.handle, from, pos, &self.id)?;
403 self.cur_pos += res;
404 Ok(res)
405 }
406
407 fn read_next(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
411 self.instance.storage_driver.read(&mut self.handle, buf)
412 }
413
414 fn get_id(&self) -> ObjectId {
415 self.id
416 }
417
418}
419
420
/// An object opened for writing (via `Instance::open_write` or `Instance::open_create`).
pub struct ObjectWrite<'a> {
    /// The underlying opened object (id, handle, position).
    obj: Object<'a>,
    /// Transaction the writes belong to; supplies the TSN and last write CSN for log records.
    txn: &'a Transaction<'a>,
}
426
427impl<'a> ObjectWrite<'a> {
428
429 fn write_next(&mut self, data: &[u8]) -> Result<(), Error> {
430
431 let mut written = 0;
432
433 while written < data.len() {
434 let (mut vector, w, checkpoint_csn) = self.obj.instance.storage_driver.write(&mut self.obj.handle, &data[written..])?;
435 let written_data = &data[written..written+w];
436 self.obj.instance.log_mgr.write_data(self.txn.last_write_csn, checkpoint_csn, self.txn.tsn, &self.obj.id, &mut vector, written_data)?;
437 self.obj.instance.checkpointer.register_processed_data_size(written_data.len() as u64);
438 written += w;
439 }
440
441 self.obj.cur_pos += data.len() as u64;
442
443 Ok(())
444 }
445}
446
/// Read-side operations on an opened object.
pub trait Read {

    /// Returns the identifier of the opened object.
    fn get_id(&self) -> ObjectId;

    /// Moves the position according to `from` and `pos`; returns the value reported by
    /// the underlying storage layer.
    fn seek(&mut self, from: SeekFrom, pos: u64) -> Result<u64, Error>;

    /// Reads into `buf` and returns a byte count; the tests in this file add it to the
    /// number of bytes read so far and stop reading when it is 0.
    fn read_next(&mut self, buf: &mut [u8]) -> Result<usize, Error>;
}
459
/// Write-side operations on an opened object.
pub trait Write {

    /// Writes `data` to the object at its current position.
    fn write_next(&mut self, data: &[u8]) -> Result<(), Error>;
}
466
467
468impl<'a> Write for ObjectWrite<'a> {
469
470 fn write_next(&mut self, data: &[u8]) -> Result<(), Error> {
471 self.write_next(data)
472 }
473}
474
475impl<'a> Read for ObjectWrite<'a> {
476
477 fn seek(&mut self, from: SeekFrom, pos: u64) -> Result<u64, Error> {
478 self.obj.seek(from, pos)
479 }
480
481 fn read_next(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
482 self.obj.read_next(buf)
483 }
484
485 fn get_id(&self) -> ObjectId {
486 self.obj.get_id()
487 }
488}
489
490impl<'a> Read for Object<'a> {
491
492 fn seek(&mut self, from: SeekFrom, pos: u64) -> Result<u64, Error> {
493 self.seek(from, pos)
494 }
495
496 fn read_next(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
497 self.read_next(buf)
498 }
499
500 fn get_id(&self) -> ObjectId {
501 self.get_id()
502 }
503}
504
505
/// State produced by `Instance::get_shared_state` and consumed by
/// `Instance::from_shared_state` to build another `Instance` over the same components
/// (the tests in this file move it into spawned threads).
pub struct InstanceSharedState {
    /// Clone of the source instance's configuration.
    conf: ConfigMt,
    /// Clone of the source instance's shared sequences.
    csns: SharedSequences,
    /// Clone of the source instance's transaction manager.
    tran_mgr: TranMgr,
    /// Clone of the source instance's log manager.
    log_mgr: LogMgr,
    /// Shared reference to the same checkpointer.
    checkpointer: Arc<Checkpointer>,
    /// Storage driver shared state, from which a new storage driver is built.
    storage_ss: StorageDriverSharedState,
}
515
516
#[cfg(test)]
mod tests {

    use super::*;
    use crate::storage::datastore::FileState;
    use crate::storage::datastore::FileType;
    use crate::storage::datastore::FileDesc;
    use std::path::Path;


    /// Removes `dir` if it exists and creates it anew, empty.
    fn recreate_dir(dir: &str) {
        if Path::new(dir).exists() {
            std::fs::remove_dir_all(dir).expect("Failed to delete test dir on cleanup");
        }
        std::fs::create_dir(dir).expect("Failed to create test dir");
    }

    /// Creates a fresh datastore at `dspath` with one data file (id 3), one versioning
    /// file (id 4) and one checkpoint file (id 5); returns the descriptors used.
    fn init_datastore(dspath: &str, block_size: usize) -> Vec<FileDesc> {
        recreate_dir(dspath);

        let fdset = vec![
            FileDesc {
                state: FileState::InUse,
                file_id: 3,
                extent_size: 16,
                extent_num: 3,
                max_extent_num: 65500,
                file_type: FileType::DataStoreFile,
            },
            FileDesc {
                state: FileState::InUse,
                file_id: 4,
                extent_size: 10,
                extent_num: 3,
                max_extent_num: 65500,
                file_type: FileType::VersioningStoreFile,
            },
            FileDesc {
                state: FileState::InUse,
                file_id: 5,
                extent_size: 10,
                extent_num: 3,
                max_extent_num: 65500,
                file_type: FileType::CheckpointStoreFile,
            },
        ];

        Instance::initialize_datastore(dspath, block_size, &fdset).expect("Failed to init datastore");
        fdset
    }

    /// Produces `len` random bytes.
    fn create_data(len: usize) -> Vec<u8> {
        (0..len).map(|_| rand::random::<u8>()).collect()
    }

    /// Writes all of `data` to `obj`, panicking on failure.
    fn write_full(obj: &mut ObjectWrite, data: &[u8]) {
        obj.write_next(data).expect("Failed to write data");
    }

    /// Reads until `read_buf` is full and asserts that it actually was filled.
    fn read_full<T: Read>(obj: &mut T, read_buf: &mut [u8]) {
        let len = read_buf.len();
        let mut filled = 0;
        while filled < len {
            match obj.read_next(&mut read_buf[filled..]).expect("Failed to read") {
                0 => break,
                n => filled += n,
            }
        }
        assert_eq!(filled, len);
    }

    /// Reads `data.len()` bytes from `obj` and asserts they equal `data`.
    fn read_and_check<T: Read>(obj: &mut T, data: &[u8]) {
        let mut actual = vec![0u8; data.len()];
        read_full(obj, &mut actual);
        assert_eq!(actual, data);
    }


    /// End-to-end scenario: shared-state instances, create/read/write/delete, restart
    /// with recovery, and concurrent increments of a counter object.
    #[test]
    fn test_instance() {
        let dspath = "/tmp/test_instance_098123";
        let log_dir = "/tmp/test_instance_56445";
        let block_size = 8192;
        let file_id1 = 3;

        let _init_fdesc = init_datastore(dspath, block_size);

        recreate_dir(log_dir);

        let conf = ConfigMt::new();
        {
            let mut c = conf.get_conf();
            c.set_log_dir(log_dir.to_owned());
            c.set_datastore_path(dspath.to_owned());
        }

        let instance = Instance::new(conf.clone()).expect("Failed to create instance");

        // A second instance built from shared state runs an empty transaction in another
        // thread while this thread does the same.
        let ss = instance.get_shared_state().expect("Failed to get shared state");

        let th = std::thread::spawn(move || {
            let instance2 = Instance::from_shared_state(ss).expect("Failed to create instance");
            let trn = instance2.begin_transaction().expect("Failed to begin transaction 1");
            instance2.rollback(trn).expect("Failed to rollback_transaction");
            instance2.terminate();
        });

        let trn = instance.begin_transaction().expect("Failed to begin transaction 2");
        instance.rollback(trn).expect("Failed to rollback_transaction");

        th.join().unwrap();

        // Add one more data file to create objects in later.
        let file_id2 = instance.add_datafile(FileType::DataStoreFile, 1000, 10, 1000).expect("Failed to add data file");

        // Create an object spanning three blocks and read it back.
        let data = create_data(block_size * 3);

        let mut trn = instance.begin_transaction().expect("Failed to begin transaction");

        let obj_id = {
            let mut ocr = instance.open_create(file_id1, &mut trn, 200).expect("Failed to create object");
            let id = ocr.get_id();
            write_full(&mut ocr, &data);
            id
        };

        {
            let mut ord = instance.open_read(&obj_id, &trn).expect("Failed to open for reading");
            assert_eq!(ord.get_id(), obj_id);
            read_and_check(&mut ord, &data);
        }

        // Overwrite the first block, skip ahead, and write another block past the old end.
        let data2 = create_data(block_size);
        {
            let mut owr = instance.open_write(&obj_id, &mut trn, 1000).expect("Failed to open for writing");
            assert_eq!(owr.get_id(), obj_id);
            write_full(&mut owr, &data2);
            owr.seek(SeekFrom::Current, (block_size + block_size/2) as u64).expect("Failed to seek");
            write_full(&mut owr, &data2);
        }

        // Expected content after the two writes above.
        let mut new_data = vec![0; block_size * 3 + block_size / 2];
        new_data[0..data.len()].copy_from_slice(&data);
        new_data[0..data2.len()].copy_from_slice(&data2);
        let offset = block_size * 2 + block_size / 2;
        new_data[offset..offset + data2.len()].copy_from_slice(&data2);

        {
            let mut ord = instance.open_read(&obj_id, &trn).expect("Failed to open for reading");
            assert_eq!(ord.get_id(), obj_id);
            read_and_check(&mut ord, &new_data);
        }

        // Delete and commit; the object must no longer be readable.
        instance.delete(&obj_id, &mut trn, 1000).expect("Failed to delete object");
        instance.commit(trn).expect("Failed to commit transaction");

        let mut trn = instance.begin_transaction().expect("Failed to begin transaction");
        let res = instance.open_read(&obj_id, &trn);
        assert!(res.is_err());

        // Create an object in the added file, commit, then shut the instance down.
        let obj_id = {
            let mut ocr = instance.open_create(file_id2, &mut trn, 300).expect("Failed to create object");
            let id = ocr.get_id();
            write_full(&mut ocr, &data);
            id
        };
        instance.commit(trn).expect("Failed to commit transaction");

        instance.terminate();

        // Restart: the committed object must be readable from the new instance.
        let instance = Instance::new(conf.clone()).expect("Failed to create instance");

        let trn = instance.begin_transaction().expect("Failed to begin transaction");

        {
            let mut ord = instance.open_read(&obj_id, &trn).expect("Failed to open for reading");
            read_and_check(&mut ord, &data);
        }

        instance.rollback(trn).expect("Failed to rollback transaction");

        // Concurrent increments: a 4-byte counter object starts at zero.
        let mut trn = instance.begin_transaction().expect("Failed to begin transaction");
        let obj_id = {
            let mut ocr = instance.open_create(file_id1, &mut trn, 4).expect("Failed to create object");
            let id = ocr.get_id();
            write_full(&mut ocr, &[0u8,0u8,0u8,0u8]);
            id
        };
        instance.commit(trn).expect("Failed to commit transaction");

        let instance_num = 4;
        let iterations = 100;
        let mut threads = Vec::new();
        for _instn in 0..instance_num {

            let ss = instance.get_shared_state().expect("Failed to get shared state");

            // Each thread reads the counter, adds one and writes it back, `iterations` times.
            let th = std::thread::spawn(move || {
                let mut data = [0u8;4];
                let instance2 = Instance::from_shared_state(ss).expect("Failed to create instance");
                for _itrn in 0..iterations {
                    let mut trn = instance2.begin_transaction().expect("Failed to begin transaction 1");
                    {
                        let mut owr = instance2.open_write(&obj_id, &mut trn, -1).expect("Failed to create object");
                        read_full(&mut owr, &mut data);
                        let val = u32::from_ne_bytes(data) + 1;
                        owr.seek(SeekFrom::Start, 0).expect("Failed to seek");
                        write_full(&mut owr, &val.to_ne_bytes());
                    }
                    instance2.commit(trn).expect("Failed to commit transaction");
                }
                instance2.terminate();
            });

            threads.push(th);
        }

        for th in threads {
            th.join().unwrap();
        }

        // Every increment must be reflected in the final counter value.
        let mut data = [0u8;4];
        let trn = instance.begin_transaction().expect("Failed to begin transaction");
        let mut ord = instance.open_read(&obj_id, &trn).expect("Failed to open object");
        read_full(&mut ord, &mut data);
        assert_eq!(iterations * instance_num, u32::from_ne_bytes(data));
        instance.commit(trn).expect("Failed to commit transaction");

        instance.terminate();
    }

}