use std::path::Path;
use std::sync::Arc;
use super::error::{PersistentARTrieError, Result};
use super::wal::{
AsyncWalConfig, AsyncWalError, AsyncWalWriter, Lsn, SyncHandle, WalConfig, WalRecord,
};
pub trait WalManaged {
fn wal_writer(&self) -> Option<&Arc<AsyncWalWriter>>;
fn log_insert(&self, term: &[u8], value: Option<Vec<u8>>) -> Result<Option<Lsn>> {
if let Some(wal) = self.wal_writer() {
let record = WalRecord::Insert {
term: term.to_vec(),
value,
};
let lsn = wal.append(record).map_err(|e| {
PersistentARTrieError::io_error(
"log_insert",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(Some(lsn))
} else {
Ok(None)
}
}
fn log_remove(&self, term: &[u8]) -> Result<Option<Lsn>> {
if let Some(wal) = self.wal_writer() {
let record = WalRecord::Remove {
term: term.to_vec(),
};
let lsn = wal.append(record).map_err(|e| {
PersistentARTrieError::io_error(
"log_remove",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(Some(lsn))
} else {
Ok(None)
}
}
fn log_batch(&self, entries: &[(Vec<u8>, Option<Vec<u8>>)]) -> Result<Option<Lsn>> {
if let Some(wal) = self.wal_writer() {
let lsn = wal.append_batch(entries).map_err(|e| {
PersistentARTrieError::io_error(
"log_batch",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(Some(lsn))
} else {
Ok(None)
}
}
fn log_increment(&self, term: &[u8], delta: i64, result: i64) -> Result<Option<Lsn>> {
if let Some(wal) = self.wal_writer() {
let record = WalRecord::Increment {
term: term.to_vec(),
delta,
result,
};
let lsn = wal.append(record).map_err(|e| {
PersistentARTrieError::io_error(
"log_increment",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(Some(lsn))
} else {
Ok(None)
}
}
fn log_upsert(&self, term: &[u8], value: Vec<u8>) -> Result<Option<Lsn>> {
if let Some(wal) = self.wal_writer() {
let record = WalRecord::Upsert {
term: term.to_vec(),
value,
};
let lsn = wal.append(record).map_err(|e| {
PersistentARTrieError::io_error(
"log_upsert",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(Some(lsn))
} else {
Ok(None)
}
}
fn log_compare_and_swap(
&self,
term: &[u8],
expected: Option<Vec<u8>>,
new_value: Vec<u8>,
success: bool,
) -> Result<Option<Lsn>> {
if let Some(wal) = self.wal_writer() {
let record = WalRecord::CompareAndSwap {
term: term.to_vec(),
expected,
new_value,
success,
};
let lsn = wal.append(record).map_err(|e| {
PersistentARTrieError::io_error(
"log_compare_and_swap",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(Some(lsn))
} else {
Ok(None)
}
}
fn log_begin_tx(&self, tx_id: u64) -> Result<Option<Lsn>> {
if let Some(wal) = self.wal_writer() {
let record = WalRecord::BeginTx { tx_id };
let lsn = wal.append(record).map_err(|e| {
PersistentARTrieError::io_error(
"log_begin_tx",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(Some(lsn))
} else {
Ok(None)
}
}
fn log_commit_tx(&self, tx_id: u64) -> Result<Option<Lsn>> {
if let Some(wal) = self.wal_writer() {
let record = WalRecord::CommitTx { tx_id };
let lsn = wal.append(record).map_err(|e| {
PersistentARTrieError::io_error(
"log_commit_tx",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(Some(lsn))
} else {
Ok(None)
}
}
fn log_abort_tx(&self, tx_id: u64) -> Result<Option<Lsn>> {
if let Some(wal) = self.wal_writer() {
let record = WalRecord::AbortTx { tx_id };
let lsn = wal.append(record).map_err(|e| {
PersistentARTrieError::io_error(
"log_abort_tx",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(Some(lsn))
} else {
Ok(None)
}
}
fn wal_sync(&self) -> Result<Option<Lsn>> {
if let Some(wal) = self.wal_writer() {
let lsn = wal.sync().map_err(|e| {
PersistentARTrieError::io_error(
"wal_sync",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(Some(lsn))
} else {
Ok(None)
}
}
fn wal_sync_async(&self) -> Result<Option<SyncHandle>> {
if let Some(wal) = self.wal_writer() {
let handle = wal.sync_async().map_err(|e| {
PersistentARTrieError::io_error(
"wal_sync_async",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(Some(handle))
} else {
Ok(None)
}
}
fn wal_current_lsn(&self) -> Option<Lsn> {
self.wal_writer().map(|w| w.current_lsn())
}
fn wal_synced_lsn(&self) -> Option<Lsn> {
self.wal_writer().map(|w| w.synced_lsn())
}
fn log_checkpoint(&self, checkpoint_lsn: Lsn) -> Result<Option<Lsn>> {
if let Some(wal) = self.wal_writer() {
let lsn = wal.checkpoint(checkpoint_lsn).map_err(|e| {
PersistentARTrieError::io_error(
"log_checkpoint",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(Some(lsn))
} else {
Ok(None)
}
}
fn wal_truncate(&self) -> Result<()> {
if let Some(wal) = self.wal_writer() {
wal.truncate().map_err(|e| {
PersistentARTrieError::io_error(
"wal_truncate",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
}
Ok(())
}
fn wal_rotate_to_archive(&self, config: &WalConfig) -> Result<Option<std::path::PathBuf>> {
if let Some(wal) = self.wal_writer() {
let path = wal.rotate_to_archive(config).map_err(|e| {
PersistentARTrieError::io_error(
"wal_rotate_to_archive",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(path)
} else {
Ok(None)
}
}
fn wal_records_empty_on_disk(&self) -> bool {
self.wal_writer()
.map(|wal| wal.records_empty_on_disk())
.unwrap_or(false)
}
fn wal_rotate_and_restamp_overlay(
&self,
config: &WalConfig,
) -> Result<Option<std::path::PathBuf>> {
if let Some(wal) = self.wal_writer() {
let path = wal.rotate_and_restamp_overlay(config).map_err(|e| {
PersistentARTrieError::io_error(
"wal_rotate_and_restamp_overlay",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(path)
} else {
Ok(None)
}
}
fn wal_stamp_overlay_regime_records_empty(&self) -> Result<()> {
if let Some(wal) = self.wal_writer() {
wal.set_overlay_regime_records_empty().map_err(|e| {
PersistentARTrieError::io_error(
"wal_stamp_overlay_regime_records_empty",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
}
Ok(())
}
fn wal_collect_segments(&self, config: &WalConfig) -> Result<Vec<std::path::PathBuf>> {
if let Some(wal) = self.wal_writer() {
let segments = wal.collect_wal_segments(config).map_err(|e| {
PersistentARTrieError::io_error(
"wal_collect_segments",
"WAL",
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
Ok(segments)
} else {
Ok(Vec::new())
}
}
}
pub fn create_async_wal<P: AsRef<Path>>(
wal_path: P,
data_path: &Path,
) -> std::result::Result<AsyncWalWriter, AsyncWalError> {
let async_config = AsyncWalConfig {
pending_dir: data_path
.parent()
.unwrap_or(Path::new("."))
.join("wal_pending"),
..Default::default()
};
let archive_config = WalConfig::default();
AsyncWalWriter::create(wal_path, async_config, archive_config)
}
pub fn open_or_create_async_wal<P: AsRef<Path>>(
wal_path: P,
data_path: &Path,
) -> std::result::Result<AsyncWalWriter, AsyncWalError> {
let async_config = AsyncWalConfig {
pending_dir: data_path
.parent()
.unwrap_or(Path::new("."))
.join("wal_pending"),
..Default::default()
};
let archive_config = WalConfig::default();
AsyncWalWriter::open_or_create(wal_path, async_config, archive_config)
}
pub fn open_async_wal<P: AsRef<Path>>(
wal_path: P,
data_path: &Path,
) -> std::result::Result<AsyncWalWriter, AsyncWalError> {
let async_config = AsyncWalConfig {
pending_dir: data_path
.parent()
.unwrap_or(Path::new("."))
.join("wal_pending"),
..Default::default()
};
let archive_config = WalConfig::default();
AsyncWalWriter::open(wal_path, async_config, archive_config)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
struct TestWalManaged {
wal_writer: Option<Arc<AsyncWalWriter>>,
}
impl WalManaged for TestWalManaged {
fn wal_writer(&self) -> Option<&Arc<AsyncWalWriter>> {
self.wal_writer.as_ref()
}
}
#[test]
fn test_wal_managed_without_wal() {
let managed = TestWalManaged { wal_writer: None };
assert_eq!(managed.log_insert(b"test", None).unwrap(), None);
assert_eq!(managed.log_remove(b"test").unwrap(), None);
assert_eq!(managed.log_batch(&[]).unwrap(), None);
assert_eq!(managed.wal_sync().unwrap(), None);
assert!(managed.wal_sync_async().unwrap().is_none());
assert_eq!(managed.wal_current_lsn(), None);
assert_eq!(managed.wal_synced_lsn(), None);
assert!(managed.wal_truncate().is_ok());
}
#[test]
fn test_wal_managed_with_wal() {
let dir = tempdir().unwrap();
let wal_path = dir.path().join("test.wal");
let data_path = dir.path().join("test.data");
let wal = create_async_wal(&wal_path, &data_path).unwrap();
let managed = TestWalManaged {
wal_writer: Some(Arc::new(wal)),
};
let lsn = managed.log_insert(b"hello", None).unwrap();
assert!(lsn.is_some());
assert!(lsn.unwrap() > 0);
let lsn = managed.log_remove(b"hello").unwrap();
assert!(lsn.is_some());
let entries = vec![
(b"a".to_vec(), None),
(b"b".to_vec(), Some(b"value".to_vec())),
];
let lsn = managed.log_batch(&entries).unwrap();
assert!(lsn.is_some());
let lsn = managed.wal_sync().unwrap();
assert!(lsn.is_some());
assert!(managed.wal_current_lsn().is_some());
assert!(managed.wal_synced_lsn().is_some());
}
#[test]
fn test_wal_sync_async() {
let dir = tempdir().unwrap();
let wal_path = dir.path().join("test_async.wal");
let data_path = dir.path().join("test_async.data");
let wal = create_async_wal(&wal_path, &data_path).unwrap();
let managed = TestWalManaged {
wal_writer: Some(Arc::new(wal)),
};
managed
.log_insert(b"key1", Some(b"value1".to_vec()))
.unwrap();
managed
.log_insert(b"key2", Some(b"value2".to_vec()))
.unwrap();
let handle = managed.wal_sync_async().unwrap();
assert!(handle.is_some());
let sync_handle = handle.unwrap();
sync_handle
.wait()
.expect("sync should complete successfully");
assert!(managed.wal_synced_lsn().is_some());
}
#[test]
fn test_create_and_open_helpers() {
let dir = tempdir().unwrap();
let wal_path = dir.path().join("test.wal");
let data_path = dir.path().join("test.data");
let wal = create_async_wal(&wal_path, &data_path).unwrap();
drop(wal);
let wal = open_async_wal(&wal_path, &data_path).unwrap();
drop(wal);
let wal = open_or_create_async_wal(&wal_path, &data_path).unwrap();
drop(wal);
let new_path = dir.path().join("new.wal");
let wal = open_or_create_async_wal(&new_path, &data_path).unwrap();
drop(wal);
}
}