1use crate::error::Error;
2use crate::lockfile::LockFile;
3use crate::metadata::Metadata;
4use crate::wal::WalFileSet;
5use crate::{reader, writer, LogSync, ShutdownHandle};
6use openraft::RaftTypeConfig;
7use std::fs;
8use std::marker::PhantomData;
9use std::sync::{Arc, RwLock};
10use tokio::sync::oneshot;
11use tokio::task;
12use tracing::warn;
13
14#[derive(Debug)]
16pub struct LogStore<T>
17where
18 T: RaftTypeConfig,
19{
20 meta: Arc<RwLock<Metadata>>,
21 wal: Arc<RwLock<WalFileSet>>,
22 pub writer: flume::Sender<writer::Action>,
23 pub reader: flume::Sender<reader::Action>,
24 _marker: PhantomData<T>,
25}
26
27impl<T> LogStore<T>
28where
29 T: RaftTypeConfig,
30{
31 pub async fn start(base_path: String, sync: LogSync, wal_size: u32) -> Result<Self, Error> {
33 let slf = task::spawn_blocking(move || {
34 fs::create_dir_all(&base_path)?;
35 #[cfg(target_os = "linux")]
36 {
37 use std::os::unix::fs::PermissionsExt;
38 let mut perms = fs::metadata(&base_path)?.permissions();
39 perms.set_mode(0o700);
40 fs::set_permissions(&base_path, perms)?;
41 }
42
43 let lock_exists = LockFile::exists(&base_path)?;
44 if lock_exists {
45 warn!("LockFile {base_path} exists already - this is not a clean start!");
46 if LockFile::is_locked(&base_path)? {
47 panic!("LockFile {base_path} is locked and in use by another process");
48 }
49 }
50 let lockfile = LockFile::create(&base_path)?;
51 lockfile.lock()?;
52 debug_assert!(LockFile::is_locked(&base_path).unwrap());
53
54 let meta = Metadata::read_or_create(&base_path)?;
55 let meta = Arc::new(RwLock::new(meta));
56
57 let (writer, wal) = writer::spawn(
58 base_path,
59 lockfile,
60 sync,
61 wal_size,
62 lock_exists,
63 meta.clone(),
64 )?;
65 let reader = reader::spawn(meta.clone(), wal.clone())?;
66
67 Ok::<Self, Error>(Self {
68 meta,
69 wal,
70 writer,
71 reader,
72 _marker: Default::default(),
73 })
74 })
75 .await??;
76
77 Ok(slf)
78 }
79
80 #[cfg(feature = "migration")]
83 pub async fn start_writer_migration(
84 base_path: String,
85 wal_size: u32,
86 ) -> Result<flume::Sender<writer::Action>, Error> {
87 let lock_exists = LockFile::exists(&base_path)?;
88 if lock_exists {
89 warn!("LockFile in {base_path} exists already - this is not a clean start!");
90 if LockFile::is_locked(&base_path)? {
91 panic!("LockFile {base_path} is locked and in use by another process");
92 }
93 }
94 let lockfile = LockFile::create(&base_path)?;
95 lockfile.lock()?;
96
97 task::spawn_blocking(move || {
98 let meta = Metadata::read_or_create(&base_path)?;
99 let meta = Arc::new(RwLock::new(meta));
100 let (writer, _) = writer::spawn(
101 base_path,
102 lockfile,
103 LogSync::ImmediateAsync,
104 wal_size,
105 lock_exists,
106 meta,
107 )?;
108 Ok(writer)
109 })
110 .await?
111 }
112
113 pub fn shutdown_handle(&self) -> ShutdownHandle {
114 ShutdownHandle::new(self.writer.clone(), self.reader.clone())
115 }
116
117 pub async fn stop(self) -> Result<(), Error> {
118 let (tx_ack, ack) = oneshot::channel();
119 self.writer
120 .send_async(writer::Action::Shutdown(tx_ack))
121 .await?;
122 ack.await?;
123
124 let _ = self.reader.send_async(reader::Action::Shutdown).await;
125
126 Ok(())
127 }
128
129 pub(crate) fn spawn_reader(&self) -> Result<LogStoreReader<T>, Error> {
130 let tx = reader::spawn(self.meta.clone(), self.wal.clone())?;
131
132 Ok(LogStoreReader {
133 tx,
134 _marker: self._marker,
135 })
136 }
137}
138
139#[derive(Debug)]
140pub struct LogStoreReader<T>
141where
142 T: RaftTypeConfig,
143{
144 pub tx: flume::Sender<reader::Action>,
145 _marker: PhantomData<T>,
146}