use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::time::Instant;
use serde::{Deserialize, Serialize};
use ursula_shard::{BucketStreamId, ShardPlacement};
use ursula_stream::{StreamCommand, StreamErrorCode, StreamSnapshot};
use crate::cold_store::ColdStoreHandle;
use crate::command::{GroupSnapshot, GroupWriteCommand};
use crate::engine::{
GroupAppendBatchFuture, GroupAppendFuture, GroupBootstrapStreamFuture, GroupCloseStreamFuture,
GroupColdHotBacklogFuture, GroupCreateStreamFuture, GroupDeleteSnapshotFuture,
GroupDeleteStreamFuture, GroupEngine, GroupEngineCreateFuture, GroupEngineError,
GroupEngineFactory, GroupEngineMetrics, GroupFlushColdFuture, GroupForkRefFuture,
GroupHeadStreamFuture, GroupInstallSnapshotFuture, GroupPlanColdFlushFuture,
GroupPlanNextColdFlushBatchFuture, GroupPlanNextColdFlushFuture, GroupPublishSnapshotFuture,
GroupReadSnapshotFuture, GroupReadStreamFuture, GroupSnapshotFuture,
GroupTouchStreamAccessFuture, GroupWriteResponse,
};
use crate::engine_in_memory::InMemoryGroupEngine;
use crate::metrics::elapsed_ns;
use crate::request::{
AppendBatchRequest, AppendRequest, BootstrapStreamRequest, CloseStreamRequest,
ColdWriteAdmission, CreateStreamRequest, DeleteSnapshotRequest, DeleteStreamRequest,
FlushColdRequest, HeadStreamRequest, PlanColdFlushRequest, PlanGroupColdFlushRequest,
PublishSnapshotRequest, ReadSnapshotRequest, ReadStreamRequest, StreamAppendCount,
TouchStreamAccessResponse,
};
/// Factory producing [`WalGroupEngine`]s whose WAL files live under a shared root directory.
#[derive(Debug, Clone)]
pub struct WalGroupEngineFactory {
    /// Directory under which per-core / per-group WAL files are placed.
    root: PathBuf,
    /// Cold store attached to every engine this factory creates, if any.
    cold_store: Option<ColdStoreHandle>,
}

