libsql_wal/
transaction.rs

1use std::collections::BTreeMap;
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4use std::time::Instant;
5
6use libsql_sys::name::NamespaceName;
7use tokio::sync::mpsc;
8
9use crate::checkpointer::CheckpointMessage;
10use crate::segment::current::{CurrentSegment, SegmentIndex};
11use crate::shared_wal::WalLock;
12
13pub enum Transaction<F> {
14    Write(WriteTransaction<F>),
15    Read(ReadTransaction<F>),
16}
17
18impl<T> From<ReadTransaction<T>> for Transaction<T> {
19    fn from(value: ReadTransaction<T>) -> Self {
20        Self::Read(value)
21    }
22}
23
24impl<F> Transaction<F> {
25    pub fn as_write_mut(&mut self) -> Option<&mut WriteTransaction<F>> {
26        if let Self::Write(ref mut v) = self {
27            Some(v)
28        } else {
29            None
30        }
31    }
32
33    pub fn into_write(self) -> Result<WriteTransaction<F>, Self> {
34        if let Self::Write(v) = self {
35            Ok(v)
36        } else {
37            Err(self)
38        }
39    }
40
41    pub fn max_frame_no(&self) -> u64 {
42        match self {
43            Transaction::Write(w) => w.next_frame_no - 1,
44            Transaction::Read(read) => read.max_frame_no,
45        }
46    }
47
48    pub(crate) fn end(self) {
49        match self {
50            Transaction::Write(tx) => {
51                tx.downgrade();
52            }
53            Transaction::Read(_) => (),
54        }
55    }
56}
57
58impl<F> Deref for Transaction<F> {
59    type Target = ReadTransaction<F>;
60
61    fn deref(&self) -> &Self::Target {
62        match self {
63            Transaction::Write(tx) => &tx,
64            Transaction::Read(tx) => &tx,
65        }
66    }
67}
68
69impl<F> DerefMut for Transaction<F> {
70    fn deref_mut(&mut self) -> &mut Self::Target {
71        match self {
72            Transaction::Write(ref mut tx) => tx,
73            Transaction::Read(ref mut tx) => tx,
74        }
75    }
76}
77
78pub struct ReadTransaction<F> {
79    pub id: u64,
80    /// Max frame number that this transaction can read
81    pub max_frame_no: u64,
82    // max offset that can be read from the current log
83    pub max_offset: u64,
84    pub db_size: u32,
85    /// The segment to which we have a read lock
86    pub current: Arc<CurrentSegment<F>>,
87    pub created_at: Instant,
88    pub conn_id: u64,
89    /// number of pages read by this transaction. This is used to determine whether a write lock
90    /// will be re-acquired.
91    pub pages_read: usize,
92    pub namespace: NamespaceName,
93    pub checkpoint_notifier: mpsc::Sender<CheckpointMessage>,
94}
95
96// fixme: clone should probably not be implemented for this type, figure a way to do it
97impl<F> Clone for ReadTransaction<F> {
98    fn clone(&self) -> Self {
99        self.current.inc_reader_count();
100        Self {
101            id: self.id,
102            max_frame_no: self.max_frame_no,
103            current: self.current.clone(),
104            db_size: self.db_size,
105            created_at: self.created_at,
106            conn_id: self.conn_id,
107            pages_read: self.pages_read,
108            namespace: self.namespace.clone(),
109            checkpoint_notifier: self.checkpoint_notifier.clone(),
110            max_offset: self.max_offset,
111        }
112    }
113}
114
115impl<F> Drop for ReadTransaction<F> {
116    fn drop(&mut self) {
117        // FIXME: it would be more approriate to wait till the segment is stored before notfying,
118        // because we are not waiting for read to be released before that
119        if self.current.dec_reader_count() && self.current.is_sealed() {
120            let _: Result<_, _> = self
121                .checkpoint_notifier
122                .try_send(self.namespace.clone().into());
123        }
124    }
125}
126
/// Snapshot of a write transaction's progress, together with the index of pages written under
/// this savepoint.
pub struct Savepoint {
    /// Value of the transaction's `next_offset` when the savepoint was created.
    pub next_offset: u32,
    /// Value of the transaction's `next_frame_no` when the savepoint was created.
    pub next_frame_no: u64,
    /// Checksum associated with this savepoint; initialized from the transaction's
    /// `current_checksum` when the savepoint is created.
    pub current_checksum: u32,
    /// Maps a page number to the frame offset recorded for it under this savepoint.
    pub index: BTreeMap<u32, u32>,
}
133
134/// The savepoints must be passed from most recent to oldest
135pub(crate) fn merge_savepoints<'a>(
136    savepoints: impl Iterator<Item = &'a BTreeMap<u32, u32>>,
137    out: &SegmentIndex,
138) {
139    for savepoint in savepoints {
140        for (k, v) in savepoint.iter() {
141            out.insert(*k, *v);
142        }
143    }
144}
145
146pub struct WriteTransaction<F> {
147    /// id of the transaction currently holding the lock
148    pub wal_lock: Arc<WalLock>,
149    pub savepoints: Vec<Savepoint>,
150    pub next_frame_no: u64,
151    pub next_offset: u32,
152    pub current_checksum: u32,
153    pub is_commited: bool,
154    pub read_tx: ReadTransaction<F>,
155    /// if transaction overwrote frames, then the running checksum needs to be recomputed.
156    /// We store here the lowest segment offset at which a frame was overwritten
157    pub recompute_checksum: Option<u32>,
158}
159
160pub struct TxGuardOwned<F> {
161    lock: Option<async_lock::MutexGuardArc<Option<u64>>>,
162    inner: Option<WriteTransaction<F>>,
163}
164
165impl<F> TxGuardOwned<F> {
166    pub(crate) fn into_inner(mut self) -> WriteTransaction<F> {
167        self.lock.take();
168        self.inner.take().unwrap()
169    }
170}
171
172impl<F> Drop for TxGuardOwned<F> {
173    fn drop(&mut self) {
174        let _ = self.lock.take();
175        if let Some(inner) = self.inner.take() {
176            inner.downgrade();
177        }
178    }
179}
180
181impl<F> Deref for TxGuardOwned<F> {
182    type Target = WriteTransaction<F>;
183
184    fn deref(&self) -> &Self::Target {
185        self.inner.as_ref().expect("guard used after drop")
186    }
187}
188
189impl<F> DerefMut for TxGuardOwned<F> {
190    fn deref_mut(&mut self) -> &mut Self::Target {
191        self.inner.as_mut().expect("guard used after drop")
192    }
193}
194
195pub trait TxGuard<F>: Deref<Target = WriteTransaction<F>> + DerefMut + Send + Sync {}
196
197impl<'a, F: Send + Sync> TxGuard<F> for TxGuardShared<'a, F> {}
198impl<F: Send + Sync> TxGuard<F> for TxGuardOwned<F> {}
199
200pub struct TxGuardShared<'a, F> {
201    _lock: async_lock::MutexGuardArc<Option<u64>>,
202    inner: &'a mut WriteTransaction<F>,
203}
204
205impl<'a, F> Deref for TxGuardShared<'a, F> {
206    type Target = WriteTransaction<F>;
207
208    fn deref(&self) -> &Self::Target {
209        &self.inner
210    }
211}
212
213impl<'a, F> DerefMut for TxGuardShared<'a, F> {
214    fn deref_mut(&mut self) -> &mut Self::Target {
215        self.inner
216    }
217}
218
219impl<F> WriteTransaction<F> {
220    pub(crate) fn merge_savepoints(&self, out: &SegmentIndex) {
221        let savepoints = self.savepoints.iter().rev().map(|s| &s.index);
222        merge_savepoints(savepoints, out);
223    }
224
225    pub fn savepoint(&mut self) -> usize {
226        let savepoint_id = self.savepoints.len();
227        self.savepoints.push(Savepoint {
228            next_offset: self.next_offset,
229            next_frame_no: self.next_frame_no,
230            index: BTreeMap::new(),
231            current_checksum: self.current_checksum,
232        });
233        savepoint_id
234    }
235
236    pub fn lock(&mut self) -> TxGuardShared<F> {
237        let g = self.wal_lock.tx_id.lock_arc_blocking();
238        match *g {
239            // we still hold the lock, we can proceed
240            Some(id) if self.id == id => TxGuardShared {
241                _lock: g,
242                inner: self,
243            },
244            // Somebody took the lock from us
245            Some(_) => todo!("lock stolen"),
246            None => todo!("not a transaction"),
247        }
248    }
249
250    pub fn into_lock_owned(self) -> TxGuardOwned<F> {
251        let g = self.wal_lock.tx_id.lock_arc_blocking();
252        match *g {
253            // we still hold the lock, we can proceed
254            Some(id) if self.id == id => TxGuardOwned {
255                lock: Some(g),
256                inner: Some(self),
257            },
258            // Somebody took the lock from us
259            Some(_) => todo!("lock stolen"),
260            None => todo!("not a transaction"),
261        }
262    }
263
264    pub fn reset(&mut self, savepoint_id: usize) {
265        if savepoint_id >= self.savepoints.len() {
266            unreachable!("savepoint doesn't exist");
267        }
268
269        self.savepoints.drain(savepoint_id + 1..).count();
270        self.savepoints[savepoint_id].index.clear();
271        let last_savepoint = self.savepoints.last().unwrap();
272        self.next_frame_no = last_savepoint.next_frame_no;
273        self.next_offset = last_savepoint.next_offset;
274    }
275
276    pub fn index_page_iter(&self) -> impl Iterator<Item = u32> + '_ {
277        self.savepoints
278            .iter()
279            .map(|s| s.index.keys().copied())
280            .flatten()
281    }
282
283    pub fn not_empty(&self) -> bool {
284        self.savepoints.iter().any(|s| !s.index.is_empty())
285    }
286
287    #[tracing::instrument(skip(self))]
288    pub fn downgrade(self) -> ReadTransaction<F> {
289        tracing::trace!("downgrading write transaction");
290        let Self {
291            wal_lock, read_tx, ..
292        } = self;
293        // always acquire lock in this order: reserved, then tx_id
294        let mut reserved = wal_lock.reserved.lock();
295        let mut lock = wal_lock.tx_id.lock_blocking();
296        match *lock {
297            Some(lock_id) if lock_id == read_tx.id => {
298                lock.take();
299            }
300            _ => (),
301        }
302
303        if let Some(id) = *reserved {
304            tracing::trace!("tx already reserved by {id}");
305            return read_tx;
306        }
307
308        loop {
309            match wal_lock.waiters.steal() {
310                crossbeam::deque::Steal::Empty => {
311                    tracing::trace!("no connection waiting");
312                    break;
313                }
314                crossbeam::deque::Steal::Success((unparker, id)) => {
315                    tracing::trace!("waking up {id}");
316                    reserved.replace(id);
317                    unparker.unpark();
318                    break;
319                }
320                crossbeam::deque::Steal::Retry => (),
321            }
322        }
323
324        tracing::trace!(id = read_tx.id, "lock released");
325
326        read_tx
327    }
328
329    pub fn is_commited(&self) -> bool {
330        self.is_commited
331    }
332
333    pub(crate) fn find_frame_offset(&self, page_no: u32) -> Option<u32> {
334        let iter = self.savepoints.iter().rev().map(|s| &s.index);
335        for index in iter {
336            if let Some(val) = index.get(&page_no) {
337                return Some(*val);
338            }
339        }
340
341        None
342    }
343
344    pub(crate) fn commit(&mut self) {
345        self.is_commited = true;
346    }
347
348    pub(crate) fn current_checksum(&self) -> u32 {
349        self.savepoints.last().unwrap().current_checksum
350    }
351}
352
353impl<F> Deref for WriteTransaction<F> {
354    type Target = ReadTransaction<F>;
355
356    fn deref(&self) -> &Self::Target {
357        &self.read_tx
358    }
359}
360
361impl<F> DerefMut for WriteTransaction<F> {
362    fn deref_mut(&mut self) -> &mut Self::Target {
363        &mut self.read_tx
364    }
365}