Skip to main content

interstice_core/persistence/
transaction_log.rs

1use super::log_rotation::{LogRotator, RotationConfig};
2use crate::error::IntersticeError;
3use crate::runtime::transaction::Transaction;
4use interstice_abi::decode;
5use std::fs::{File, OpenOptions};
6use std::io::{Read, Seek, SeekFrom, Write};
7use std::path::{Path, PathBuf};
8use std::sync::{Arc, Mutex};
9
10/// Append-only transaction log for durable storage
11pub struct TransactionLog {
12    file: Arc<Mutex<File>>,
13    path: PathBuf,
14    /// Number of transactions recorded (in-memory cache)
15    tx_count: usize,
16    /// Log rotator for managing file size
17    rotator: LogRotator,
18}
19
20impl TransactionLog {
21    /// Create or open a transaction log at the given path
22    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, IntersticeError> {
23        Self::with_rotation(path, RotationConfig::default())
24    }
25
26    /// Create or open a transaction log with custom rotation config
27    pub fn with_rotation<P: AsRef<Path>>(
28        path: P,
29        rotation_config: RotationConfig,
30    ) -> Result<Self, IntersticeError> {
31        let path = path.as_ref().to_path_buf();
32
33        // Open or create the file
34        let file = OpenOptions::new()
35            .read(true)
36            .write(true)
37            .create(true)
38            .open(&path)
39            .map_err(|err| {
40                IntersticeError::Internal(format!("Couldn't open transaction log file: {}", err))
41            })?;
42
43        let rotator = LogRotator::new(rotation_config);
44        let mut transaction_log = Self {
45            file: Arc::new(Mutex::new(file)),
46            path,
47            tx_count: 0,
48            rotator,
49        };
50
51        // Count existing transactions
52        transaction_log.tx_count = transaction_log.read_all()?.len();
53
54        Ok(transaction_log)
55    }
56
57    pub fn append(&mut self, tx: &Transaction) -> Result<(), IntersticeError> {
58        let mut file = self.file.lock().unwrap();
59
60        // Seek to end of file
61        file.seek(SeekFrom::End(0)).map_err(|err| {
62            IntersticeError::Internal(format!("Couldn't seek log transaction end file: {}", err))
63        })?;
64
65        let encoded = tx.encode()?;
66        // Write encoded transaction and its length in bytes
67        let length = (encoded.len() as u32).to_le_bytes(); // Convert length to bytes
68        file.write_all(&length).map_err(|err| {
69            IntersticeError::Internal(format!("Couldn't write transaction to log file: {}", err))
70        })?;
71        file.write_all(&encoded).map_err(|err| {
72            IntersticeError::Internal(format!("Couldn't write transaction to log file: {}", err))
73        })?;
74
75        // Ensure data is synced to disk
76        file.sync_all().map_err(|err| {
77            IntersticeError::Internal(format!(
78                "Couldn't sync transaction log file to disk: {}",
79                err
80            ))
81        })?;
82
83        drop(file); // Release lock before rotation
84
85        self.tx_count += 1;
86
87        // Check if rotation is needed
88        if self.rotator.should_rotate(&self.path)? {
89            self.rotator.rotate(&self.path)?;
90            // Reopen the file after rotation
91            let file = OpenOptions::new()
92                .read(true)
93                .write(true)
94                .create(true)
95                .open(&self.path)
96                .map_err(|err| {
97                    IntersticeError::Internal(format!(
98                        "Couldn't open new log transaction file: {}",
99                        err
100                    ))
101                })?;
102            self.file = Arc::new(Mutex::new(file));
103        }
104
105        Ok(())
106    }
107
108    pub fn read_all(&self) -> Result<Vec<Transaction>, IntersticeError> {
109        let mut file = self.file.lock().unwrap();
110        file.seek(SeekFrom::Start(0)).map_err(|err| {
111            IntersticeError::Internal(format!("Error when opening transaction logs file: {}", err))
112        })?;
113
114        let mut transactions = Vec::new();
115        loop {
116            // Read the length of the next object
117            let mut length_buf = [0; 4]; // Buffer for length (4 bytes for u32)
118            if file.read_exact(&mut length_buf).is_err() {
119                break; // Exit loop if the end of the file is reached
120            }
121
122            let length = u32::from_le_bytes(length_buf) as usize; // Convert bytes to usize
123            let mut encoded = vec![0; length]; // Create a buffer for the encoded data
124
125            // Read the encoded object
126            file.read_exact(&mut encoded).map_err(|err| {
127                IntersticeError::Internal(format!("Error when decoding transaction logs: {}", err))
128            })?;
129
130            // Deserialize the object
131            let transaction = decode(&encoded).map_err(|err| {
132                IntersticeError::Internal(format!("Error when decoding transaction logs: {}", err))
133            })?;
134            transactions.push(transaction);
135        }
136
137        Ok(transactions)
138    }
139
140    /// Get the number of transactions in the log
141    pub fn transaction_count(&self) -> usize {
142        self.tx_count
143    }
144
145    /// Get the path to the log file
146    pub fn path(&self) -> &Path {
147        &self.path
148    }
149
150    pub fn delete_all_logs(&self) -> Result<(), IntersticeError> {
151        // Open the file with write and truncate options
152        let mut file = OpenOptions::new()
153            .write(true)
154            .truncate(true) // Truncate the file to zero length
155            .open(self.path()) // Assuming `self.file_path` is the path to the log file
156            .map_err(|err| {
157                IntersticeError::Internal(format!("Error when opening transaction logs: {}", err))
158            })?;
159
160        // Optionally, you can also write an empty byte buffer if you want to ensure it's empty
161        file.write_all(&[]).map_err(|err| {
162            IntersticeError::Internal(format!("Error when deleting transaction logs: {}", err))
163        })?;
164
165        Ok(())
166    }
167}