// interstice_core/persistence/transaction_log.rs
use super::log_rotation::{LogRotator, RotationConfig};
use crate::error::IntersticeError;
use crate::runtime::transaction::Transaction;
use interstice_abi::decode;
use std::fs::{File, OpenOptions};
use std::io::{BufReader, ErrorKind, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, MutexGuard};

10pub struct TransactionLog {
12 file: Arc<Mutex<File>>,
13 path: PathBuf,
14 tx_count: usize,
16 rotator: LogRotator,
18}
19
20impl TransactionLog {
21 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, IntersticeError> {
23 Self::with_rotation(path, RotationConfig::default())
24 }
25
26 pub fn with_rotation<P: AsRef<Path>>(
28 path: P,
29 rotation_config: RotationConfig,
30 ) -> Result<Self, IntersticeError> {
31 let path = path.as_ref().to_path_buf();
32
33 let file = OpenOptions::new()
35 .read(true)
36 .write(true)
37 .create(true)
38 .open(&path)
39 .map_err(|err| {
40 IntersticeError::Internal(format!("Couldn't open transaction log file: {}", err))
41 })?;
42
43 let rotator = LogRotator::new(rotation_config);
44 let mut transaction_log = Self {
45 file: Arc::new(Mutex::new(file)),
46 path,
47 tx_count: 0,
48 rotator,
49 };
50
51 transaction_log.tx_count = transaction_log.read_all()?.len();
53
54 Ok(transaction_log)
55 }
56
57 pub fn append(&mut self, tx: &Transaction) -> Result<(), IntersticeError> {
58 let mut file = self.file.lock().unwrap();
59
60 file.seek(SeekFrom::End(0)).map_err(|err| {
62 IntersticeError::Internal(format!("Couldn't seek log transaction end file: {}", err))
63 })?;
64
65 let encoded = tx.encode()?;
66 let length = (encoded.len() as u32).to_le_bytes(); file.write_all(&length).map_err(|err| {
69 IntersticeError::Internal(format!("Couldn't write transaction to log file: {}", err))
70 })?;
71 file.write_all(&encoded).map_err(|err| {
72 IntersticeError::Internal(format!("Couldn't write transaction to log file: {}", err))
73 })?;
74
75 file.sync_all().map_err(|err| {
77 IntersticeError::Internal(format!(
78 "Couldn't sync transaction log file to disk: {}",
79 err
80 ))
81 })?;
82
83 drop(file); self.tx_count += 1;
86
87 if self.rotator.should_rotate(&self.path)? {
89 self.rotator.rotate(&self.path)?;
90 let file = OpenOptions::new()
92 .read(true)
93 .write(true)
94 .create(true)
95 .open(&self.path)
96 .map_err(|err| {
97 IntersticeError::Internal(format!(
98 "Couldn't open new log transaction file: {}",
99 err
100 ))
101 })?;
102 self.file = Arc::new(Mutex::new(file));
103 }
104
105 Ok(())
106 }
107
108 pub fn read_all(&self) -> Result<Vec<Transaction>, IntersticeError> {
109 let mut file = self.file.lock().unwrap();
110 file.seek(SeekFrom::Start(0)).map_err(|err| {
111 IntersticeError::Internal(format!("Error when opening transaction logs file: {}", err))
112 })?;
113
114 let mut transactions = Vec::new();
115 loop {
116 let mut length_buf = [0; 4]; if file.read_exact(&mut length_buf).is_err() {
119 break; }
121
122 let length = u32::from_le_bytes(length_buf) as usize; let mut encoded = vec![0; length]; file.read_exact(&mut encoded).map_err(|err| {
127 IntersticeError::Internal(format!("Error when decoding transaction logs: {}", err))
128 })?;
129
130 let transaction = decode(&encoded).map_err(|err| {
132 IntersticeError::Internal(format!("Error when decoding transaction logs: {}", err))
133 })?;
134 transactions.push(transaction);
135 }
136
137 Ok(transactions)
138 }
139
140 pub fn transaction_count(&self) -> usize {
142 self.tx_count
143 }
144
145 pub fn path(&self) -> &Path {
147 &self.path
148 }
149
150 pub fn delete_all_logs(&self) -> Result<(), IntersticeError> {
151 let mut file = OpenOptions::new()
153 .write(true)
154 .truncate(true) .open(self.path()) .map_err(|err| {
157 IntersticeError::Internal(format!("Error when opening transaction logs: {}", err))
158 })?;
159
160 file.write_all(&[]).map_err(|err| {
162 IntersticeError::Internal(format!("Error when deleting transaction logs: {}", err))
163 })?;
164
165 Ok(())
166 }
167}