impl WalGroupEngineFactory {
    /// Creates a factory rooted at `root` without a cold store.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        Self::with_cold_store(root, None)
    }

    /// Creates a factory rooted at `root` that hands `cold_store` to each engine it opens.
    pub fn with_cold_store(root: impl Into<PathBuf>, cold_store: Option<ColdStoreHandle>) -> Self {
        let root = root.into();
        Self { root, cold_store }
    }
}
impl GroupEngineFactory for WalGroupEngineFactory {
    /// Opens the WAL-backed engine for `placement` under this factory's root.
    ///
    /// This never returns an error itself: a WAL replay failure is recorded inside the
    /// engine and surfaced by its first operation.
    ///
    /// NOTE(review): `WalGroupEngine::open` replays the log with synchronous `std::fs` I/O,
    /// which runs on the executor thread inside this async block — confirm that is acceptable.
    fn create<'a>(
        &'a self,
        placement: ShardPlacement,
        metrics: GroupEngineMetrics,
    ) -> GroupEngineCreateFuture<'a> {
        Box::pin(async move {
            let cold_store = self.cold_store.clone();
            let wal_engine = WalGroupEngine::open(&self.root, placement, metrics, cold_store);
            Ok(Box::new(wal_engine) as Box<dyn GroupEngine>)
        })
    }
}
/// Group engine that appends committed writes to a JSON-lines write-ahead log and keeps
/// the live state in an [`InMemoryGroupEngine`] rebuilt from that log on open.
pub struct WalGroupEngine {
    /// Live group state. Write paths generally apply changes to a clone ("preview") and
    /// swap it in only after the WAL append succeeds.
    inner: InMemoryGroupEngine,
    /// This group's WAL file, as computed by `group_log_path`.
    log_path: PathBuf,
    /// Placement the engine was opened for; passed along with WAL metrics.
    placement: ShardPlacement,
    /// Sink for WAL batch size and write/sync timings.
    metrics: GroupEngineMetrics,
    /// Replay error captured at open time; when set, every operation fails with it.
    init_error: Option<String>,
}
/// One line of the group WAL, serialized as a JSON object internally tagged by the
/// `"wal_record"` key (`"command"` or `"snapshot"`, per `rename_all = "snake_case"`).
///
/// The serde attributes define the on-disk format; changing them breaks replay of existing
/// logs. Lines that fail to decode as a `WalRecord` are parsed by `replay_group_log` as bare
/// `StreamCommand`s (presumably an older log format — confirm).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "wal_record", rename_all = "snake_case")]
enum WalRecord {
    /// A committed group write; replayed through `apply_replayed_write_command`.
    Command {
        command: Box<GroupWriteCommand>,
    },
    /// An installed group snapshot; replayed through `install_snapshot_parts`.
    Snapshot {
        group_commit_index: u64,
        stream_snapshot: StreamSnapshot,
        stream_append_counts: Vec<StreamAppendCount>,
    },
}
impl WalGroupEngine {
fn open(
root: &Path,
placement: ShardPlacement,
metrics: GroupEngineMetrics,
cold_store: Option<ColdStoreHandle>,
) -> Self {
let log_path = group_log_path(root, placement);
match replay_group_log(&log_path) {
Ok(mut inner) => {
inner.cold_store = cold_store;
Self {
inner,
log_path,
placement,
metrics,
init_error: None,
}
}
Err(err) => Self {
inner: InMemoryGroupEngine {
cold_store,
..InMemoryGroupEngine::default()
},
log_path,
placement,
metrics,
init_error: Some(err.message().to_owned()),
},
}
}
fn ensure_ready(&self) -> Result<(), GroupEngineError> {
match &self.init_error {
Some(message) => Err(GroupEngineError::new(message.clone())),
None => Ok(()),
}
}
fn append_record(&self, command: &GroupWriteCommand) -> Result<(), GroupEngineError> {
self.append_records(std::slice::from_ref(command))
}
fn append_records(&self, commands: &[GroupWriteCommand]) -> Result<(), GroupEngineError> {
if commands.is_empty() {
return Ok(());
}
let Some(parent) = self.log_path.parent() else {
return Err(GroupEngineError::new(format!(
"WAL path '{}' has no parent directory",
self.log_path.display()
)));
};
fs::create_dir_all(parent).map_err(|err| {
GroupEngineError::new(format!("create WAL dir '{}': {err}", parent.display()))
})?;
let write_started_at = Instant::now();
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.log_path)
.map_err(|err| {
GroupEngineError::new(format!("open WAL '{}': {err}", self.log_path.display()))
})?;
for command in commands {
let record = WalRecord::Command {
command: Box::new(command.clone()),
};
serde_json::to_writer(&mut file, &record).map_err(|err| {
GroupEngineError::new(format!("encode WAL '{}': {err}", self.log_path.display()))
})?;
file.write_all(b"\n").map_err(|err| {
GroupEngineError::new(format!("write WAL '{}': {err}", self.log_path.display()))
})?;
}
let write_ns = elapsed_ns(write_started_at);
let sync_started_at = Instant::now();
file.sync_data().map_err(|err| {
GroupEngineError::new(format!("sync WAL '{}': {err}", self.log_path.display()))
})?;
self.metrics.record_wal_batch(
self.placement,
commands.len(),
write_ns,
elapsed_ns(sync_started_at),
);
Ok(())
}
fn append_snapshot_record(&self, snapshot: &GroupSnapshot) -> Result<(), GroupEngineError> {
let record = WalRecord::Snapshot {
group_commit_index: snapshot.group_commit_index,
stream_snapshot: snapshot.stream_snapshot.clone(),
stream_append_counts: snapshot.stream_append_counts.clone(),
};
let Some(parent) = self.log_path.parent() else {
return Err(GroupEngineError::new(format!(
"WAL path '{}' has no parent directory",
self.log_path.display()
)));
};
fs::create_dir_all(parent).map_err(|err| {
GroupEngineError::new(format!("create WAL dir '{}': {err}", parent.display()))
})?;
let write_started_at = Instant::now();
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.log_path)
.map_err(|err| {
GroupEngineError::new(format!("open WAL '{}': {err}", self.log_path.display()))
})?;
serde_json::to_writer(&mut file, &record).map_err(|err| {
GroupEngineError::new(format!("encode WAL '{}': {err}", self.log_path.display()))
})?;
file.write_all(b"\n").map_err(|err| {
GroupEngineError::new(format!("write WAL '{}': {err}", self.log_path.display()))
})?;
let write_ns = elapsed_ns(write_started_at);
let sync_started_at = Instant::now();
file.sync_data().map_err(|err| {
GroupEngineError::new(format!("sync WAL '{}': {err}", self.log_path.display()))
})?;
self.metrics
.record_wal_batch(self.placement, 1, write_ns, elapsed_ns(sync_started_at));
Ok(())
}
fn commit_access_if_needed(
&mut self,
stream_id: &BucketStreamId,
now_ms: u64,
renew_ttl: bool,
placement: ShardPlacement,
) -> Result<Option<TouchStreamAccessResponse>, GroupEngineError> {
if !self
.inner
.access_requires_write(stream_id, now_ms, renew_ttl)?
{
return Ok(None);
}
let command = GroupWriteCommand::TouchStreamAccess {
stream_id: stream_id.clone(),
now_ms,
renew_ttl,
};
let mut preview = self.inner.clone();
let response = match preview.apply_committed_write(command.clone(), placement)? {
GroupWriteResponse::TouchStreamAccess(response) => response,
other => {
return Err(GroupEngineError::new(format!(
"unexpected touch stream access write response: {other:?}"
)));
}
};
if response.changed || response.expired {
self.append_record(&command)?;
}
self.inner = preview;
if response.expired {
return Err(GroupEngineError::stream(
StreamErrorCode::StreamNotFound,
format!("stream '{stream_id}' does not exist"),
));
}
Ok(Some(response))
}
}
/// Write operations here follow one pattern. Apply the command to a clone of the in-memory
/// state, append the command to the WAL (when the response says something changed), and only
/// then replace `self.inner` with the clone. A failed WAL write therefore leaves the in-memory
/// state untouched. Read operations delegate to the in-memory engine after first persisting
/// any required access touch via `commit_access_if_needed`.
impl GroupEngine for WalGroupEngine {
    /// Creates a stream; the WAL is skipped when the stream already exists.
    fn create_stream<'a>(
        &'a mut self,
        request: CreateStreamRequest,
        placement: ShardPlacement,
    ) -> GroupCreateStreamFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            let command = GroupWriteCommand::from(request);
            let mut preview = self.inner.clone();
            let response = match preview.apply_committed_write(command.clone(), placement)? {
                GroupWriteResponse::CreateStream(response) => response,
                other => {
                    return Err(GroupEngineError::new(format!(
                        "unexpected create stream write response: {other:?}"
                    )));
                }
            };
            if !response.already_exists {
                self.append_record(&command)?;
            }
            self.inner = preview;
            Ok(response)
        })
    }

    /// Creates a stream subject to cold-write admission. Falls back to `create_stream`
    /// when admission is disabled.
    fn create_stream_with_cold_admission<'a>(
        &'a mut self,
        request: CreateStreamRequest,
        placement: ShardPlacement,
        admission: ColdWriteAdmission,
    ) -> GroupCreateStreamFuture<'a> {
        if !admission.is_enabled() {
            return self.create_stream(request, placement);
        }
        Box::pin(async move {
            self.ensure_ready()?;
            let command = GroupWriteCommand::from(request.clone());
            let mut preview = self.inner.clone();
            let response =
                preview.create_stream_with_admission_inner(request, placement, admission)?;
            if !response.already_exists {
                self.append_record(&command)?;
            }
            self.inner = preview;
            Ok(response)
        })
    }

    /// Returns stream metadata after persisting any required access touch (no TTL renewal).
    fn head_stream<'a>(
        &'a mut self,
        request: HeadStreamRequest,
        placement: ShardPlacement,
    ) -> GroupHeadStreamFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
            self.inner.head_stream(request, placement).await
        })
    }

    /// Reads stream data after persisting any required access touch (with TTL renewal).
    fn read_stream<'a>(
        &'a mut self,
        request: ReadStreamRequest,
        placement: ShardPlacement,
    ) -> GroupReadStreamFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            self.commit_access_if_needed(&request.stream_id, request.now_ms, true, placement)?;
            self.inner.read_stream(request, placement).await
        })
    }

    /// Publishes a stream snapshot; always WAL-logged on success.
    fn publish_snapshot<'a>(
        &'a mut self,
        request: PublishSnapshotRequest,
        placement: ShardPlacement,
    ) -> GroupPublishSnapshotFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
            let command = GroupWriteCommand::from(request);
            let mut preview = self.inner.clone();
            let response = match preview.apply_committed_write(command.clone(), placement)? {
                GroupWriteResponse::PublishSnapshot(response) => response,
                other => {
                    return Err(GroupEngineError::new(format!(
                        "unexpected publish snapshot write response: {other:?}"
                    )));
                }
            };
            self.append_record(&command)?;
            self.inner = preview;
            Ok(response)
        })
    }

    /// Reads a stream snapshot after persisting any required access touch (with TTL renewal).
    fn read_snapshot<'a>(
        &'a mut self,
        request: ReadSnapshotRequest,
        placement: ShardPlacement,
    ) -> GroupReadSnapshotFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            self.commit_access_if_needed(&request.stream_id, request.now_ms, true, placement)?;
            self.inner.read_snapshot(request, placement).await
        })
    }

    /// Deletes a stream snapshot via the in-memory engine.
    ///
    /// NOTE(review): unlike `publish_snapshot`, this is delegated straight to `self.inner`
    /// and nothing is appended to the WAL, so the deletion may not survive a restart/replay.
    /// Confirm whether snapshot deletion is meant to be durable.
    fn delete_snapshot<'a>(
        &'a mut self,
        request: DeleteSnapshotRequest,
        placement: ShardPlacement,
    ) -> GroupDeleteSnapshotFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
            self.inner.delete_snapshot(request, placement).await
        })
    }

    /// Bootstraps a stream reader after persisting any required access touch (with TTL renewal).
    fn bootstrap_stream<'a>(
        &'a mut self,
        request: BootstrapStreamRequest,
        placement: ShardPlacement,
    ) -> GroupBootstrapStreamFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            self.commit_access_if_needed(&request.stream_id, request.now_ms, true, placement)?;
            self.inner.bootstrap_stream(request, placement).await
        })
    }

    /// Records an explicit access touch. It is WAL-logged only when the response reports
    /// `changed` or `expired`; an expired response is returned as `Ok`.
    fn touch_stream_access<'a>(
        &'a mut self,
        stream_id: BucketStreamId,
        now_ms: u64,
        renew_ttl: bool,
        placement: ShardPlacement,
    ) -> GroupTouchStreamAccessFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            let command = GroupWriteCommand::TouchStreamAccess {
                stream_id,
                now_ms,
                renew_ttl,
            };
            let mut preview = self.inner.clone();
            let response = match preview.apply_committed_write(command.clone(), placement)? {
                GroupWriteResponse::TouchStreamAccess(response) => response,
                other => {
                    return Err(GroupEngineError::new(format!(
                        "unexpected touch stream access write response: {other:?}"
                    )));
                }
            };
            if response.changed || response.expired {
                self.append_record(&command)?;
            }
            self.inner = preview;
            Ok(response)
        })
    }

    /// Adds a fork reference to `stream_id`; always WAL-logged on success.
    fn add_fork_ref<'a>(
        &'a mut self,
        stream_id: BucketStreamId,
        now_ms: u64,
        placement: ShardPlacement,
    ) -> GroupForkRefFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            let command = GroupWriteCommand::AddForkRef { stream_id, now_ms };
            let mut preview = self.inner.clone();
            let response = match preview.apply_committed_write(command.clone(), placement)? {
                GroupWriteResponse::AddForkRef(response) => response,
                other => {
                    return Err(GroupEngineError::new(format!(
                        "unexpected add fork ref write response: {other:?}"
                    )));
                }
            };
            self.append_record(&command)?;
            self.inner = preview;
            Ok(response)
        })
    }

    /// Releases a fork reference on `stream_id`; always WAL-logged on success.
    fn release_fork_ref<'a>(
        &'a mut self,
        stream_id: BucketStreamId,
        placement: ShardPlacement,
    ) -> GroupForkRefFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            let command = GroupWriteCommand::ReleaseForkRef { stream_id };
            let mut preview = self.inner.clone();
            let response = match preview.apply_committed_write(command.clone(), placement)? {
                GroupWriteResponse::ReleaseForkRef(response) => response,
                other => {
                    return Err(GroupEngineError::new(format!(
                        "unexpected release fork ref write response: {other:?}"
                    )));
                }
            };
            self.append_record(&command)?;
            self.inner = preview;
            Ok(response)
        })
    }

    /// Closes a stream; always WAL-logged on success.
    fn close_stream<'a>(
        &'a mut self,
        request: CloseStreamRequest,
        placement: ShardPlacement,
    ) -> GroupCloseStreamFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
            let command = GroupWriteCommand::from(request);
            let mut preview = self.inner.clone();
            let response = match preview.apply_committed_write(command.clone(), placement)? {
                GroupWriteResponse::CloseStream(response) => response,
                other => {
                    return Err(GroupEngineError::new(format!(
                        "unexpected close stream write response: {other:?}"
                    )));
                }
            };
            self.append_record(&command)?;
            self.inner = preview;
            Ok(response)
        })
    }

    /// Deletes a stream; always WAL-logged on success.
    fn delete_stream<'a>(
        &'a mut self,
        request: DeleteStreamRequest,
        placement: ShardPlacement,
    ) -> GroupDeleteStreamFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            let command = GroupWriteCommand::from(request);
            let mut preview = self.inner.clone();
            let response = match preview.apply_committed_write(command.clone(), placement)? {
                GroupWriteResponse::DeleteStream(response) => response,
                other => {
                    return Err(GroupEngineError::new(format!(
                        "unexpected delete stream write response: {other:?}"
                    )));
                }
            };
            self.append_record(&command)?;
            self.inner = preview;
            Ok(response)
        })
    }

    /// Appends to a stream. Deduplicated appends are not WAL-logged.
    fn append<'a>(
        &'a mut self,
        request: AppendRequest,
        placement: ShardPlacement,
    ) -> GroupAppendFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
            let command = GroupWriteCommand::from(request);
            let mut preview = self.inner.clone();
            let response = match preview.apply_committed_write(command.clone(), placement)? {
                GroupWriteResponse::Append(response) => response,
                other => {
                    return Err(GroupEngineError::new(format!(
                        "unexpected append write response: {other:?}"
                    )));
                }
            };
            // Skip the WAL write (and its fsync) for deduplicated appends. This matches
            // `append_with_cold_admission` and the batch paths, which already do the same.
            if !response.deduplicated {
                self.append_record(&command)?;
            }
            self.inner = preview;
            Ok(response)
        })
    }

    /// Appends subject to cold-write admission. Falls back to `append` when admission is
    /// disabled. Deduplicated appends are not WAL-logged.
    fn append_with_cold_admission<'a>(
        &'a mut self,
        request: AppendRequest,
        placement: ShardPlacement,
        admission: ColdWriteAdmission,
    ) -> GroupAppendFuture<'a> {
        if !admission.is_enabled() {
            return self.append(request, placement);
        }
        Box::pin(async move {
            self.ensure_ready()?;
            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
            let command = GroupWriteCommand::from(request.clone());
            let mut preview = self.inner.clone();
            let response = preview.append_with_admission_inner(request, placement, admission)?;
            if !response.deduplicated {
                self.append_record(&command)?;
            }
            self.inner = preview;
            Ok(response)
        })
    }

    /// Appends a batch. It is WAL-logged only if at least one item succeeded without
    /// deduplication.
    fn append_batch<'a>(
        &'a mut self,
        request: AppendBatchRequest,
        placement: ShardPlacement,
    ) -> GroupAppendBatchFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
            let command = GroupWriteCommand::from(request);
            let mut preview = self.inner.clone();
            let response = match preview.apply_committed_write(command.clone(), placement)? {
                GroupWriteResponse::AppendBatch(response) => response,
                other => {
                    return Err(GroupEngineError::new(format!(
                        "unexpected append batch write response: {other:?}"
                    )));
                }
            };
            if response
                .items
                .iter()
                .any(|item| matches!(item, Ok(response) if !response.deduplicated))
            {
                self.append_record(&command)?;
            }
            self.inner = preview;
            Ok(response)
        })
    }

    /// Appends a batch subject to cold-write admission. Falls back to `append_batch` when
    /// admission is disabled. It is WAL-logged only if some item succeeded without
    /// deduplication.
    fn append_batch_with_cold_admission<'a>(
        &'a mut self,
        request: AppendBatchRequest,
        placement: ShardPlacement,
        admission: ColdWriteAdmission,
    ) -> GroupAppendBatchFuture<'a> {
        if !admission.is_enabled() {
            return self.append_batch(request, placement);
        }
        Box::pin(async move {
            self.ensure_ready()?;
            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
            let command = GroupWriteCommand::from(request.clone());
            let mut preview = self.inner.clone();
            let response =
                preview.append_batch_with_admission_inner(request, placement, admission)?;
            if response
                .items
                .iter()
                .any(|item| matches!(item, Ok(response) if !response.deduplicated))
            {
                self.append_record(&command)?;
            }
            self.inner = preview;
            Ok(response)
        })
    }

    /// Records a completed cold flush; always WAL-logged on success.
    fn flush_cold<'a>(
        &'a mut self,
        request: FlushColdRequest,
        placement: ShardPlacement,
    ) -> GroupFlushColdFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            let command = GroupWriteCommand::from(request);
            let mut preview = self.inner.clone();
            let response = match preview.apply_committed_write(command.clone(), placement)? {
                GroupWriteResponse::FlushCold(response) => response,
                other => {
                    return Err(GroupEngineError::new(format!(
                        "unexpected flush cold write response: {other:?}"
                    )));
                }
            };
            self.append_record(&command)?;
            self.inner = preview;
            Ok(response)
        })
    }

    /// Plans a cold flush for one stream (delegated to the in-memory engine, not logged).
    fn plan_cold_flush<'a>(
        &'a mut self,
        request: PlanColdFlushRequest,
        placement: ShardPlacement,
    ) -> GroupPlanColdFlushFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            self.inner.plan_cold_flush(request, placement).await
        })
    }

    /// Plans the next group-level cold flush (delegated, not logged).
    fn plan_next_cold_flush<'a>(
        &'a mut self,
        request: PlanGroupColdFlushRequest,
        placement: ShardPlacement,
    ) -> GroupPlanNextColdFlushFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            self.inner.plan_next_cold_flush(request, placement).await
        })
    }

    /// Plans up to `max_candidates` group-level cold flushes (delegated, not logged).
    fn plan_next_cold_flush_batch<'a>(
        &'a mut self,
        request: PlanGroupColdFlushRequest,
        placement: ShardPlacement,
        max_candidates: usize,
    ) -> GroupPlanNextColdFlushBatchFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            self.inner
                .plan_next_cold_flush_batch(request, placement, max_candidates)
                .await
        })
    }

    /// Reports the hot backlog awaiting cold flush for `stream_id` (delegated).
    fn cold_hot_backlog<'a>(
        &'a mut self,
        stream_id: BucketStreamId,
        placement: ShardPlacement,
    ) -> GroupColdHotBacklogFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            self.inner.cold_hot_backlog(stream_id, placement).await
        })
    }

    /// Captures a group snapshot from the in-memory state.
    fn snapshot<'a>(&'a mut self, placement: ShardPlacement) -> GroupSnapshotFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            self.inner.snapshot(placement).await
        })
    }

    /// Installs `snapshot`: applies it to a preview, appends a snapshot record to the WAL,
    /// then swaps the preview in.
    fn install_snapshot<'a>(
        &'a mut self,
        snapshot: GroupSnapshot,
    ) -> GroupInstallSnapshotFuture<'a> {
        Box::pin(async move {
            self.ensure_ready()?;
            let mut preview = self.inner.clone();
            preview.install_snapshot(snapshot.clone()).await?;
            self.append_snapshot_record(&snapshot)?;
            self.inner = preview;
            Ok(())
        })
    }
}
/// Returns the WAL file for `placement`:
/// `<root>/core-<core_id>/group-<raft_group_id>.jsonl`.
pub(crate) fn group_log_path(root: &Path, placement: ShardPlacement) -> PathBuf {
    let core_dir = format!("core-{}", placement.core_id.0);
    let file_name = format!("group-{}.jsonl", placement.raft_group_id.0);
    let mut path = root.join(core_dir);
    path.push(file_name);
    path
}
/// Rebuilds a group's in-memory state by replaying the JSON-lines WAL at `log_path`.
///
/// A missing file yields an empty [`InMemoryGroupEngine`], since the group has never been
/// written. Blank lines are skipped. Every other line is decoded as a [`WalRecord`]; a line
/// that does not decode as one is parsed as a bare `StreamCommand` instead.
///
/// # Errors
/// Any failure aborts replay with an error naming the file and the 1-based line number:
/// opening the file (other than "not found"), reading a line, decoding it in both formats,
/// or applying it.
fn replay_group_log(log_path: &Path) -> Result<InMemoryGroupEngine, GroupEngineError> {
    // Open directly and special-case NotFound. `Path::exists` also returns false on
    // permission/metadata errors, which would silently start the group from empty state.
    let file = match File::open(log_path) {
        Ok(file) => file,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            return Ok(InMemoryGroupEngine::default());
        }
        Err(err) => {
            return Err(GroupEngineError::new(format!(
                "open WAL '{}': {err}",
                log_path.display()
            )));
        }
    };
    let reader = BufReader::new(file);
    let mut inner = InMemoryGroupEngine::default();
    for (line_index, line) in reader.lines().enumerate() {
        let line_number = line_index + 1;
        let line = line.map_err(|err| {
            GroupEngineError::new(format!(
                "read WAL '{}' line {line_number}: {err}",
                log_path.display()
            ))
        })?;
        if line.trim().is_empty() {
            continue;
        }
        match serde_json::from_str::<WalRecord>(&line) {
            Ok(WalRecord::Command { command }) => inner
                .apply_replayed_write_command(*command)
                .map_err(|err| {
                    GroupEngineError::new(format!(
                        "replay WAL command '{}' line {line_number}: {err}",
                        log_path.display()
                    ))
                })?,
            Ok(WalRecord::Snapshot {
                group_commit_index,
                stream_snapshot,
                stream_append_counts,
            }) => inner
                .install_snapshot_parts(group_commit_index, stream_snapshot, stream_append_counts)
                .map_err(|err| {
                    GroupEngineError::new(format!(
                        "replay WAL snapshot '{}' line {line_number}: {err}",
                        log_path.display()
                    ))
                })?,
            Err(record_err) => {
                // Not a tagged WAL record: fall back to a bare StreamCommand. If that also
                // fails, report both decode errors. A corrupt `wal_record` line would
                // otherwise be reported only through the misleading StreamCommand error.
                let command = serde_json::from_str::<StreamCommand>(&line).map_err(|err| {
                    GroupEngineError::new(format!(
                        "decode WAL '{}' line {line_number}: {err} (as WAL record: {record_err})",
                        log_path.display()
                    ))
                })?;
                inner.apply_replayed_command(command).map_err(|err| {
                    GroupEngineError::new(format!(
                        "replay WAL '{}' line {line_number}: {err}",
                        log_path.display()
                    ))
                })?;
            }
        }
    }
    Ok(inner)
}