1#![allow(async_fn_in_trait, dead_code)]
2
3pub mod checkpointer;
4pub mod error;
5pub mod io;
6pub mod registry;
7pub mod replication;
8pub mod segment;
9mod segment_swap_strategy;
10pub mod shared_wal;
11pub mod storage;
12pub mod transaction;
13pub mod wal;
14
15const LIBSQL_MAGIC: u64 = u64::from_be_bytes(*b"LIBSQL\0\0");
16const LIBSQL_PAGE_SIZE: u16 = 4096;
17const LIBSQL_WAL_VERSION: u16 = 1;
18
19use uuid::Uuid;
20use zerocopy::byteorder::big_endian::{U128 as bu128, U16 as bu16, U64 as bu64};
21#[derive(Copy, Clone, Debug, zerocopy::FromBytes, zerocopy::FromZeroes, zerocopy::AsBytes)]
26#[repr(C)]
27pub struct LibsqlFooter {
28 pub magic: bu64,
29 pub version: bu16,
30 pub replication_index: bu64,
34 pub log_id: bu128,
36}
37
38impl LibsqlFooter {
39 pub fn log_id(&self) -> Uuid {
40 Uuid::from_u128(self.log_id.get())
41 }
42
43 fn validate(&self) -> error::Result<()> {
44 if self.magic.get() != LIBSQL_MAGIC {
45 return Err(error::Error::InvalidFooterMagic);
46 }
47
48 if self.version.get() != LIBSQL_WAL_VERSION {
49 return Err(error::Error::InvalidFooterVersion);
50 }
51
52 Ok(())
53 }
54}
55
56#[cfg(any(debug_assertions, test))]
57pub mod test {
58 use std::fs::OpenOptions;
59 use std::path::Path;
60 use std::path::PathBuf;
61 use std::sync::Arc;
62 use std::time::Duration;
63
64 use libsql_sys::name::NamespaceName;
65 use libsql_sys::rusqlite::OpenFlags;
66 use tempfile::{tempdir, TempDir};
67 use tokio::sync::mpsc;
68
69 use crate::checkpointer::LibsqlCheckpointer;
70 use crate::io::Io;
71 use crate::io::StdIO;
72 use crate::registry::WalRegistry;
73 use crate::shared_wal::SharedWal;
74 use crate::storage::TestStorage;
75 use crate::wal::{LibsqlWal, LibsqlWalManager};
76
77 pub struct TestEnv<IO: Io = StdIO> {
78 pub tmp: Arc<TempDir>,
79 pub registry: Arc<WalRegistry<IO, TestStorage<IO>>>,
80 pub wal: LibsqlWalManager<IO, TestStorage<IO>>,
81 }
82
83 impl TestEnv {
84 pub fn new() -> Self {
85 Self::new_store(false)
86 }
87
88 pub fn new_store(store: bool) -> Self {
89 TestEnv::new_io(StdIO(()), store)
90 }
91 }
92
93 impl<IO: Io + Clone> TestEnv<IO> {
94 pub fn new_io(io: IO, store: bool) -> Self {
95 let tmp = tempdir().unwrap();
96 Self::new_io_and_tmp(io, tmp.into(), store)
97 }
98
99 pub fn new_io_and_tmp(io: IO, tmp: Arc<TempDir>, store: bool) -> Self {
100 let resolver = |path: &Path| {
101 let name = path
102 .parent()
103 .unwrap()
104 .file_name()
105 .unwrap()
106 .to_str()
107 .unwrap();
108 NamespaceName::from_string(name.to_string())
109 };
110
111 let (sender, receiver) = mpsc::channel(128);
112 let registry = Arc::new(
113 WalRegistry::new_with_io(io.clone(), TestStorage::new_io(store, io).into(), sender)
114 .unwrap(),
115 );
116
117 if store {
118 let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 5);
119 tokio::spawn(checkpointer.run());
120 }
121
122 let wal = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
123
124 Self { tmp, registry, wal }
125 }
126
127 pub fn shared(&self, namespace: &str) -> Arc<SharedWal<IO>> {
128 let path = self.tmp.path().join(namespace).join("data");
129 let registry = self.registry.clone();
130 let namespace = NamespaceName::from_string(namespace.into());
131 registry.clone().open(path.as_ref(), &namespace).unwrap()
132 }
133
134 pub fn db_path(&self, namespace: &str) -> PathBuf {
135 self.tmp.path().join(namespace)
136 }
137
138 pub fn open_conn(&self, namespace: &'static str) -> libsql_sys::Connection<LibsqlWal<IO>> {
139 let path = self.db_path(namespace);
140 let wal = self.wal.clone();
141 std::fs::create_dir_all(&path).unwrap();
142 libsql_sys::Connection::open(
143 path.join("data"),
144 OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE,
145 wal,
146 100000,
147 None,
148 )
149 .unwrap()
150 }
151
152 pub fn db_file(&self, namespace: &str) -> std::fs::File {
153 let path = self.db_path(namespace);
154 OpenOptions::new()
155 .read(true)
156 .write(true)
157 .open(path)
158 .unwrap()
159 }
160 }
161
162 pub fn seal_current_segment<IO: Io>(shared: &SharedWal<IO>) {
163 let mut tx = shared.begin_read(99999).into();
164 shared.upgrade(&mut tx).unwrap();
165 {
166 let mut guard = tx.as_write_mut().unwrap().lock();
167 guard.commit();
168 shared.swap_current(&mut guard).unwrap();
169 }
170 tx.end();
171 }
172
173 pub async fn wait_current_durable<IO: Io>(shared: &SharedWal<IO>) {
174 let current = shared.current.load().next_frame_no().get() - 1;
175 loop {
176 {
177 if *shared.durable_frame_no.lock() >= current {
178 break;
179 }
180 }
181
182 tokio::time::sleep(Duration::from_millis(5)).await;
183 }
184 }
185}