use crate::common::errors::Error;
use crate::common::defs::ObjectId;
use crate::common::defs::Vector;
use crate::common::defs::Sequence;
use crate::system::config::ConfigMt;
use crate::log_mgr::fs::BufferedFileStream;
use crate::log_mgr::fs::FileStream;
use crate::log_mgr::fs::FileOps;
use crate::log_mgr::io::LogWriter;
pub use crate::log_mgr::io::LogReader;
pub use crate::log_mgr::io::LogRecordHeader;
pub use crate::log_mgr::io::RecType;
#[derive(Clone)]
pub struct LogMgr {
writer: LogWriter,
log_dir: String,
starting_csn: u64,
latest_commit_csn: u64,
}
impl LogMgr {
pub fn new(conf: ConfigMt) -> Result<Self, Error> {
let conf = conf.get_conf();
let log_dir = conf.get_log_dir().to_owned();
let max_log_file_size = *conf.get_max_log_file_size();
let buf_sz = *conf.get_log_writer_buf_size();
let file_id = FileOps::init_file_logging(&log_dir, max_log_file_size)?;
let fs = FileStream::new(log_dir.clone(), max_log_file_size, file_id, 4, false, true)?;
let mut lr = LogReader::new(fs)?;
let (start_pos, lsn, starting_csn, latest_commit_csn) = lr.find_write_position()?;
drop(lr);
let bfs = BufferedFileStream::new(log_dir.clone(), max_log_file_size, buf_sz as usize, file_id, start_pos)?;
let lsn = Sequence::new(lsn);
let writer = LogWriter::new(bfs, lsn)?;
Ok(LogMgr {
writer,
log_dir,
starting_csn,
latest_commit_csn,
})
}
pub fn write_data(&self, csn: u64, checkpoint_csn: u64, tsn: u64, obj_id: &ObjectId, vector: &mut Vector, data: &[u8]) -> Result<(), Error> {
self.writer.write_data(csn, checkpoint_csn, tsn, obj_id, vector, data)
}
pub fn write_commit(&self, csn: u64, tsn: u64) -> Result<(), Error> {
self.writer.write_commit(csn, tsn)
}
pub fn write_rollback(&self, csn: u64, tsn: u64) -> Result<(), Error> {
self.writer.write_rollback(csn, tsn)
}
pub fn write_checkpoint_begin(&self, checkpoint_csn: u64, latest_commit_csn: u64) -> Result<(), Error> {
self.writer.write_checkpoint_begin(checkpoint_csn, latest_commit_csn)
}
pub fn write_checkpoint_completed(&self, checkpoint_csn: u64, latest_commit_csn: u64, current_tsn: u64) -> Result<(), Error> {
self.writer.write_checkpoint_completed(checkpoint_csn, latest_commit_csn, current_tsn)
}
pub fn write_delete(&self, csn: u64, checkpoint_csn: u64, tsn: u64, obj_id: &ObjectId) -> Result<(), Error> {
self.writer.write_delete(csn, checkpoint_csn, tsn, obj_id)
}
pub fn get_reader(&self) -> Result<LogReader, Error> {
let file_id = FileOps::find_latest_log_file(&self.log_dir)?;
let fs = FileStream::new(self.log_dir.clone(), 0, file_id, 4, false, true)?;
Ok(LogReader::new(fs)?)
}
pub fn starting_csn(&self) -> u64 {
self.starting_csn
}
pub fn latest_commit_csn(&self) -> u64 {
self.latest_commit_csn
}
pub fn terminate(self) {
self.writer.terminate();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::common::defs::BlockId;
use std::path::Path;
#[test]
fn test_log_mgr() {
let log_dir = "/tmp/test_log_mgr_34566576";
if Path::new(log_dir).exists() {
std::fs::remove_dir_all(log_dir).expect("Failed to delete test dir on cleanup");
}
std::fs::create_dir(log_dir).expect("Failed to create test dir");
let conf = ConfigMt::new();
let mut c = conf.get_conf();
c.set_log_dir(log_dir.to_owned());
drop(c);
let lm = LogMgr::new(conf.clone()).expect("Failed to create log mgr");
let csn = 123;
let checkpoint_csn = 124;
let tsn = 125;
let current_tsn = 234;
let obj_id = ObjectId::init(100,101,102,103);
let mut vec = Vector::init(BlockId::init(1,1,1),1,1);
let data = [0,1,2,3,4,5,6,7,8,9];
let latest_commit_csn = 126;
lm.write_data(csn, checkpoint_csn-1, tsn, &obj_id, &mut vec, &data).expect("Failed to write data");
lm.write_commit(csn+1, tsn+1).expect("Failed to write commit");
lm.write_rollback(csn+2, tsn+2).expect("Failed to write rollback");
lm.write_checkpoint_begin(checkpoint_csn, latest_commit_csn).expect("Failed to write checkpoint csn");
lm.write_checkpoint_completed(checkpoint_csn, latest_commit_csn+1, current_tsn).expect("Failed to write checkpoint completed");
lm.write_delete(csn+3, checkpoint_csn, tsn+3, &obj_id).expect("Failed to delete");
lm.write_data(csn+4, checkpoint_csn, tsn+4, &obj_id, &mut vec, &data).expect("Failed to write data");
lm.write_commit(csn+5, tsn+5).expect("Failed to write commit");
lm.write_data(csn+6, checkpoint_csn, tsn+6, &obj_id, &mut vec, &data).expect("Failed to write data");
lm.terminate();
let lm = LogMgr::new(conf.clone()).expect("Failed to create log mgr");
let starting_csn = lm.starting_csn();
assert_eq!(starting_csn, csn+6);
let latest_commit_csn = lm.latest_commit_csn();
assert_eq!(latest_commit_csn, csn+5);
let mut lr = lm.get_reader().expect("Failed to get log reader");
let (start_pos, lsn, starting_csn, latest_commit_csn) = lr.find_write_position().expect("Failed to find write position");
assert_eq!(start_pos, 457);
assert_eq!(lsn, 10);
assert_eq!(starting_csn, csn+6);
assert_eq!(latest_commit_csn, csn+5);
let mut lr = lm.get_reader().expect("Failed to get log reader");
let (ccsn, ctsn) = lr.seek_to_latest_checkpoint().expect("Failed to get latest checkpoint").unwrap();
assert_eq!(ccsn, checkpoint_csn);
assert_eq!(ctsn, current_tsn);
let _lrh = lr.read_next().expect("Failed to get latest checkpoint");
let _lrh = lr.read_next().expect("Failed to get latest checkpoint");
let obj = lr.get_object_id();
assert_eq!(obj, obj_id);
let vec2 = lr.get_vector();
assert_eq!(vec2.obj_id(), vec.obj_id());
assert_eq!(vec2.entry_pos(), vec.entry_pos());
let data1 = lr.get_data();
assert_eq!(data1, data);
lm.terminate();
}
}