use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

use arc_swap::ArcSwap;
use crossbeam::deque::Injector;
use crossbeam::sync::Unparker;
use parking_lot::{Mutex, MutexGuard};
use tokio::sync::{mpsc, watch};
use uuid::Uuid;

use crate::checkpointer::CheckpointMessage;
use crate::error::{Error, Result};
use crate::io::file::FileExt;
use crate::io::Io;
use crate::replication::storage::ReplicateFromStorage;
use crate::segment::current::CurrentSegment;
use crate::segment_swap_strategy::SegmentSwapStrategy;
use crate::transaction::{ReadTransaction, Savepoint, Transaction, TxGuard, WriteTransaction};
use libsql_sys::name::NamespaceName;

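/// Synchronization state guarding write access to the WAL: the id of the
/// current write transaction, an optionally reserved connection slot, the
/// next transaction id counter, and the queue of parked writers waiting for
/// the write lock.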
#[derive(Default)]
pub struct WalLock {
    pub(crate) tx_id: Arc<async_lock::Mutex<Option<u64>>>,
    pub(crate) reserved: Mutex<Option<u64>>,
    next_tx_id: AtomicU64,
    pub(crate) waiters: Injector<(Unparker, u64)>,
}

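/// Implemented by the registry: swaps the current segment for a fresh one
/// while a write transaction guard is held.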
pub(crate) trait SwapLog<IO: Io>: Sync + Send + 'static {
    fn swap_current(&self, shared: &SharedWal<IO>, tx: &dyn TxGuard<IO::File>) -> Result<()>;
}

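/// Shared, per-namespace WAL state: the currently active segment, the locks
/// and notifiers coordinating readers, writers and the checkpointer, and the
/// handles to the main database file and storage-backed segments.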
pub struct SharedWal<IO: Io> {
    pub(crate) current: ArcSwap<CurrentSegment<IO::File>>,
    pub(crate) wal_lock: Arc<WalLock>,
    pub(crate) db_file: IO::File,
    pub(crate) namespace: NamespaceName,
    pub(crate) registry: Arc<dyn SwapLog<IO>>,
    #[allow(dead_code)]
    pub(crate) checkpointed_frame_no: AtomicU64,
    pub(crate) durable_frame_no: Arc<Mutex<u64>>,
    pub(crate) new_frame_notifier: tokio::sync::watch::Sender<u64>,
    pub(crate) stored_segments: Box<dyn ReplicateFromStorage>,
    pub(crate) shutdown: AtomicBool,
    pub(crate) checkpoint_notifier: mpsc::Sender<CheckpointMessage>,
    pub(crate) io: Arc<IO>,
    pub(crate) swap_strategy: Box<dyn SegmentSwapStrategy>,
    pub(crate) wals_path: PathBuf,
}

impl<IO: Io> SharedWal<IO> {
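    /// Marks the namespace as shut down, acquires the write lock (retrying
    /// while the snapshot is busy), commits an empty transaction and swaps
    /// out the current segment before sealing it.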
    #[tracing::instrument(skip(self), fields(namespace = self.namespace.as_str()))]
    pub fn shutdown(&self) -> Result<()> {
        tracing::info!("started namespace shutdown");
        self.shutdown.store(true, Ordering::SeqCst);
        let mut tx = loop {
            let mut tx = Transaction::Read(self.begin_read(u64::MAX));
            match self.upgrade(&mut tx) {
                Ok(_) => break tx,
                Err(Error::BusySnapshot) => continue,
                Err(e) => return Err(e),
            }
        };

        {
            let mut tx = tx.as_write_mut().unwrap().lock();
            tx.commit();
            self.registry.swap_current(self, &tx)?;
        }
        // the freshly swapped-in segment will not be written to anymore; seal
        // it so that no unsealed segment is left behind on shutdown
        self.current.load().seal(self.io.now())?;
        tracing::info!("namespace shutdown");
        Ok(())
    }

    pub fn new_frame_notifier(&self) -> watch::Receiver<u64> {
        self.new_frame_notifier.subscribe()
    }

    pub fn db_size(&self) -> u32 {
        self.current.load().db_size()
    }

    pub fn log_id(&self) -> Uuid {
        self.current.load().log_id()
    }

    pub fn durable_frame_no(&self) -> u64 {
        *self.durable_frame_no.lock()
    }

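    /// Opens a read transaction pinned to the current segment: the segment's
    /// reader count is incremented and the last committed frame number is
    /// captured so the reader sees a consistent snapshot of the database.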
    #[tracing::instrument(skip_all)]
    pub fn begin_read(&self, conn_id: u64) -> ReadTransaction<IO::File> {
        let current = self.current.load();
        current.inc_reader_count();
        let (max_frame_no, db_size, max_offset) = current.with_header(|header| {
            (
                header.last_committed(),
                header.size_after(),
                header.frame_count() as u64,
            )
        });
        let id = self.wal_lock.next_tx_id.fetch_add(1, Ordering::Relaxed);
        ReadTransaction {
            id,
            max_frame_no,
            current: current.clone(),
            db_size,
            created_at: Instant::now(),
            conn_id,
            pages_read: 0,
            namespace: self.namespace.clone(),
            checkpoint_notifier: self.checkpoint_notifier.clone(),
            max_offset,
        }
    }

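    /// Upgrades a read transaction to a write transaction. The write lock is
    /// granted if this connection holds the reserved slot, or if the lock is
    /// free and no other writer is queued; otherwise the connection is parked
    /// on the wait queue until it is unparked by the current lock holder.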
    pub fn upgrade(&self, tx: &mut Transaction<IO::File>) -> Result<()> {
        loop {
            match tx {
                Transaction::Write(_) => unreachable!("already in a write transaction"),
                Transaction::Read(read_tx) => {
                    let mut reserved = self.wal_lock.reserved.lock();
                    match *reserved {
                        Some(id) if id == read_tx.conn_id => {
                            tracing::trace!("taking reserved slot");
                            reserved.take();
                            let lock = self.wal_lock.tx_id.lock_blocking();
                            assert!(lock.is_none());
                            let write_tx = self.acquire_write(read_tx, lock, reserved)?;
                            *tx = Transaction::Write(write_tx);
                            return Ok(());
                        }
                        None => {
                            let lock = self.wal_lock.tx_id.lock_blocking();
                            if lock.is_none() && self.wal_lock.waiters.is_empty() {
                                let write_tx = self.acquire_write(read_tx, lock, reserved)?;
                                *tx = Transaction::Write(write_tx);
                                return Ok(());
                            }
                        }
                        _ => (),
                    }

                    tracing::trace!(
                        "txn currently held by another connection, registering to wait queue"
                    );

                    let parker = crossbeam::sync::Parker::new();
                    let unparker = parker.unparker().clone();
                    self.wal_lock.waiters.push((unparker, read_tx.conn_id));
                    drop(reserved);
                    parker.park();
                }
            }
        }
    }

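    /// Builds a `WriteTransaction` once both the `reserved` slot and the
    /// `tx_id` lock are held. If the read snapshot is stale (frames were
    /// committed since, or the segment was sealed), the slot may be reserved
    /// for this connection and `Error::BusySnapshot` is returned so the
    /// caller can retry with a fresh read transaction.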
    fn acquire_write(
        &self,
        read_tx: &ReadTransaction<IO::File>,
        mut tx_id_lock: async_lock::MutexGuard<Option<u64>>,
        mut reserved: MutexGuard<Option<u64>>,
    ) -> Result<WriteTransaction<IO::File>> {
        assert!(reserved.is_none() || *reserved == Some(read_tx.conn_id));
        assert!(tx_id_lock.is_none());
        let current = self.current.load();
        let last_committed = current.last_committed();
        if read_tx.max_frame_no != last_committed || current.is_sealed() {
            if read_tx.pages_read <= 1 {
                tracing::debug!("reserving tx slot");
                // reserve the slot so this connection gets priority on its
                // next upgrade attempt
                reserved.replace(read_tx.conn_id);
            }
            return Err(Error::BusySnapshot);
        }
        let next_offset = current.count_committed() as u32;
        let next_frame_no = current.next_frame_no().get();
        *tx_id_lock = Some(read_tx.id);
        let current_checksum = current.current_checksum();

        Ok(WriteTransaction {
            wal_lock: self.wal_lock.clone(),
            savepoints: vec![Savepoint {
                current_checksum,
                next_offset,
                next_frame_no,
                index: BTreeMap::new(),
            }],
            next_frame_no,
            next_offset,
            current_checksum,
            is_commited: false,
            read_tx: read_tx.clone(),
            recompute_checksum: None,
        })
    }

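    /// Reads a page on behalf of a transaction, looking it up in the current
    /// segment first, then in the sealed segment tail, and finally falling
    /// back to the main database file.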
    #[tracing::instrument(skip(self, tx, buffer))]
    pub fn read_page(
        &self,
        tx: &mut Transaction<IO::File>,
        page_no: u32,
        buffer: &mut [u8],
    ) -> Result<()> {
        match tx.current.find_frame(page_no, tx) {
            Some(offset) => {
                #[cfg(debug_assertions)]
                {
                    if let Ok(header) = tx.current.frame_header_at(offset) {
                        assert!(
                            header.frame_no() <= tx.max_frame_no(),
                            "read frame is greater than max frame, {}, {}",
                            header.frame_no(),
                            tx.max_frame_no()
                        );
                        assert_eq!(header.page_no(), page_no);
                    }
                }

                tx.current.read_page_offset(offset, buffer)?;
            }
            None => {
                if !tx
                    .current
                    .tail()
                    .read_page(page_no, tx.max_frame_no, buffer)?
                {
                    tracing::trace!(page_no, "reading from main file");
                    self.db_file
                        .read_exact_at(buffer, (page_no as u64 - 1) * 4096)?;
                }
            }
        }

        tx.pages_read += 1;

        Ok(())
    }

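    /// Appends the given pages to the current segment on behalf of a write
    /// transaction, notifies subscribers of the new last committed frame, and,
    /// once the transaction is committed, swaps the current segment if the
    /// swap strategy says it is time to.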
    #[tracing::instrument(skip_all, fields(tx_id = tx.id))]
    pub fn insert_frames<'a>(
        &self,
        tx: &mut WriteTransaction<IO::File>,
        pages: impl Iterator<Item = (u32, &'a [u8])>,
        size_after: Option<u32>,
    ) -> Result<()> {
        let current = self.current.load();
        let mut tx = tx.lock();
        if let Some(last_committed) = current.insert_pages(pages, size_after, &mut tx)? {
            self.new_frame_notifier.send_replace(last_committed);
        }

        if tx.is_commited() && self.swap_strategy.should_swap(current.count_committed()) {
            self.swap_current(&tx)?;
            self.swap_strategy.swapped();
        }

        Ok(())
    }

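    /// Forces a segment swap: takes the write lock, commits an empty
    /// transaction and swaps the current segment for a fresh one so that the
    /// previous segment can be sealed.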
    pub fn seal_current(&self) -> Result<()> {
        let mut tx = self.begin_read(u64::MAX).into();
        self.upgrade(&mut tx)?;

        let ret = {
            let mut guard = tx.as_write_mut().unwrap().lock();
            guard.commit();
            self.swap_current(&mut guard)
        };
        tx.end();

        ret
    }

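    /// Delegates the segment swap to the registry; the caller must hold a
    /// write transaction guard.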
    pub(crate) fn swap_current(&self, tx: &impl TxGuard<IO::File>) -> Result<()> {
        self.registry.swap_current(self, tx)?;
        Ok(())
    }

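    /// Checkpoints the sealed segments in the tail back into the main
    /// database file, up to the durable frame number, and records the new
    /// checkpointed frame number. Returns `None` when there was nothing to
    /// checkpoint.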
    #[tracing::instrument(skip(self))]
    pub async fn checkpoint(&self) -> Result<Option<u64>> {
        let durable_frame_no = *self.durable_frame_no.lock();
        let checkpointed_frame_no = self
            .current
            .load()
            .tail()
            .checkpoint(&self.db_file, durable_frame_no, self.log_id(), &self.io)
            .await?;
        if let Some(checkpointed_frame_no) = checkpointed_frame_no {
            self.checkpointed_frame_no
                .store(checkpointed_frame_no, Ordering::SeqCst);
        }

        Ok(checkpointed_frame_no)
    }

    pub fn last_committed_frame_no(&self) -> u64 {
        let current = self.current.load();
        current.last_committed_frame_no()
    }

    pub fn namespace(&self) -> &NamespaceName {
        &self.namespace
    }
}

#[cfg(test)]
mod test {
    use crate::test::{seal_current_segment, TestEnv};

    use super::*;

    #[tokio::test]
    async fn checkpoint() {
        let env = TestEnv::new();
        let conn = env.open_conn("test");
        let shared = env.shared("test");

        assert_eq!(shared.checkpointed_frame_no.load(Ordering::Relaxed), 0);

        conn.execute("create table test (x)", ()).unwrap();
        conn.execute("insert into test values (12)", ()).unwrap();
        conn.execute("insert into test values (12)", ()).unwrap();

        assert_eq!(shared.checkpointed_frame_no.load(Ordering::Relaxed), 0);

        seal_current_segment(&shared);

        *shared.durable_frame_no.lock() = 999999;

        let frame_no = shared.checkpoint().await.unwrap().unwrap();
        assert_eq!(frame_no, 4);
        assert_eq!(shared.checkpointed_frame_no.load(Ordering::Relaxed), 4);

        assert!(shared.checkpoint().await.unwrap().is_none());
    }
}
375}