use crate::error::Error;
use crate::metadata::Metadata;
use crate::wal::WalFileSet;
use crate::{reader, writer, LogSync, ShutdownHandle};
use openraft::RaftTypeConfig;
use std::fs;
use std::marker::PhantomData;
use std::sync::{Arc, RwLock};
use tokio::sync::oneshot;
use tokio::task;
/// Handle to a started log store for the Raft type configuration `T`.
///
/// It bundles the shared metadata and WAL handles together with the channel
/// senders used to submit [`writer::Action`]s and [`reader::Action`]s.
#[derive(Debug)]
pub struct LogStore<T: RaftTypeConfig> {
    /// Shared metadata, handed to every writer / reader that gets spawned.
    meta: Arc<RwLock<Metadata>>,
    /// Shared WAL file set as returned by `writer::spawn`; also handed to readers.
    wal: Arc<RwLock<WalFileSet>>,
    /// Sender for actions addressed to the writer.
    pub writer: flume::Sender<writer::Action>,
    /// Sender for actions addressed to the reader.
    pub reader: flume::Sender<reader::Action>,
    /// Binds the store to `T` without storing a value of that type.
    _marker: PhantomData<T>,
}
impl<T> LogStore<T>
where
T: RaftTypeConfig,
{
pub async fn start(
base_path: String,
sync: LogSync,
wal_size: u32,
wal_ignore_lock: bool,
) -> Result<Self, Error> {
let slf = task::spawn_blocking(move || {
fs::create_dir_all(&base_path)?;
#[cfg(target_os = "linux")]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = fs::metadata(&base_path)?.permissions();
perms.set_mode(0o700);
fs::set_permissions(&base_path, perms)?;
}
let meta = Metadata::read_or_create(&base_path)?;
let meta = Arc::new(RwLock::new(meta));
let (writer, wal) =
writer::spawn(base_path, sync, wal_size, wal_ignore_lock, meta.clone())?;
let reader = reader::spawn(meta.clone(), wal.clone())?;
Ok::<Self, Error>(Self {
meta,
wal,
writer,
reader,
_marker: Default::default(),
})
})
.await??;
Ok(slf)
}
#[cfg(feature = "migration")]
pub async fn start_writer_migration(
base_path: String,
wal_size: u32,
wal_ignore_lock: bool,
) -> Result<flume::Sender<writer::Action>, Error> {
task::spawn_blocking(move || {
let meta = Metadata::read_or_create(&base_path)?;
let meta = Arc::new(RwLock::new(meta));
let (writer, _) = writer::spawn(
base_path,
LogSync::ImmediateAsync,
wal_size,
wal_ignore_lock,
meta,
)?;
Ok(writer)
})
.await?
}
pub fn shutdown_handle(&self) -> ShutdownHandle {
ShutdownHandle::new(self.writer.clone(), self.reader.clone())
}
pub async fn stop(self) -> Result<(), Error> {
let (tx_ack, ack) = oneshot::channel();
self.writer
.send_async(writer::Action::Shutdown(tx_ack))
.await?;
ack.await?;
let _ = self.reader.send_async(reader::Action::Shutdown).await;
Ok(())
}
pub(crate) fn spawn_reader(&self) -> Result<LogStoreReader<T>, Error> {
let tx = reader::spawn(self.meta.clone(), self.wal.clone())?;
Ok(LogStoreReader {
tx,
_marker: self._marker,
})
}
}
/// Handle to a separately spawned reader for the Raft type configuration `T`.
#[derive(Debug)]
pub struct LogStoreReader<T: RaftTypeConfig> {
    /// Sender for actions addressed to this reader.
    pub tx: flume::Sender<reader::Action>,
    /// Binds the reader to `T` without storing a value of that type.
    _marker: PhantomData<T>,
}