use std::sync::{Arc, Mutex, MutexGuard};
use lora_store::{MutationEvent, MutationRecorder};
use super::errors::{WalBufferedCommitError, WalCommitError, WalPoisonError, WroteCommit};
use super::mirror::WalMirror;
use crate::errors::WalError;
use crate::lsn::Lsn;
use crate::wal::Wal;
/// Mutable bookkeeping shared by every `WalRecorder` method, kept behind the
/// recorder's mutex. The derived `Default` is the idle state: disarmed, no
/// open transaction, empty buffer, not poisoned.
#[derive(Default)]
struct RecorderState {
/// `true` between a successful `arm` and the next `commit`, `abort` or
/// `poison`; `record` only buffers events while this is set.
armed: bool,
/// Transaction handle obtained from `Wal::begin` during `commit`; it is
/// cleared again once `Wal::commit` succeeds, and by `abort` / `poison`.
active_tx: Option<Lsn>,
/// Events pushed by `record` since the recorder was last armed.
buffer: Vec<MutationEvent>,
/// Reason the recorder was poisoned, if any. While this is `Some`, the
/// state-checking operations return `WalError::Poisoned` and `record`
/// drops incoming events.
poisoned: Option<String>,
}
/// Buffers `MutationEvent`s for an armed query and writes them to a [`Wal`]
/// as a single transaction when the query commits.
///
/// Failures reported by the WAL or the mirror are latched as a "poisoned"
/// reason string; a poisoned recorder rejects further arm/commit/flush calls.
pub struct WalRecorder {
/// Write-ahead log that receives the buffered batches.
wal: Arc<Wal>,
/// Optional mirror; when present, `flush` / `force_fsync` hand it the WAL
/// directory via `persist` / `persist_force` after the WAL call succeeds.
mirror: Option<Arc<dyn WalMirror>>,
/// Mutex-protected bookkeeping (armed flag, open transaction, event
/// buffer, poison reason).
state: Mutex<RecorderState>,
}
impl WalRecorder {
pub fn new(wal: Arc<Wal>) -> Self {
Self::new_with_mirror(wal, None)
}
/// Creates a recorder over `wal`, optionally paired with `mirror`.
///
/// The recorder starts from `RecorderState::default()`: disarmed, with an
/// empty buffer, no open transaction and no poison reason.
pub fn new_with_mirror(wal: Arc<Wal>, mirror: Option<Arc<dyn WalMirror>>) -> Self {
    let state = Mutex::new(RecorderState::default());
    Self { wal, mirror, state }
}
/// Borrows the shared [`Wal`] handle this recorder writes to.
pub fn wal(&self) -> &Arc<Wal> {
    &self.wal
}
/// Locks the recorder state, ignoring mutex poisoning.
///
/// If another thread panicked while holding the lock, the guard is
/// recovered from the `PoisonError` instead of propagating the panic, so
/// callers always get access to the state.
fn state_lock(&self) -> MutexGuard<'_, RecorderState> {
    match self.state.lock() {
        Ok(guard) => guard,
        Err(poison_err) => poison_err.into_inner(),
    }
}
/// Arms the recorder so that subsequent `record` calls are buffered.
///
/// Any events left in the buffer are discarded when arming succeeds.
///
/// # Errors
///
/// Returns [`WalError::Poisoned`] if the recorder is already poisoned, or if
/// it is already armed — in the latter case the recorder is poisoned with a
/// reason describing the misuse.
pub fn arm(&self) -> Result<(), WalError> {
    let mut guard = self.state_lock();
    match (guard.poisoned.is_some(), guard.armed) {
        // Already poisoned: refuse without touching the state.
        (true, _) => Err(WalError::Poisoned),
        // Double arm is a caller bug; latch it so it cannot go unnoticed.
        (false, true) => {
            guard.poisoned = Some("WalRecorder::arm called while already armed".into());
            Err(WalError::Poisoned)
        }
        (false, false) => {
            guard.armed = true;
            guard.buffer.clear();
            Ok(())
        }
    }
}
/// Disarms the recorder and writes the buffered events to the WAL as one
/// transaction (`begin` → `append_batch` → `commit`).
///
/// Returns `WroteCommit::No` when there were no buffered events and no
/// transaction was already open; otherwise returns `WroteCommit::Yes` once
/// the WAL commit call has succeeded.
///
/// # Errors
///
/// * [`WalError::Poisoned`] if the recorder is poisoned, or if it was not
///   armed (which also poisons it).
/// * Any error from `Wal::begin`, `Wal::append_batch` or `Wal::commit` is
///   propagated, and its message is stored as the poison reason. If the
///   failure happens after `begin`, the transaction handle stays recorded in
///   `active_tx`.
pub fn commit(&self) -> Result<WroteCommit, WalError> {
    let mut guard = self.state_lock();
    if guard.poisoned.is_some() {
        return Err(WalError::Poisoned);
    }
    if !guard.armed {
        guard.poisoned = Some("WalRecorder::commit called without an armed query".into());
        return Err(WalError::Poisoned);
    }
    guard.armed = false;

    let nothing_to_write = guard.buffer.is_empty() && guard.active_tx.is_none();
    if nothing_to_write {
        return Ok(WroteCommit::No);
    }

    // Move the events out so the buffer is empty regardless of the outcome.
    let pending = std::mem::take(&mut guard.buffer);

    // Reuse an already-open transaction if one is recorded, else begin one.
    let open_tx = guard.active_tx;
    let tx = match open_tx {
        Some(existing) => existing,
        None => self.wal.begin().inspect_err(|err| {
            guard.poisoned = Some(err.to_string());
        })?,
    };
    guard.active_tx = Some(tx);

    self.wal.append_batch(tx, pending).inspect_err(|err| {
        guard.poisoned = Some(err.to_string());
    })?;
    self.wal.commit(tx).inspect_err(|err| {
        guard.poisoned = Some(err.to_string());
    })?;

    // Only a fully committed transaction clears the handle.
    guard.active_tx = None;
    Ok(WroteCommit::Yes)
}
/// Commits the armed query and, only if that actually wrote a commit,
/// flushes the WAL (and mirror) via [`WalRecorder::flush`].
///
/// # Errors
///
/// `WalCommitError::Commit` wraps a failure from `commit`;
/// `WalCommitError::Flush` wraps a failure from the follow-up `flush`.
pub fn commit_and_flush_if_needed(&self) -> Result<WroteCommit, WalCommitError> {
    let outcome = self.commit().map_err(WalCommitError::Commit)?;
    if !outcome.wrote() {
        // Nothing reached the WAL, so there is nothing to flush.
        return Ok(outcome);
    }
    self.flush().map_err(WalCommitError::Flush)?;
    Ok(outcome)
}
/// Arms the recorder, records every event from `events`, then commits and
/// flushes — a one-shot path for an already-collected batch.
///
/// An empty `events` iterator writes nothing and returns `WroteCommit::No`,
/// provided the recorder is not poisoned.
///
/// # Errors
///
/// * `WalBufferedCommitError::Poisoned` — `events` was empty and the
///   recorder (or the WAL's background state) reports a poison reason.
/// * `WalBufferedCommitError::Arm` — arming failed.
/// * `WalBufferedCommitError::ReplayPoisoned` — a poison reason showed up
///   while the events were being recorded; remaining events are not recorded.
/// * Whatever `commit_and_flush_if_needed` reports, converted via `Into`.
pub fn commit_events(
    &self,
    events: impl IntoIterator<Item = MutationEvent>,
) -> Result<WroteCommit, WalBufferedCommitError> {
    let mut pending = events.into_iter().peekable();
    if pending.peek().is_none() {
        // Nothing to write, but still surface an existing poison condition.
        return match self.ensure_not_poisoned() {
            Ok(()) => Ok(WroteCommit::No),
            Err(e) => Err(WalBufferedCommitError::Poisoned(e.reason().to_string())),
        };
    }

    self.arm().map_err(WalBufferedCommitError::Arm)?;

    for event in pending {
        self.record(event);
        // Check after every event so a failure stops the replay immediately.
        match self.poisoned_reason() {
            Some(reason) => return Err(WalBufferedCommitError::ReplayPoisoned(reason)),
            None => continue,
        }
    }

    self.commit_and_flush_if_needed().map_err(|e| e.into())
}
/// Disarms the recorder, discards buffered events and aborts the open WAL
/// transaction, if there is one.
///
/// Returns `Ok(true)` when an open transaction was aborted in the WAL;
/// otherwise returns whether any buffered events were thrown away.
///
/// # Errors
///
/// [`WalError::Poisoned`] if the recorder is poisoned (state is left
/// untouched). An error from `Wal::abort` is propagated and recorded as the
/// poison reason.
pub fn abort(&self) -> Result<bool, WalError> {
    let mut guard = self.state_lock();
    if guard.poisoned.is_some() {
        return Err(WalError::Poisoned);
    }

    guard.armed = false;
    let discarded_events = !guard.buffer.is_empty();
    guard.buffer.clear();

    // `take` clears the handle up front, even if the WAL abort below fails.
    let Some(tx) = guard.active_tx.take() else {
        return Ok(discarded_events);
    };
    self.wal.abort(tx).inspect_err(|err| {
        guard.poisoned = Some(err.to_string());
    })?;
    Ok(true)
}
/// Calls `Wal::flush` and then, if a mirror is configured, hands the WAL
/// directory to `WalMirror::persist`.
///
/// The state lock is held for the whole operation.
///
/// # Errors
///
/// [`WalError::Poisoned`] if the recorder is already poisoned. A failure from
/// the WAL or the mirror is propagated and stored as the poison reason; the
/// mirror is not contacted when the WAL flush fails.
pub fn flush(&self) -> Result<(), WalError> {
    let mut guard = self.state_lock();
    if guard.poisoned.is_some() {
        return Err(WalError::Poisoned);
    }

    self.wal.flush().inspect_err(|err| {
        guard.poisoned = Some(err.to_string());
    })?;

    let Some(mirror) = self.mirror.as_ref() else {
        return Ok(());
    };
    mirror.persist(self.wal.dir()).inspect_err(|err| {
        guard.poisoned = Some(err.to_string());
    })?;
    Ok(())
}
/// Calls `Wal::force_fsync` and then, if a mirror is configured, hands the
/// WAL directory to `WalMirror::persist_force`.
///
/// The state lock is held for the whole operation.
///
/// # Errors
///
/// [`WalError::Poisoned`] if the recorder is already poisoned. A failure from
/// the WAL or the mirror is propagated and stored as the poison reason; the
/// mirror is not contacted when the WAL call fails.
pub fn force_fsync(&self) -> Result<(), WalError> {
    let mut guard = self.state_lock();
    if guard.poisoned.is_some() {
        return Err(WalError::Poisoned);
    }

    self.wal.force_fsync().inspect_err(|err| {
        guard.poisoned = Some(err.to_string());
    })?;

    let Some(mirror) = self.mirror.as_ref() else {
        return Ok(());
    };
    mirror.persist_force(self.wal.dir()).inspect_err(|err| {
        guard.poisoned = Some(err.to_string());
    })?;
    Ok(())
}
/// Forwards to `Wal::checkpoint_marker(snapshot_lsn)` and returns the `Lsn`
/// it yields.
///
/// # Errors
///
/// [`WalError::Poisoned`] if the recorder is already poisoned. A failure from
/// the WAL is returned as-is and its message is stored as the poison reason.
pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
    let mut guard = self.state_lock();
    if guard.poisoned.is_some() {
        return Err(WalError::Poisoned);
    }

    let outcome = self.wal.checkpoint_marker(snapshot_lsn);
    if let Err(err) = &outcome {
        guard.poisoned = Some(err.to_string());
    }
    outcome
}
/// Truncates the WAL via `Wal::truncate_up_to(fence_lsn)` and then, if a
/// mirror is configured, force-persists the WAL directory to it.
///
/// Truncation happens whether or not a mirror is present; the mirror is only
/// an additional step afterwards. Unlike the other WAL operations on this
/// type, this method neither consults nor updates the poison flag.
///
/// # Errors
///
/// Propagates any error from `Wal::truncate_up_to` (in which case the mirror
/// is not contacted) or from `WalMirror::persist_force`.
pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
    // The truncation itself must never be skipped: `fence_lsn` has to reach
    // the WAL even when a mirror is attached.
    self.wal.truncate_up_to(fence_lsn)?;

    if let Some(mirror) = &self.mirror {
        // NOTE(review): persisting after the truncation so the mirror sees the
        // post-truncation directory — confirm against the `WalMirror` contract.
        mirror.persist_force(self.wal.dir())?;
    }
    Ok(())
}
/// Returns `true` when [`WalRecorder::poisoned_reason`] reports a reason,
/// i.e. the recorder state is poisoned or the WAL reports a background
/// failure.
pub fn is_poisoned(&self) -> bool {
    let reason = self.poisoned_reason();
    reason.is_some()
}
/// Returns why the recorder is unusable, if it is.
///
/// The recorder's own latched reason takes precedence; when there is none,
/// the result of `Wal::bg_failure` is returned instead. The state lock is
/// held while the WAL is queried.
pub fn poisoned_reason(&self) -> Option<String> {
    let guard = self.state_lock();
    guard.poisoned.clone().or_else(|| self.wal.bg_failure())
}
/// Succeeds only when [`WalRecorder::poisoned_reason`] reports nothing.
///
/// # Errors
///
/// Returns a `WalPoisonError` carrying the reported reason otherwise.
pub fn ensure_not_poisoned(&self) -> Result<(), WalPoisonError> {
    match self.poisoned_reason() {
        Some(reason) => Err(WalPoisonError { reason }),
        None => Ok(()),
    }
}
/// Marks the recorder as poisoned with `reason` and resets its in-flight
/// state (disarmed, no open transaction, empty buffer).
///
/// If a reason is already latched it is kept and `reason` is ignored. The
/// open transaction handle is only forgotten here; no WAL call is made.
pub fn poison(&self, reason: impl Into<String>) {
    let mut guard = self.state_lock();
    // First reason wins: never overwrite an earlier diagnosis.
    if guard.poisoned.is_none() {
        guard.poisoned = Some(reason.into());
    }
    guard.armed = false;
    guard.active_tx = None;
    guard.buffer.clear();
}
/// Test-only escape hatch: drops the latched poison reason and resets the
/// recorder to its idle state (disarmed, no open transaction, empty buffer).
///
/// This only touches the recorder's own state; a failure reported by
/// `Wal::bg_failure` is not affected.
#[doc(hidden)]
pub fn clear_poisoned_for_tests(&self) {
    let mut guard = self.state_lock();
    guard.armed = false;
    guard.active_tx = None;
    guard.buffer.clear();
    guard.poisoned = None;
}
}
impl MutationRecorder for WalRecorder {
/// Buffers `event` for the currently armed query.
///
/// * Poisoned recorder: the event is silently dropped.
/// * Not armed: the event is dropped and the recorder is poisoned, since a
///   mutation outside an armed query could not be committed.
fn record(&self, event: MutationEvent) {
    let mut guard = self.state_lock();
    if guard.poisoned.is_some() {
        return;
    }
    if guard.armed {
        guard.buffer.push(event);
    } else {
        // No reason is latched at this point (checked above), so set it directly.
        guard.poisoned =
            Some("MutationRecorder::record fired outside an armed query".into());
    }
}
fn poisoned(&self) -> Option<String> {
self.poisoned_reason()
}
}