libsql_wal/
lib.rs

// Crate-wide lint configuration.
// NOTE(review): `async_fn_in_trait` and `dead_code` are silenced for the whole crate and the
// reason is not recorded here; confirm both suppressions are still required.
#![allow(async_fn_in_trait, dead_code)]

// Submodules of the crate. Every module is public except `segment_swap_strategy`.
pub mod checkpointer;
pub mod error;
pub mod io;
pub mod registry;
pub mod replication;
pub mod segment;
mod segment_swap_strategy;
pub mod shared_wal;
pub mod storage;
pub mod transaction;
pub mod wal;
14
/// Magic number expected in [`LibsqlFooter::magic`]: the ASCII bytes `LIBSQL` followed by two
/// NUL bytes, interpreted as a big-endian `u64`.
const LIBSQL_MAGIC: u64 = u64::from_be_bytes([b'L', b'I', b'B', b'S', b'Q', b'L', 0, 0]);
/// libsql page size (4096); presumably expressed in bytes.
const LIBSQL_PAGE_SIZE: u16 = 4 * 1024;
/// Footer version accepted by `LibsqlFooter::validate`.
const LIBSQL_WAL_VERSION: u16 = 1;
18
19use uuid::Uuid;
20use zerocopy::byteorder::big_endian::{U128 as bu128, U16 as bu16, U64 as bu64};
/// `LibsqlFooter` is located at the end of the libsql file. It contains libsql-specific metadata,
/// while remaining fully compatible with sqlite (which just ignores that footer).
///
/// The fields are in big endian to remain coherent with sqlite.
#[derive(Copy, Clone, Debug, zerocopy::FromBytes, zerocopy::FromZeroes, zerocopy::AsBytes)]
#[repr(C)]
pub struct LibsqlFooter {
    /// Magic number; `validate` requires it to equal `LIBSQL_MAGIC`.
    pub magic: bu64,
    /// Footer version; `validate` requires it to equal `LIBSQL_WAL_VERSION`.
    pub version: bu16,
    /// Replication index checkpointed into this file.
    /// Only valid if there are no outstanding segments to checkpoint, since a checkpoint could be
    /// partial.
    pub replication_index: bu64,
    /// Id of the log for this database.
    pub log_id: bu128,
}
37
38impl LibsqlFooter {
39    pub fn log_id(&self) -> Uuid {
40        Uuid::from_u128(self.log_id.get())
41    }
42
43    fn validate(&self) -> error::Result<()> {
44        if self.magic.get() != LIBSQL_MAGIC {
45            return Err(error::Error::InvalidFooterMagic);
46        }
47
48        if self.version.get() != LIBSQL_WAL_VERSION {
49            return Err(error::Error::InvalidFooterVersion);
50        }
51
52        Ok(())
53    }
54}
55
56#[cfg(any(debug_assertions, test))]
57pub mod test {
58    use std::fs::OpenOptions;
59    use std::path::Path;
60    use std::path::PathBuf;
61    use std::sync::Arc;
62    use std::time::Duration;
63
64    use libsql_sys::name::NamespaceName;
65    use libsql_sys::rusqlite::OpenFlags;
66    use tempfile::{tempdir, TempDir};
67    use tokio::sync::mpsc;
68
69    use crate::checkpointer::LibsqlCheckpointer;
70    use crate::io::Io;
71    use crate::io::StdIO;
72    use crate::registry::WalRegistry;
73    use crate::shared_wal::SharedWal;
74    use crate::storage::TestStorage;
75    use crate::wal::{LibsqlWal, LibsqlWalManager};
76
    /// Test harness bundling a temporary directory, a WAL registry and a WAL manager.
    ///
    /// Generic over the `Io` implementation, which defaults to `StdIO`.
    pub struct TestEnv<IO: Io = StdIO> {
        /// Temporary directory under which the per-namespace database directories are placed.
        pub tmp: Arc<TempDir>,
        /// Registry used by `shared` to open shared WALs; its storage is a `TestStorage`.
        pub registry: Arc<WalRegistry<IO, TestStorage<IO>>>,
        /// WAL manager handed to the connections opened by `open_conn`.
        pub wal: LibsqlWalManager<IO, TestStorage<IO>>,
    }
82
83    impl TestEnv {
84        pub fn new() -> Self {
85            Self::new_store(false)
86        }
87
88        pub fn new_store(store: bool) -> Self {
89            TestEnv::new_io(StdIO(()), store)
90        }
91    }
92
93    impl<IO: Io + Clone> TestEnv<IO> {
94        pub fn new_io(io: IO, store: bool) -> Self {
95            let tmp = tempdir().unwrap();
96            Self::new_io_and_tmp(io, tmp.into(), store)
97        }
98
99        pub fn new_io_and_tmp(io: IO, tmp: Arc<TempDir>, store: bool) -> Self {
100            let resolver = |path: &Path| {
101                let name = path
102                    .parent()
103                    .unwrap()
104                    .file_name()
105                    .unwrap()
106                    .to_str()
107                    .unwrap();
108                NamespaceName::from_string(name.to_string())
109            };
110
111            let (sender, receiver) = mpsc::channel(128);
112            let registry = Arc::new(
113                WalRegistry::new_with_io(io.clone(), TestStorage::new_io(store, io).into(), sender)
114                    .unwrap(),
115            );
116
117            if store {
118                let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 5);
119                tokio::spawn(checkpointer.run());
120            }
121
122            let wal = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
123
124            Self { tmp, registry, wal }
125        }
126
127        pub fn shared(&self, namespace: &str) -> Arc<SharedWal<IO>> {
128            let path = self.tmp.path().join(namespace).join("data");
129            let registry = self.registry.clone();
130            let namespace = NamespaceName::from_string(namespace.into());
131            registry.clone().open(path.as_ref(), &namespace).unwrap()
132        }
133
134        pub fn db_path(&self, namespace: &str) -> PathBuf {
135            self.tmp.path().join(namespace)
136        }
137
138        pub fn open_conn(&self, namespace: &'static str) -> libsql_sys::Connection<LibsqlWal<IO>> {
139            let path = self.db_path(namespace);
140            let wal = self.wal.clone();
141            std::fs::create_dir_all(&path).unwrap();
142            libsql_sys::Connection::open(
143                path.join("data"),
144                OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE,
145                wal,
146                100000,
147                None,
148            )
149            .unwrap()
150        }
151
152        pub fn db_file(&self, namespace: &str) -> std::fs::File {
153            let path = self.db_path(namespace);
154            OpenOptions::new()
155                .read(true)
156                .write(true)
157                .open(path)
158                .unwrap()
159        }
160    }
161
162    pub fn seal_current_segment<IO: Io>(shared: &SharedWal<IO>) {
163        let mut tx = shared.begin_read(99999).into();
164        shared.upgrade(&mut tx).unwrap();
165        {
166            let mut guard = tx.as_write_mut().unwrap().lock();
167            guard.commit();
168            shared.swap_current(&mut guard).unwrap();
169        }
170        tx.end();
171    }
172
173    pub async fn wait_current_durable<IO: Io>(shared: &SharedWal<IO>) {
174        let current = shared.current.load().next_frame_no().get() - 1;
175        loop {
176            {
177                if *shared.durable_frame_no.lock() >= current {
178                    break;
179                }
180            }
181
182            tokio::time::sleep(Duration::from_millis(5)).await;
183        }
184    }
185}