use std::{fmt, pin::Pin, sync::Arc};
use fusio::{
Error as FusioError,
dynamic::{MaybeSend, MaybeSendFuture, MaybeSync},
fs::{CasCondition, FsCas},
path::{Path, PathPart},
};
use serde::{Deserialize, Serialize};
use super::{WalError, WalResult};
use crate::{id::FileId, mvcc::Timestamp};
/// File name of the WAL state document; `state_path` appends it to the WAL directory.
const STATE_FILE_NAME: &str = "state.json";
/// Boxed future returned by `WalStateStore::load`.
///
/// Resolves to `Ok(None)`, or to `Ok(Some((bytes, tag)))` carrying the stored bytes and a
/// `String` tag.
type WalStateLoadFuture<'a> =
Pin<Box<dyn MaybeSendFuture<Output = WalResult<Option<(Vec<u8>, String)>>> + 'a>>;
/// Boxed future returned by `WalStateStore::put`.
///
/// Resolves to a `String` tag, which `WalStateHandle::persist` keeps for its next write.
type WalStatePutFuture<'a> = Pin<Box<dyn MaybeSendFuture<Output = WalResult<String>> + 'a>>;
/// Serializable WAL bookkeeping record; `WalStateHandle` reads and writes it as JSON.
///
/// `#[serde(default)]` makes any field missing from a stored document fall back to its
/// `Default` value instead of failing deserialization.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct WalState {
/// Segment sequence number last recorded through `set_segment_seq`, if any.
pub last_segment_seq: Option<u64>,
/// Frame sequence number last recorded through `set_frame_seq`, if any.
pub last_frame_seq: Option<u64>,
/// Raw `u64` form of the last commit timestamp; see `commit_ts` / `set_commit_ts`.
pub last_commit_ts: Option<u64>,
/// Bounds of the sealed segments, in the order they were stored or upserted.
pub sealed_segments: Vec<WalSegmentBounds>,
/// Bounds of the active segment, or `None` when none is recorded.
pub active_segment: Option<WalSegmentBounds>,
}
impl WalState {
    /// Returns the last recorded commit timestamp, or `None` when none has been set.
    #[inline]
    pub(crate) fn commit_ts(&self) -> Option<Timestamp> {
        let raw = self.last_commit_ts?;
        Some(Timestamp::new(raw))
    }

    /// Records `ts` as the last commit timestamp, stored in its raw `u64` form.
    #[inline]
    pub(crate) fn set_commit_ts(&mut self, ts: Timestamp) {
        let raw = ts.get();
        self.last_commit_ts = Some(raw);
    }

    /// Records `seq` as the last frame sequence number.
    #[inline]
    pub fn set_frame_seq(&mut self, seq: u64) {
        self.last_frame_seq = Some(seq);
    }

    /// Records `seq` as the last segment sequence number.
    #[inline]
    pub fn set_segment_seq(&mut self, seq: u64) {
        self.last_segment_seq = Some(seq);
    }

    /// Discards the current sealed-segment list and installs `segments` in its place.
    pub fn replace_sealed_segments(&mut self, segments: Vec<WalSegmentBounds>) {
        self.sealed_segments = segments;
    }

    /// Inserts `segment`, first dropping every existing entry with the same `seq`.
    ///
    /// The new entry is always appended at the end, so updating an existing segment
    /// moves it to the back of the list.
    pub fn upsert_sealed_segment(&mut self, segment: WalSegmentBounds) {
        let seq = segment.seq;
        self.retain_sealed_segments(|existing| existing.seq != seq);
        self.sealed_segments.push(segment);
    }

    /// Keeps only the sealed segments for which `predicate` returns `true`.
    pub fn retain_sealed_segments<F>(&mut self, predicate: F)
    where
        F: FnMut(&WalSegmentBounds) -> bool,
    {
        self.sealed_segments.retain(predicate);
    }

    /// Borrows the sealed-segment list.
    pub fn sealed_segments(&self) -> &[WalSegmentBounds] {
        self.sealed_segments.as_slice()
    }

    /// Records `bounds` as the active segment, replacing any previous value.
    pub fn set_active_segment(&mut self, bounds: WalSegmentBounds) {
        self.active_segment = Some(bounds);
    }

    /// Forgets the active segment.
    pub fn clear_active_segment(&mut self) {
        self.active_segment = None;
    }

