Skip to main content

hiqlite_wal/
log_store.rs

1use crate::error::Error;
2use crate::lockfile::LockFile;
3use crate::metadata::Metadata;
4use crate::wal::WalFileSet;
5use crate::{reader, writer, LogSync, ShutdownHandle};
6use openraft::RaftTypeConfig;
7use std::fs;
8use std::marker::PhantomData;
9use std::sync::{Arc, RwLock};
10use tokio::sync::oneshot;
11use tokio::task;
12use tracing::warn;
13
14/// `T::NodeId` MUST be a `u64` for the `LogStore` to work correctly.
15#[derive(Debug)]
16pub struct LogStore<T>
17where
18    T: RaftTypeConfig,
19{
20    meta: Arc<RwLock<Metadata>>,
21    wal: Arc<RwLock<WalFileSet>>,
22    pub writer: flume::Sender<writer::Action>,
23    pub reader: flume::Sender<reader::Action>,
24    _marker: PhantomData<T>,
25}
26
27impl<T> LogStore<T>
28where
29    T: RaftTypeConfig,
30{
31    /// Start the LogStore
32    pub async fn start(base_path: String, sync: LogSync, wal_size: u32) -> Result<Self, Error> {
33        let slf = task::spawn_blocking(move || {
34            fs::create_dir_all(&base_path)?;
35            #[cfg(target_os = "linux")]
36            {
37                use std::os::unix::fs::PermissionsExt;
38                let mut perms = fs::metadata(&base_path)?.permissions();
39                perms.set_mode(0o700);
40                fs::set_permissions(&base_path, perms)?;
41            }
42
43            let lock_exists = LockFile::exists(&base_path)?;
44            if lock_exists {
45                warn!("LockFile {base_path} exists already - this is not a clean start!");
46                if LockFile::is_locked(&base_path)? {
47                    panic!("LockFile {base_path} is locked and in use by another process");
48                }
49            }
50            let lockfile = LockFile::create(&base_path)?;
51            lockfile.lock()?;
52            debug_assert!(LockFile::is_locked(&base_path).unwrap());
53
54            let meta = Metadata::read_or_create(&base_path)?;
55            let meta = Arc::new(RwLock::new(meta));
56
57            let (writer, wal) = writer::spawn(
58                base_path,
59                lockfile,
60                sync,
61                wal_size,
62                lock_exists,
63                meta.clone(),
64            )?;
65            let reader = reader::spawn(meta.clone(), wal.clone())?;
66
67            Ok::<Self, Error>(Self {
68                meta,
69                wal,
70                writer,
71                reader,
72                _marker: Default::default(),
73            })
74        })
75        .await??;
76
77        Ok(slf)
78    }
79
80    /// Gives you a raw handle to the writer channel to perform manual migrations. Does not start
81    /// a log store and does not do anything on its own.
82    #[cfg(feature = "migration")]
83    pub async fn start_writer_migration(
84        base_path: String,
85        wal_size: u32,
86    ) -> Result<flume::Sender<writer::Action>, Error> {
87        let lock_exists = LockFile::exists(&base_path)?;
88        if lock_exists {
89            warn!("LockFile in {base_path} exists already - this is not a clean start!");
90            if LockFile::is_locked(&base_path)? {
91                panic!("LockFile {base_path} is locked and in use by another process");
92            }
93        }
94        let lockfile = LockFile::create(&base_path)?;
95        lockfile.lock()?;
96
97        task::spawn_blocking(move || {
98            let meta = Metadata::read_or_create(&base_path)?;
99            let meta = Arc::new(RwLock::new(meta));
100            let (writer, _) = writer::spawn(
101                base_path,
102                lockfile,
103                LogSync::ImmediateAsync,
104                wal_size,
105                lock_exists,
106                meta,
107            )?;
108            Ok(writer)
109        })
110        .await?
111    }
112
113    pub fn shutdown_handle(&self) -> ShutdownHandle {
114        ShutdownHandle::new(self.writer.clone(), self.reader.clone())
115    }
116
117    pub async fn stop(self) -> Result<(), Error> {
118        let (tx_ack, ack) = oneshot::channel();
119        self.writer
120            .send_async(writer::Action::Shutdown(tx_ack))
121            .await?;
122        ack.await?;
123
124        let _ = self.reader.send_async(reader::Action::Shutdown).await;
125
126        Ok(())
127    }
128
129    pub(crate) fn spawn_reader(&self) -> Result<LogStoreReader<T>, Error> {
130        let tx = reader::spawn(self.meta.clone(), self.wal.clone())?;
131
132        Ok(LogStoreReader {
133            tx,
134            _marker: self._marker,
135        })
136    }
137}
138
139#[derive(Debug)]
140pub struct LogStoreReader<T>
141where
142    T: RaftTypeConfig,
143{
144    pub tx: flume::Sender<reader::Action>,
145    _marker: PhantomData<T>,
146}