1use std::collections::BTreeMap;
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4use std::time::Instant;
5
6use libsql_sys::name::NamespaceName;
7use tokio::sync::mpsc;
8
9use crate::checkpointer::CheckpointMessage;
10use crate::segment::current::{CurrentSegment, SegmentIndex};
11use crate::shared_wal::WalLock;
12
13pub enum Transaction<F> {
14 Write(WriteTransaction<F>),
15 Read(ReadTransaction<F>),
16}
17
18impl<T> From<ReadTransaction<T>> for Transaction<T> {
19 fn from(value: ReadTransaction<T>) -> Self {
20 Self::Read(value)
21 }
22}
23
24impl<F> Transaction<F> {
25 pub fn as_write_mut(&mut self) -> Option<&mut WriteTransaction<F>> {
26 if let Self::Write(ref mut v) = self {
27 Some(v)
28 } else {
29 None
30 }
31 }
32
33 pub fn into_write(self) -> Result<WriteTransaction<F>, Self> {
34 if let Self::Write(v) = self {
35 Ok(v)
36 } else {
37 Err(self)
38 }
39 }
40
41 pub fn max_frame_no(&self) -> u64 {
42 match self {
43 Transaction::Write(w) => w.next_frame_no - 1,
44 Transaction::Read(read) => read.max_frame_no,
45 }
46 }
47
48 pub(crate) fn end(self) {
49 match self {
50 Transaction::Write(tx) => {
51 tx.downgrade();
52 }
53 Transaction::Read(_) => (),
54 }
55 }
56}
57
58impl<F> Deref for Transaction<F> {
59 type Target = ReadTransaction<F>;
60
61 fn deref(&self) -> &Self::Target {
62 match self {
63 Transaction::Write(tx) => &tx,
64 Transaction::Read(tx) => &tx,
65 }
66 }
67}
68
69impl<F> DerefMut for Transaction<F> {
70 fn deref_mut(&mut self) -> &mut Self::Target {
71 match self {
72 Transaction::Write(ref mut tx) => tx,
73 Transaction::Read(ref mut tx) => tx,
74 }
75 }
76}
77
/// A read snapshot of the WAL.
///
/// Every live `ReadTransaction`, clones included, is registered as a reader of `current`:
/// - `Clone` increments the segment's reader count.
/// - `Drop` decrements it and, when `dec_reader_count` reports true and the segment is sealed,
///   sends a message to the checkpointer.
pub struct ReadTransaction<F> {
    /// Transaction id. `WriteTransaction::lock`/`into_lock_owned` compare it with the id stored
    /// in the WAL's `tx_id` lock, and `downgrade` clears `tx_id` only when it matches.
    pub id: u64,
    /// Maximum frame number of this snapshot (what `Transaction::max_frame_no` reports for reads).
    pub max_frame_no: u64,
    /// NOTE(review): presumably the highest frame offset in `current` visible to this snapshot — confirm.
    pub max_offset: u64,
    /// Database size captured with the snapshot (presumably in pages — confirm).
    pub db_size: u32,
    /// Segment this snapshot reads from; its reader count tracks the live copies of this transaction.
    pub current: Arc<CurrentSegment<F>>,
    /// Instant recorded when the transaction was created (set by the constructing code).
    pub created_at: Instant,
    /// Id of the connection that owns this transaction (presumably — set by the caller).
    pub conn_id: u64,
    /// Counter of pages read; it is copied by `Clone` but not updated in this file.
    pub pages_read: usize,
    /// Database namespace; `Drop` converts it into the message sent on `checkpoint_notifier`.
    pub namespace: NamespaceName,
    /// Channel used by `Drop` to notify the checkpointer.
    pub checkpoint_notifier: mpsc::Sender<CheckpointMessage>,
}
95
96impl<F> Clone for ReadTransaction<F> {
98 fn clone(&self) -> Self {
99 self.current.inc_reader_count();
100 Self {
101 id: self.id,
102 max_frame_no: self.max_frame_no,
103 current: self.current.clone(),
104 db_size: self.db_size,
105 created_at: self.created_at,
106 conn_id: self.conn_id,
107 pages_read: self.pages_read,
108 namespace: self.namespace.clone(),
109 checkpoint_notifier: self.checkpoint_notifier.clone(),
110 max_offset: self.max_offset,
111 }
112 }
113}
114
115impl<F> Drop for ReadTransaction<F> {
116 fn drop(&mut self) {
117 if self.current.dec_reader_count() && self.current.is_sealed() {
120 let _: Result<_, _> = self
121 .checkpoint_notifier
122 .try_send(self.namespace.clone().into());
123 }
124 }
125}
126
/// Snapshot of a write transaction's position, taken by `WriteTransaction::savepoint`, plus the
/// pages recorded since then.
pub struct Savepoint {
    /// Page number -> frame offset for pages recorded under this savepoint.
    /// It starts empty and is cleared when the transaction is reset to this savepoint.
    pub index: BTreeMap<u32, u32>,
    /// Value of the transaction's `next_frame_no` when the savepoint was taken.
    pub next_frame_no: u64,
    /// Value of the transaction's `next_offset` when the savepoint was taken.
    pub next_offset: u32,
    /// Value of the transaction's running `current_checksum` when the savepoint was taken.
    pub current_checksum: u32,
}
133
134pub(crate) fn merge_savepoints<'a>(
136 savepoints: impl Iterator<Item = &'a BTreeMap<u32, u32>>,
137 out: &SegmentIndex,
138) {
139 for savepoint in savepoints {
140 for (k, v) in savepoint.iter() {
141 out.insert(*k, *v);
142 }
143 }
144}
145
/// A write transaction: a read snapshot (`read_tx`) plus the write-side state needed to append
/// frames and roll back to savepoints.
pub struct WriteTransaction<F> {
    /// Shared WAL lock state. `downgrade` clears `tx_id` and hands the lock to one waiter.
    pub wal_lock: Arc<WalLock>,
    /// Savepoint stack; the last entry is the most recent savepoint.
    pub savepoints: Vec<Savepoint>,
    /// Frame number for the next frame; `Transaction::max_frame_no` reports `next_frame_no - 1`.
    pub next_frame_no: u64,
    /// Offset for the next frame (presumably within the current segment — confirm).
    pub next_offset: u32,
    /// Running frame checksum; `savepoint` records it in each new savepoint.
    pub current_checksum: u32,
    /// Set to true by `commit`.
    pub is_commited: bool,
    /// Underlying read snapshot; `WriteTransaction` derefs to it.
    pub read_tx: ReadTransaction<F>,
    /// NOTE(review): not used in this file; presumably the lowest frame offset whose checksum
    /// must be recomputed before commit — confirm in the segment writer.
    pub recompute_checksum: Option<u32>,
}
159
160pub struct TxGuardOwned<F> {
161 lock: Option<async_lock::MutexGuardArc<Option<u64>>>,
162 inner: Option<WriteTransaction<F>>,
163}
164
165impl<F> TxGuardOwned<F> {
166 pub(crate) fn into_inner(mut self) -> WriteTransaction<F> {
167 self.lock.take();
168 self.inner.take().unwrap()
169 }
170}
171
172impl<F> Drop for TxGuardOwned<F> {
173 fn drop(&mut self) {
174 let _ = self.lock.take();
175 if let Some(inner) = self.inner.take() {
176 inner.downgrade();
177 }
178 }
179}
180
181impl<F> Deref for TxGuardOwned<F> {
182 type Target = WriteTransaction<F>;
183
184 fn deref(&self) -> &Self::Target {
185 self.inner.as_ref().expect("guard used after drop")
186 }
187}
188
189impl<F> DerefMut for TxGuardOwned<F> {
190 fn deref_mut(&mut self) -> &mut Self::Target {
191 self.inner.as_mut().expect("guard used after drop")
192 }
193}
194
195pub trait TxGuard<F>: Deref<Target = WriteTransaction<F>> + DerefMut + Send + Sync {}
196
197impl<'a, F: Send + Sync> TxGuard<F> for TxGuardShared<'a, F> {}
198impl<F: Send + Sync> TxGuard<F> for TxGuardOwned<F> {}
199
200pub struct TxGuardShared<'a, F> {
201 _lock: async_lock::MutexGuardArc<Option<u64>>,
202 inner: &'a mut WriteTransaction<F>,
203}
204
205impl<'a, F> Deref for TxGuardShared<'a, F> {
206 type Target = WriteTransaction<F>;
207
208 fn deref(&self) -> &Self::Target {
209 &self.inner
210 }
211}
212
213impl<'a, F> DerefMut for TxGuardShared<'a, F> {
214 fn deref_mut(&mut self) -> &mut Self::Target {
215 self.inner
216 }
217}
218
219impl<F> WriteTransaction<F> {
220 pub(crate) fn merge_savepoints(&self, out: &SegmentIndex) {
221 let savepoints = self.savepoints.iter().rev().map(|s| &s.index);
222 merge_savepoints(savepoints, out);
223 }
224
225 pub fn savepoint(&mut self) -> usize {
226 let savepoint_id = self.savepoints.len();
227 self.savepoints.push(Savepoint {
228 next_offset: self.next_offset,
229 next_frame_no: self.next_frame_no,
230 index: BTreeMap::new(),
231 current_checksum: self.current_checksum,
232 });
233 savepoint_id
234 }
235
236 pub fn lock(&mut self) -> TxGuardShared<F> {
237 let g = self.wal_lock.tx_id.lock_arc_blocking();
238 match *g {
239 Some(id) if self.id == id => TxGuardShared {
241 _lock: g,
242 inner: self,
243 },
244 Some(_) => todo!("lock stolen"),
246 None => todo!("not a transaction"),
247 }
248 }
249
250 pub fn into_lock_owned(self) -> TxGuardOwned<F> {
251 let g = self.wal_lock.tx_id.lock_arc_blocking();
252 match *g {
253 Some(id) if self.id == id => TxGuardOwned {
255 lock: Some(g),
256 inner: Some(self),
257 },
258 Some(_) => todo!("lock stolen"),
260 None => todo!("not a transaction"),
261 }
262 }
263
264 pub fn reset(&mut self, savepoint_id: usize) {
265 if savepoint_id >= self.savepoints.len() {
266 unreachable!("savepoint doesn't exist");
267 }
268
269 self.savepoints.drain(savepoint_id + 1..).count();
270 self.savepoints[savepoint_id].index.clear();
271 let last_savepoint = self.savepoints.last().unwrap();
272 self.next_frame_no = last_savepoint.next_frame_no;
273 self.next_offset = last_savepoint.next_offset;
274 }
275
276 pub fn index_page_iter(&self) -> impl Iterator<Item = u32> + '_ {
277 self.savepoints
278 .iter()
279 .map(|s| s.index.keys().copied())
280 .flatten()
281 }
282
283 pub fn not_empty(&self) -> bool {
284 self.savepoints.iter().any(|s| !s.index.is_empty())
285 }
286
287 #[tracing::instrument(skip(self))]
288 pub fn downgrade(self) -> ReadTransaction<F> {
289 tracing::trace!("downgrading write transaction");
290 let Self {
291 wal_lock, read_tx, ..
292 } = self;
293 let mut reserved = wal_lock.reserved.lock();
295 let mut lock = wal_lock.tx_id.lock_blocking();
296 match *lock {
297 Some(lock_id) if lock_id == read_tx.id => {
298 lock.take();
299 }
300 _ => (),
301 }
302
303 if let Some(id) = *reserved {
304 tracing::trace!("tx already reserved by {id}");
305 return read_tx;
306 }
307
308 loop {
309 match wal_lock.waiters.steal() {
310 crossbeam::deque::Steal::Empty => {
311 tracing::trace!("no connection waiting");
312 break;
313 }
314 crossbeam::deque::Steal::Success((unparker, id)) => {
315 tracing::trace!("waking up {id}");
316 reserved.replace(id);
317 unparker.unpark();
318 break;
319 }
320 crossbeam::deque::Steal::Retry => (),
321 }
322 }
323
324 tracing::trace!(id = read_tx.id, "lock released");
325
326 read_tx
327 }
328
329 pub fn is_commited(&self) -> bool {
330 self.is_commited
331 }
332
333 pub(crate) fn find_frame_offset(&self, page_no: u32) -> Option<u32> {
334 let iter = self.savepoints.iter().rev().map(|s| &s.index);
335 for index in iter {
336 if let Some(val) = index.get(&page_no) {
337 return Some(*val);
338 }
339 }
340
341 None
342 }
343
344 pub(crate) fn commit(&mut self) {
345 self.is_commited = true;
346 }
347
348 pub(crate) fn current_checksum(&self) -> u32 {
349 self.savepoints.last().unwrap().current_checksum
350 }
351}
352
353impl<F> Deref for WriteTransaction<F> {
354 type Target = ReadTransaction<F>;
355
356 fn deref(&self) -> &Self::Target {
357 &self.read_tx
358 }
359}
360
361impl<F> DerefMut for WriteTransaction<F> {
362 fn deref_mut(&mut self) -> &mut Self::Target {
363 &mut self.read_tx
364 }
365}