    /// Borrows the active segment's bounds, if one is recorded.
    pub fn active_segment(&self) -> Option<&WalSegmentBounds> {
        self.active_segment.as_ref()
    }
}
/// In-memory copy of the WAL state together with the store, path and tag needed to
/// write it back.
#[derive(Clone)]
pub struct WalStateHandle {
/// Backend used to load and persist the state document.
store: Arc<dyn WalStateStore>,
/// Location of the state document, computed by `state_path` in `load`.
path: Path,
/// Tag returned by the most recent `load` or `persist`; `None` when `load` found no
/// stored document. Passed back to the store as the expected tag on the next `persist`.
tag: Option<String>,
/// Current state; mutated through `state_mut` and written out by `persist`.
state: WalState,
}
impl fmt::Debug for WalStateHandle {
    /// Formats the handle as `WalStateHandle { path, tag_present, state }`.
    ///
    /// The store is omitted, and the tag is reduced to a presence flag rather than
    /// printed verbatim.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let has_tag = self.tag.is_some();
        let mut out = f.debug_struct("WalStateHandle");
        out.field("path", &self.path);
        out.field("tag_present", &has_tag);
        out.field("state", &self.state);
        out.finish()
    }
}
impl WalStateHandle {
    /// Loads the WAL state stored under `dir` and returns a handle to it.
    ///
    /// * No stored document: the handle starts from `WalState::default()` with no tag.
    /// * Stored document with an empty body: default state, but the returned tag is kept.
    /// * Otherwise the body is decoded as JSON.
    ///
    /// # Errors
    /// Returns an error if the state path cannot be built, if the store fails to load,
    /// or (as `WalError::State`) if the stored bytes are not valid state JSON.
    pub async fn load(store: Arc<dyn WalStateStore>, dir: &Path) -> WalResult<Self> {
        let path = state_path(dir)?;
        let loaded = store.load(&path).await?;
        let (state, tag) = match loaded {
            None => (WalState::default(), None),
            Some((bytes, tag)) => {
                let state: WalState = if bytes.is_empty() {
                    WalState::default()
                } else {
                    serde_json::from_slice(&bytes)
                        .map_err(|err| WalError::State(format!("decode wal state json: {err}")))?
                };
                (state, Some(tag))
            }
        };
        Ok(Self {
            store,
            path,
            tag,
            state,
        })
    }

    /// Borrows the in-memory state.
    #[inline]
    pub fn state(&self) -> &WalState {
        &self.state
    }

    /// Mutably borrows the in-memory state; changes are not stored until `persist` runs.
    #[inline]
    pub fn state_mut(&mut self) -> &mut WalState {
        &mut self.state
    }

    /// Borrows the sealed-segment list of the in-memory state.
    pub fn sealed_segments(&self) -> &[WalSegmentBounds] {
        &self.state.sealed_segments
    }

    /// Borrows the active segment of the in-memory state, if one is recorded.
    pub fn active_segment(&self) -> Option<&WalSegmentBounds> {
        self.state.active_segment.as_ref()
    }

