libsql_wal/
shared_wal.rs

use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

use arc_swap::ArcSwap;
use crossbeam::deque::Injector;
use crossbeam::sync::Unparker;
use parking_lot::{Mutex, MutexGuard};
use tokio::sync::{mpsc, watch};
use uuid::Uuid;

use crate::checkpointer::CheckpointMessage;
use crate::error::{Error, Result};
use crate::io::file::FileExt;
use crate::io::Io;
use crate::replication::storage::ReplicateFromStorage;
use crate::segment::current::CurrentSegment;
use crate::segment_swap_strategy::SegmentSwapStrategy;
use crate::transaction::{ReadTransaction, Savepoint, Transaction, TxGuard, WriteTransaction};
use libsql_sys::name::NamespaceName;
#[derive(Default)]
pub struct WalLock {
    pub(crate) tx_id: Arc<async_lock::Mutex<Option<u64>>>,
    /// When a writer is popped from the write queue, its write transaction may not be reading from
    /// the most recent snapshot. In that case, we return `SQLITE_BUSY_SNAPSHOT` to the caller. If
    /// no reads were performed with that transaction before upgrading, the caller will call us
    /// back immediately after re-acquiring a read mark.
    /// Without the reserved slot, the writer would be re-enqueued, another writer could slip in
    /// ahead of it, and we would be back in the initial situation. Instead, the reserved slot lets
    /// the writer bypass the queue when it tries to re-acquire the write lock.
    pub(crate) reserved: Mutex<Option<u64>>,
    next_tx_id: AtomicU64,
    pub(crate) waiters: Injector<(Unparker, u64)>,
}

pub(crate) trait SwapLog<IO: Io>: Sync + Send + 'static {
    fn swap_current(&self, shared: &SharedWal<IO>, tx: &dyn TxGuard<IO::File>) -> Result<()>;
}

pub struct SharedWal<IO: Io> {
    pub(crate) current: ArcSwap<CurrentSegment<IO::File>>,
    pub(crate) wal_lock: Arc<WalLock>,
    pub(crate) db_file: IO::File,
    pub(crate) namespace: NamespaceName,
    pub(crate) registry: Arc<dyn SwapLog<IO>>,
    #[allow(dead_code)] // used by replication
    pub(crate) checkpointed_frame_no: AtomicU64,
    /// max frame_no acknowledged by the durable storage
    pub(crate) durable_frame_no: Arc<Mutex<u64>>,
    pub(crate) new_frame_notifier: tokio::sync::watch::Sender<u64>,
    pub(crate) stored_segments: Box<dyn ReplicateFromStorage>,
    pub(crate) shutdown: AtomicBool,
    pub(crate) checkpoint_notifier: mpsc::Sender<CheckpointMessage>,
    pub(crate) io: Arc<IO>,
    pub(crate) swap_strategy: Box<dyn SegmentSwapStrategy>,
    pub(crate) wals_path: PathBuf,
}

impl<IO: Io> SharedWal<IO> {
    #[tracing::instrument(skip(self), fields(namespace = self.namespace.as_str()))]
    pub fn shutdown(&self) -> Result<()> {
        tracing::info!("started namespace shutdown");
        self.shutdown.store(true, Ordering::SeqCst);
        // FIXME: potential infinite loop if we keep losing the upgrade race
        let mut tx = loop {
            let mut tx = Transaction::Read(self.begin_read(u64::MAX));
            match self.upgrade(&mut tx) {
                Ok(_) => break tx,
                Err(Error::BusySnapshot) => continue,
                Err(e) => return Err(e),
            }
        };

        {
            let mut tx = tx.as_write_mut().unwrap().lock();
            tx.commit();
            self.registry.swap_current(self, &tx)?;
        }
        // The current segment will not be used anymore. It's empty, but we still seal it so that
        // the next startup doesn't find an unsealed segment.
        self.current.load().seal(self.io.now())?;
        tracing::info!("namespace shutdown");
        Ok(())
    }

    pub fn new_frame_notifier(&self) -> watch::Receiver<u64> {
        self.new_frame_notifier.subscribe()
    }

    pub fn db_size(&self) -> u32 {
        self.current.load().db_size()
    }

    pub fn log_id(&self) -> Uuid {
        self.current.load().log_id()
    }

    pub fn durable_frame_no(&self) -> u64 {
        *self.durable_frame_no.lock()
    }

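    /// Begin a read transaction pinned to the current segment's snapshot.
    ///
    /// A minimal usage sketch (hypothetical caller; `shared: &SharedWal<IO>` and a caller-chosen
    /// `conn_id` are assumptions, and the fence is `ignore` since this is not a compiled doctest):
    ///
    /// ```ignore
    /// // the returned transaction only sees frames up to the snapshot's max_frame_no
    /// let read_tx = shared.begin_read(conn_id);
    /// let tx = Transaction::Read(read_tx);
    /// ```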
    #[tracing::instrument(skip_all)]
    pub fn begin_read(&self, conn_id: u64) -> ReadTransaction<IO::File> {
        // FIXME: incrementing the reader count is not enough: we must also make sure that the
        // segment is not sealed. If the segment is sealed, retry with the new current segment.
        let current = self.current.load();
        current.inc_reader_count();
        let (max_frame_no, db_size, max_offset) = current.with_header(|header| {
            (
                header.last_committed(),
                header.size_after(),
                header.frame_count() as u64,
            )
        });
        let id = self.wal_lock.next_tx_id.fetch_add(1, Ordering::Relaxed);
        ReadTransaction {
            id,
            max_frame_no,
            current: current.clone(),
            db_size,
            created_at: Instant::now(),
            conn_id,
            pages_read: 0,
            namespace: self.namespace.clone(),
            checkpoint_notifier: self.checkpoint_notifier.clone(),
            max_offset,
        }
    }

    /// Upgrade a read transaction to a write transaction.
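    ///
    /// On `Error::BusySnapshot` the caller is expected to drop its read mark and retry, as
    /// `shutdown` does above. A minimal sketch (hypothetical caller inside a function returning
    /// `Result<()>`; `shared` and `conn_id` are assumptions):
    ///
    /// ```ignore
    /// let mut tx = Transaction::Read(shared.begin_read(conn_id));
    /// loop {
    ///     match shared.upgrade(&mut tx) {
    ///         Ok(()) => break, // tx is now a Transaction::Write
    ///         // stale snapshot: re-acquire a fresh read mark and try again
    ///         Err(Error::BusySnapshot) => {
    ///             tx = Transaction::Read(shared.begin_read(conn_id));
    ///         }
    ///         Err(e) => return Err(e),
    ///     }
    /// }
    /// ```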
    pub fn upgrade(&self, tx: &mut Transaction<IO::File>) -> Result<()> {
        loop {
            match tx {
                Transaction::Write(_) => unreachable!("already in a write transaction"),
                Transaction::Read(read_tx) => {
                    let mut reserved = self.wal_lock.reserved.lock();
                    match *reserved {
                        // we have already reserved the slot, go ahead and try to acquire
                        Some(id) if id == read_tx.conn_id => {
                            tracing::trace!("taking reserved slot");
                            reserved.take();
                            let lock = self.wal_lock.tx_id.lock_blocking();
                            assert!(lock.is_none());
                            let write_tx = self.acquire_write(read_tx, lock, reserved)?;
                            *tx = Transaction::Write(write_tx);
                            return Ok(());
                        }
                        None => {
                            let lock = self.wal_lock.tx_id.lock_blocking();
                            if lock.is_none() && self.wal_lock.waiters.is_empty() {
                                let write_tx = self.acquire_write(read_tx, lock, reserved)?;
                                *tx = Transaction::Write(write_tx);
                                return Ok(());
                            }
                        }
                        _ => (),
                    }

                    tracing::trace!(
                        "txn currently held by another connection, registering to wait queue"
                    );

                    let parker = crossbeam::sync::Parker::new();
                    let unparker = parker.unparker().clone();
                    self.wal_lock.waiters.push((unparker, read_tx.conn_id));
                    drop(reserved);
                    parker.park();
                }
            }
        }
    }

    fn acquire_write(
        &self,
        read_tx: &ReadTransaction<IO::File>,
        mut tx_id_lock: async_lock::MutexGuard<Option<u64>>,
        mut reserved: MutexGuard<Option<u64>>,
    ) -> Result<WriteTransaction<IO::File>> {
        assert!(reserved.is_none() || *reserved == Some(read_tx.conn_id));
        assert!(tx_id_lock.is_none());
        // We read two fields from the current segment. There is no risk that a transaction
        // commits in between the two reads, because that would require:
        // 1) a running transaction, and
        // 2) that transaction holding the tx_id lock (i.e. being inside its critical section),
        // which we hold here.
        let current = self.current.load();
        let last_committed = current.last_committed();
        if read_tx.max_frame_no != last_committed || current.is_sealed() {
            if read_tx.pages_read <= 1 {
                // This transaction hasn't read anything yet; it will retry acquiring the lock, so
                // reserve the slot for it so that it can make progress quickly.
                // TODO: could we upgrade the read snapshot ourselves, so we don't need the
                // reserved slot anymore? If nothing was read, just upgrade the read; otherwise
                // return BusySnapshot and let the connection do the cleanup.
                tracing::debug!("reserving tx slot");
                reserved.replace(read_tx.conn_id);
            }
            return Err(Error::BusySnapshot);
        }
        let next_offset = current.count_committed() as u32;
        let next_frame_no = current.next_frame_no().get();
        *tx_id_lock = Some(read_tx.id);
        let current_checksum = current.current_checksum();

        Ok(WriteTransaction {
            wal_lock: self.wal_lock.clone(),
            savepoints: vec![Savepoint {
                current_checksum,
                next_offset,
                next_frame_no,
                index: BTreeMap::new(),
            }],
            next_frame_no,
            next_offset,
            current_checksum,
            is_commited: false,
            read_tx: read_tx.clone(),
            recompute_checksum: None,
        })
    }

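    /// Read `page_no` into `buffer`, looking first in the current segment, then in the sealed
    /// segment tail, and finally in the main db file.
    ///
    /// A minimal sketch (hypothetical caller; `shared`, an open `tx`, and a 4096-byte page size
    /// are assumptions, the page size matching the offset computation below):
    ///
    /// ```ignore
    /// let mut buf = [0u8; 4096];
    /// shared.read_page(&mut tx, 1, &mut buf)?; // read page 1 (pages are 1-indexed)
    /// ```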
    #[tracing::instrument(skip(self, tx, buffer))]
    pub fn read_page(
        &self,
        tx: &mut Transaction<IO::File>,
        page_no: u32,
        buffer: &mut [u8],
    ) -> Result<()> {
        match tx.current.find_frame(page_no, tx) {
            Some(offset) => {
                // some debug assertions to make sure invariants hold
                #[cfg(debug_assertions)]
                {
                    if let Ok(header) = tx.current.frame_header_at(offset) {
                        // the frame we got is not more recent than the max frame_no
                        assert!(
                            header.frame_no() <= tx.max_frame_no(),
                            "read frame is greater than max frame, {}, {}",
                            header.frame_no(),
                            tx.max_frame_no()
                        );
                        // the page we got is the page we asked for
                        assert_eq!(header.page_no(), page_no);
                    }
                }

                tx.current.read_page_offset(offset, buffer)?;
            }
            None => {
                // not in the current segment: look in the sealed segment tail
                if !tx
                    .current
                    .tail()
                    .read_page(page_no, tx.max_frame_no, buffer)?
                {
                    // not found in any segment: read from the main db file
                    tracing::trace!(page_no, "reading from main file");
                    self.db_file
                        .read_exact_at(buffer, (page_no as u64 - 1) * 4096)?;
                }
            }
        }

        tx.pages_read += 1;

        Ok(())
    }

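    /// Append a batch of pages to the current segment. `size_after` presumably follows SQLite WAL
    /// conventions: `Some(db_size_in_pages)` marks the batch as a commit, `None` leaves the
    /// transaction open.
    ///
    /// A minimal sketch (hypothetical caller; `shared`, a `write_tx` obtained via `upgrade`, and
    /// two page-sized buffers `page1`/`page2` are assumptions):
    ///
    /// ```ignore
    /// let pages = [(1u32, page1.as_slice()), (2u32, page2.as_slice())];
    /// // commit: the database is 2 pages after this batch
    /// shared.insert_frames(&mut write_tx, pages.into_iter(), Some(2))?;
    /// ```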
    #[tracing::instrument(skip_all, fields(tx_id = tx.id))]
    pub fn insert_frames<'a>(
        &self,
        tx: &mut WriteTransaction<IO::File>,
        pages: impl Iterator<Item = (u32, &'a [u8])>,
        size_after: Option<u32>,
    ) -> Result<()> {
        let current = self.current.load();
        let mut tx = tx.lock();
        if let Some(last_committed) = current.insert_pages(pages, size_after, &mut tx)? {
            self.new_frame_notifier.send_replace(last_committed);
        }

        if tx.is_commited() && self.swap_strategy.should_swap(current.count_committed()) {
            self.swap_current(&tx)?;
            self.swap_strategy.swapped();
        }

        Ok(())
    }

    /// Cut the current log, and register it for storage.
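    ///
    /// A minimal sketch (hypothetical caller; `shared` is an assumption):
    ///
    /// ```ignore
    /// // swap in a fresh segment; the sealed one joins the tail and becomes
    /// // eligible for storage and, eventually, checkpointing
    /// shared.seal_current()?;
    /// ```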
    pub fn seal_current(&self) -> Result<()> {
        let mut tx = self.begin_read(u64::MAX).into();
        self.upgrade(&mut tx)?;

        let ret = {
            let mut guard = tx.as_write_mut().unwrap().lock();
            guard.commit();
            self.swap_current(&mut guard)
        };
        // Make sure the tx is always ended before it's dropped!
        // FIXME: this is an issue with this design: since downgrade consumes self, we can't have
        // a Drop implementation. The transaction should probably hold an Option<WriteTxnInner>,
        // so that we can take &mut self instead.
        tx.end();

        ret
    }

    /// Swap the current log. A write lock must be held, and the transaction must already be
    /// committed.
    pub(crate) fn swap_current(&self, tx: &impl TxGuard<IO::File>) -> Result<()> {
        self.registry.swap_current(self, tx)?;
        Ok(())
    }

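    /// Checkpoint the sealed segment tail back into the main db file, up to the durable frame_no.
    ///
    /// A minimal sketch (hypothetical async caller; `shared` is an assumption, mirroring the test
    /// at the bottom of this file):
    ///
    /// ```ignore
    /// if let Some(frame_no) = shared.checkpoint().await? {
    ///     // frames up to `frame_no` are now in the main db file
    ///     tracing::info!(frame_no, "checkpointed");
    /// }
    /// ```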
    #[tracing::instrument(skip(self))]
    pub async fn checkpoint(&self) -> Result<Option<u64>> {
        let durable_frame_no = *self.durable_frame_no.lock();
        let checkpointed_frame_no = self
            .current
            .load()
            .tail()
            .checkpoint(&self.db_file, durable_frame_no, self.log_id(), &self.io)
            .await?;
        if let Some(checkpointed_frame_no) = checkpointed_frame_no {
            self.checkpointed_frame_no
                .store(checkpointed_frame_no, Ordering::SeqCst);
        }

        Ok(checkpointed_frame_no)
    }

    pub fn last_committed_frame_no(&self) -> u64 {
        let current = self.current.load();
        current.last_committed_frame_no()
    }

    pub fn namespace(&self) -> &NamespaceName {
        &self.namespace
    }
}

#[cfg(test)]
mod test {
    use crate::test::{seal_current_segment, TestEnv};

    use super::*;

    #[tokio::test]
    async fn checkpoint() {
        let env = TestEnv::new();
        let conn = env.open_conn("test");
        let shared = env.shared("test");

        assert_eq!(shared.checkpointed_frame_no.load(Ordering::Relaxed), 0);

        conn.execute("create table test (x)", ()).unwrap();
        conn.execute("insert into test values (12)", ()).unwrap();
        conn.execute("insert into test values (12)", ()).unwrap();

        assert_eq!(shared.checkpointed_frame_no.load(Ordering::Relaxed), 0);

        seal_current_segment(&shared);

        *shared.durable_frame_no.lock() = 999999;

        let frame_no = shared.checkpoint().await.unwrap().unwrap();
        assert_eq!(frame_no, 4);
        assert_eq!(shared.checkpointed_frame_no.load(Ordering::Relaxed), 4);

        assert!(shared.checkpoint().await.unwrap().is_none());
    }
}