    /// Encodes the in-memory state as JSON and writes it through the store.
    ///
    /// The tag currently held (if any) is passed as the expected tag. On success the
    /// handle adopts the tag returned by the store; on failure the held tag is left
    /// untouched.
    ///
    /// # Errors
    /// Returns `WalError::State` if JSON encoding fails, or the store's error if the
    /// write fails.
    pub async fn persist(&mut self) -> WalResult<()> {
        let encoded = match serde_json::to_vec(&self.state) {
            Ok(bytes) => bytes,
            Err(err) => {
                return Err(WalError::State(format!("encode wal state json: {err}")));
            }
        };
        let expected_tag = self.tag.as_deref();
        let fresh_tag = self.store.put(&self.path, &encoded, expected_tag).await?;
        self.tag = Some(fresh_tag);
        Ok(())
    }
}
/// Sequence number, file id and frame range recorded for one WAL segment.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WalSegmentBounds {
/// Segment sequence number; `WalState::upsert_sealed_segment` uses it as the key when
/// replacing an entry.
pub seq: u64,
/// File id associated with the segment.
/// NOTE(review): presumably the file backing the segment — confirm against callers.
pub file_id: FileId,
/// First frame number recorded for the segment.
/// NOTE(review): inclusive vs. exclusive is not established in this file.
pub first_frame: u64,
/// Last frame number recorded for the segment; overwritten by `extend_to`.
pub last_frame: u64,
}
impl WalSegmentBounds {
pub fn new(seq: u64, file_id: FileId, first_frame: u64, last_frame: u64) -> Self {
Self {
seq,
file_id,
first_frame,
last_frame,
}
}
pub fn extend_to(&mut self, last_frame: u64) {
self.last_frame = last_frame;
}
}
/// Returns the path of the state document inside `dir` (`dir` plus `STATE_FILE_NAME`).
///
/// # Errors
/// Returns `WalError::State` if `STATE_FILE_NAME` is rejected as a path component.
fn state_path(dir: &Path) -> WalResult<Path> {
    match PathPart::parse(STATE_FILE_NAME) {
        Ok(file_name) => Ok(dir.child(file_name)),
        Err(err) => Err(WalError::State(format!(
            "invalid state path component: {err}"
        ))),
    }
}
/// Storage backend for the WAL state document.
///
/// Methods return boxed futures (`WalStateLoadFuture` / `WalStatePutFuture`) rather than
/// being `async fn`; `WalStateHandle` holds implementors as `Arc<dyn WalStateStore>`.
pub trait WalStateStore: MaybeSend + MaybeSync {
/// Loads the object stored at `path`.
///
/// The future resolves to `Ok(None)` or to `Ok(Some((bytes, tag)))`.
/// `WalStateHandle::load` treats `None` as "no stored state" and keeps `tag` to pass
/// back as `expect` on a later `put`.
fn load<'a>(&'a self, path: &'a Path) -> WalStateLoadFuture<'a>;
/// Writes `payload` to `path` and resolves to the tag produced by the write.
///
/// `expect` carries the previously observed tag, if any. `FsWalStateStore` turns
/// `Some(tag)` into `CasCondition::IfMatch` and `None` into `CasCondition::IfNotExists`.
fn put<'a>(
&'a self,
path: &'a Path,
payload: &'a [u8],
expect: Option<&'a str>,
) -> WalStatePutFuture<'a>;
}
/// `WalStateStore` implementation that forwards to a fusio `FsCas` handle.
#[derive(Clone)]
pub struct FsWalStateStore {
/// Backend providing `load_with_tag` and `put_conditional`.
cas: Arc<dyn FsCas>,
}
impl FsWalStateStore {
pub fn new(cas: Arc<dyn FsCas>) -> Self {
Self { cas }
}
}
impl WalStateStore for FsWalStateStore {
    /// Loads the object at `path` together with its tag via `FsCas::load_with_tag`.
    ///
    /// The backend's success value (`None`, or `Some((bytes, tag))`) is returned
    /// unchanged.
    ///
    /// # Errors
    /// A backend failure becomes `WalError::State` with the message
    /// `failed to load wal state: …`.
    fn load<'a>(&'a self, path: &'a Path) -> WalStateLoadFuture<'a> {
        Box::pin(async move {
            // The backend already yields `Option<(bytes, tag)>`, so only the error
            // type needs translating; no re-matching of the success value.
            self.cas
                .load_with_tag(path)
                .await
                .map_err(|err| map_fusio_err("load wal state", err))
        })
    }

    /// Conditionally writes `payload` to `path` and resolves to the tag returned by the
    /// backend.
    ///
    /// `expect = Some(tag)` is sent as `CasCondition::IfMatch(tag)`; `expect = None` is
    /// sent as `CasCondition::IfNotExists`. The content type is `application/json`.
    ///
    /// # Errors
    /// A backend failure becomes `WalError::State` with the message
    /// `failed to persist wal state: …`.
    fn put<'a>(
        &'a self,
        path: &'a Path,
        payload: &'a [u8],
        expect: Option<&'a str>,
    ) -> WalStatePutFuture<'a> {
        Box::pin(async move {
            let condition = match expect {
                Some(tag) => CasCondition::IfMatch(tag.to_owned()),
                None => CasCondition::IfNotExists,
            };
            // NOTE(review): the fourth argument is left as `None`; its meaning is
            // defined by `FsCas::put_conditional` and is not visible in this file.
            self.cas
                .put_conditional(path, payload, Some("application/json"), None, condition)
                .await
                .map_err(|err| map_fusio_err("persist wal state", err))
        })
    }
}
/// Converts a fusio error into `WalError::State`, prefixing it with the failed `action`.
///
/// The resulting message has the form `failed to {action}: {err}`.
fn map_fusio_err(action: &str, err: FusioError) -> WalError {
    let message = format!("failed to {action}: {err}");
    WalError::State(message)
}