nut 0.1.4

A Rust port of BoltDB, an embedded key/value database.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
use parking_lot::{
    MappedRwLockReadGuard, MappedRwLockWriteGuard, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
};
use std::collections::HashMap;
use std::fmt;
use std::fs::OpenOptions;
use std::io::{copy, Cursor as IOCursor, Read, Seek, SeekFrom, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc, Weak};
use std::thread;
use std::time::{Duration, SystemTime};

use crate::bucket::{Bucket, Cursor};
use crate::consts::{Flags, IGNORE_NOSYNC, PGID, TXID};
use crate::db::{CheckMode, WeakDB, DB};
use crate::errors::Error;
use crate::meta::Meta;
use crate::page::{OwnedPage, Page, PageInfo};

use super::stats::TxStats;

/// Shared state of a transaction. Wrapped in an `Arc` by [`Tx`], so all
/// clones of a transaction handle observe (and mutate) the same state.
pub(crate) struct TxInner {
    /// Whether this transaction may perform write operations.
    /// Fixed at creation time; read-only transactions reject all mutating calls.
    pub(crate) writable: bool,

    /// Declares that the transaction is "managed" (driven by the DB itself);
    /// managed transactions refuse explicit `commit`/`rollback` calls.
    pub(crate) managed: AtomicBool,

    /// Defines whether a consistency check will run when the transaction
    /// is committed or rolled back (consumed via `swap(false, ..)`).
    pub(crate) check: AtomicBool,

    /// Weak reference back to the owning DB.
    /// After the transaction is closed this points to nothing
    /// (reset to `WeakDB::new()`), which is how "closed" is detected.
    pub(crate) db: RwLock<WeakDB>,

    /// This transaction's snapshot of the meta page (txid, root, freelist, pgid).
    pub(crate) meta: RwLock<Meta>,

    /// Root bucket.
    /// All user-visible buckets are nested under this one.
    pub(crate) root: RwLock<Bucket>,

    /// Dirty-page cache: pages allocated/modified by this transaction,
    /// keyed by page id, flushed to disk in `Tx::write`.
    pub(crate) pages: RwLock<HashMap<PGID, OwnedPage>>,

    /// Per-transaction statistics (cursor counts, timings, page allocs).
    pub(crate) stats: Mutex<TxStats>,

    /// Callbacks invoked after a successful commit.
    pub(crate) commit_handlers: Mutex<Vec<Box<dyn Fn()>>>,

    /// WriteFlag specifies the flag for write-related methods like WriteTo().
    /// Tx opens the database file with the specified flag to copy the data.
    ///
    /// By default, the flag is unset, which works well for mostly in-memory
    /// workloads. For databases that are much larger than available RAM,
    /// set the flag to O_DIRECT to avoid trashing the page cache.
    pub(super) write_flag: usize,
}

impl fmt::Debug for TxInner {
    /// Debug-formats the transaction state.
    ///
    /// Uses `try_read`/`try_lock` + `unwrap()` throughout, so formatting
    /// panics if any of the inner locks is currently held elsewhere.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // NOTE(review): `&db as *const DB` takes the address of the
        // closure-local `db`, so the printed pointer is a dead stack slot,
        // not the DB's actual location. It is only formatted (never
        // dereferenced), so this is safe but misleading — confirm intent.
        let db = self
            .db
            .try_read()
            .unwrap()
            .upgrade()
            .map(|db| &db as *const DB);

        f.debug_struct("TxInner")
            .field("writable", &self.writable)
            .field("managed", &self.managed)
            .field("db", &db)
            .field("meta", &*self.meta.try_read().unwrap())
            .field("root", &*self.root.try_read().unwrap())
            .field("pages", &*self.pages.try_read().unwrap())
            .field("stats", &*self.stats.try_lock().unwrap())
            .field(
                "commit handlers len",
                &self.commit_handlers.try_lock().unwrap().len(),
            )
            .field("write_flag", &self.write_flag)
            .finish()
    }
}

/// Transaction handle.
///
/// A cheap, clonable wrapper around the shared [`TxInner`] state; cloning
/// bumps the `Arc` strong count, which the `Drop` impl uses to decide when
/// the last user handle is going away.
#[derive(Debug)]
pub struct Tx(pub(crate) Arc<TxInner>);

// SAFETY(review): manually asserting Send + Sync. All fields of `TxInner`
// are behind `RwLock`/`Mutex`/atomics except `commit_handlers`, whose
// `Box<dyn Fn()>` closures are NOT required to be `Send`/`Sync` by their
// type — confirm no non-thread-safe closures are ever registered.
unsafe impl Sync for Tx {}
unsafe impl Send for Tx {}

impl Tx {
    /// Cheap handle clone: bumps the inner `Arc` refcount.
    /// Kept as an inherent, crate-private method (rather than `derive(Clone)`)
    /// so the strong count — which `Drop` inspects — stays under crate control.
    pub(crate) fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }

    /// Returns whether the transaction can perform write operations.
    pub fn writable(&self) -> bool {
        self.0.writable
    }

    /// Defines whether transaction is fresh and has not been used before;
    /// committing a transaction twice must resolve in an error.
    /// Returns `false` once the weak DB reference is cleared by `close()`.
    pub(crate) fn opened(&self) -> bool {
        match self.0.db.try_read().unwrap().upgrade() {
            None => false,
            Some(db) => db.opened(),
        }
    }

    /// Returns a reference to the database that created the transaction.
    /// Errors with `DatabaseGone` after the transaction has been closed.
    pub(crate) fn db(&self) -> Result<DB, Error> {
        self.0
            .db
            .try_read()
            .unwrap()
            .upgrade()
            .ok_or(Error::DatabaseGone)
    }

    /// Returns the transaction id.
    pub(crate) fn id(&self) -> TXID {
        self.0.meta.try_read().unwrap().txid
    }

    /// Returns the transaction's high-water-mark page id.
    pub(crate) fn pgid(&self) -> PGID {
        self.0.meta.try_read().unwrap().pgid
    }

    /// Sets the transaction's high-water-mark page id.
    /// Fails if the meta lock is currently held.
    pub(crate) fn set_pgid(&mut self, id: PGID) -> Result<(), Error> {
        self.0.meta.try_write().ok_or("pgid locked")?.pgid = id;
        Ok(())
    }

    /// Adds a handler function to be executed after the transaction successfully commits.
    pub fn on_commit(&mut self, handler: Box<dyn Fn()>) {
        self.0.commit_handlers.lock().push(handler);
    }

    /// Returns current database size in bytes as seen by this transaction
    /// (high-water page id times page size).
    pub(super) fn size(&self) -> i64 {
        self.pgid() as i64 * self.db().unwrap().page_size() as i64
    }

    /// Creates a cursor associated with the root bucket.
    /// All items in the cursor will return a nil value because all items in root bucket are also buckets.
    /// The cursor is only valid as long as the transaction is open.
    /// Do not use a cursor after the transaction is closed.
    pub fn cursor(&self) -> Cursor<RwLockWriteGuard<Bucket>> {
        self.0.stats.lock().cursor_count += 1;

        // The returned cursor holds the root write guard for its lifetime.
        Cursor::new(self.0.root.write())
    }

    /// Returns a copy of the current transaction statistics.
    pub fn stats(&self) -> TxStats {
        self.0.stats.lock().clone()
    }

    /// Bucket retrieves a bucket by name.
    /// Returns None if the bucket does not exist.
    /// The bucket instance is only valid for the lifetime of the transaction.
    pub fn bucket(&self, key: &[u8]) -> Result<MappedRwLockReadGuard<Bucket>, Error> {
        let bucket = self.0.root.try_read().ok_or("Can't acquire bucket")?;

        // Map the guard down to the child bucket so the root stays read-locked
        // while the caller holds the result.
        RwLockReadGuard::try_map(bucket, |b| b.bucket(key)).map_err(|_| "Can't get bucket".into())
    }

    /// Bucket retrieves a mutable bucket by name.
    /// Returns None if the bucket does not exist.
    /// The bucket instance is only valid for the lifetime of the transaction.
    pub fn bucket_mut(&mut self, key: &[u8]) -> Result<MappedRwLockWriteGuard<Bucket>, Error> {
        if !self.0.writable {
            return Err(Error::TxReadonly);
        };

        let bucket = self.0.root.try_write().ok_or("Can't acquire bucket")?;

        RwLockWriteGuard::try_map(bucket, |b| b.bucket_mut(key))
            .map_err(|_| "Can't get bucket".into())
    }

    /// Returns the keys of all top-level buckets.
    pub fn buckets(&self) -> Vec<Vec<u8>> {
        self.0.root.read().buckets()
    }

    /// Creates a new bucket.
    /// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
    /// The bucket instance is only valid for the lifetime of the transaction.
    pub fn create_bucket(&mut self, key: &[u8]) -> Result<MappedRwLockWriteGuard<Bucket>, Error> {
        if !self.0.writable {
            return Err(Error::TxReadonly);
        };

        let bucket = self.0.root.try_write().ok_or("Can't acquire bucket")?;

        // NOTE(review): the inner error from `create_bucket` is discarded by
        // `.ok()`, so callers only ever see the generic "Can't get bucket".
        RwLockWriteGuard::try_map(bucket, |b| b.create_bucket(key).ok())
            .map_err(|_| "Can't get bucket".into())
    }

    /// Creates a new bucket if it doesn't already exist.
    /// Returns an error if the bucket name is blank, or if the bucket name is too long.
    /// The bucket instance is only valid for the lifetime of the transaction.
    pub fn create_bucket_if_not_exists(
        &mut self,
        key: &[u8],
    ) -> Result<MappedRwLockWriteGuard<Bucket>, Error> {
        if !self.0.writable {
            return Err(Error::TxReadonly);
        };

        let bucket = self.0.root.try_write().ok_or("Can't acquire bucket")?;

        RwLockWriteGuard::try_map(bucket, |b| b.create_bucket_if_not_exists(key).ok())
            .map_err(|_| "Can't get bucket".into())
    }

    /// Deletes a bucket.
    /// Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
    pub fn delete_bucket(&mut self, key: &[u8]) -> Result<(), Error> {
        if !self.0.writable {
            return Err(Error::TxReadonly);
        };

        self.0.root.try_write().unwrap().delete_bucket(key)
    }

    #[allow(clippy::type_complexity)]
    /// Executes a function for each bucket in the root.
    /// If the provided function returns an error then the iteration is stopped and
    /// the error is returned to the caller.
    ///
    /// first argument of function is bucket's key, second is bucket itself
    pub fn for_each<'a, E: Into<Error>>(
        &self,
        mut handler: Box<dyn FnMut(&[u8], Option<&Bucket>) -> Result<(), E> + 'a>,
    ) -> Result<(), Error> {
        // Write-locks the root for the duration of the iteration, even though
        // the handler only receives shared references.
        let root = self.0.root.try_write().unwrap();
        root.for_each(Box::new(|k: &[u8], _v: Option<&[u8]>| -> Result<(), E> {
            handler(k, root.bucket(k))
        }))
    }

    /// Writes the entire database to a writer.
    /// On success, exactly `self.size()` bytes will have been written.
    pub fn write_to<W: Write>(&self, mut w: W) -> Result<i64, Error> {
        let db = self.db()?;
        let page_size = db.page_size();
        let mut file =
            db.0.file
                .try_write()
                .ok_or("can't obtain file write access")?;

        let mut written = 0;

        let mut page = OwnedPage::new(page_size);
        page.flags = Flags::META;

        // first page: this transaction's meta, with a freshly computed checksum
        {
            *page.meta_mut() = self.0.meta.try_read().unwrap().clone();
            page.meta_mut().checksum = page.meta().sum64();
            w.write_all(page.buf()).map_err(|e| format!("{e}"))?;
            written += page.size();
        }

        // second page: the alternate meta slot, carrying the previous txid
        {
            page.id = 1;
            page.meta_mut().txid -= 1;
            page.meta_mut().checksum = page.meta().sum64();
            w.write_all(page.buf()).map_err(|e| format!("{e}"))?;
            written += page.size();
        }

        // Stream the data pages (everything after the two meta pages)
        // straight from the underlying file.
        file.seek(SeekFrom::Start(page_size as u64 * 2))
            .map_err(|e| format!("{e}"))?;

        let size = self.size() as u64 - (page_size as u64 * 2);
        written += copy(&mut Read::by_ref(&mut file.get_mut()).take(size), &mut w)
            .map_err(|e| format!("{e}"))? as usize;

        Ok(written as i64)
    }

    /// Copies the entire database to file at the given path.
    /// A reader transaction is maintained during the copy so it is safe to continue
    /// using the database while a copy is in progress.
    pub fn copy_to(&self, path: &str, mode: OpenOptions) -> Result<(), Error> {
        let file = mode.open(path).map_err(|e| e.to_string())?;
        self.write_to(file)?;
        Ok(())
    }

    /// Closes transaction (so subsequent use of it will resolve in error).
    /// Detaches from the DB, clears the weak back-reference, and drops all
    /// cached state (root bucket, dirty pages).
    pub(crate) fn close(&self) -> Result<(), Error> {
        let mut db = self.db()?;
        let tx = db.remove_tx(self)?;

        *tx.0.db.write() = WeakDB::new();
        tx.0.root.try_write().unwrap().clear();
        tx.0.pages.try_write().unwrap().clear();
        Ok(())
    }

    /// Writes all changes to disk and updates the meta page.
    /// Returns an error if a disk write error occurs, or if Commit is
    /// called on a read-only transaction.
    ///
    /// Pipeline: rebalance -> spill -> rewrite freelist -> grow file ->
    /// write dirty pages -> (optional check) -> write meta -> close ->
    /// run commit handlers. Any failure after spill triggers a rollback.
    pub fn commit(&mut self) -> Result<(), Error> {
        if self.0.managed.load(Ordering::Acquire) {
            return Err(Error::TxManaged);
        } else if !self.writable() {
            return Err(Error::TxReadonly);
        };

        let mut db = self.db()?;

        {
            // rebalance: merge under-filled nodes
            let start_time = SystemTime::now();
            self.0.root.try_write().unwrap().rebalance();
            let mut stats = self.0.stats.lock();
            if stats.rebalance > 0 {
                stats.rebalance_time += SystemTime::now().duration_since(start_time)?;
            };
        }

        {
            // spill: write dirty nodes out to pages
            let start_time = SystemTime::now();
            {
                let mut root = self.0.root.try_write().unwrap();
                root.spill()?;
            }
            self.0.stats.try_lock().unwrap().spill_time =
                SystemTime::now().duration_since(start_time)?;
        }

        // Free the old root bucket.
        self.0.meta.try_write().unwrap().root.root = self.0.root.try_read().unwrap().bucket.root;

        let (txid, tx_pgid, freelist_pgid) = {
            let meta = self.0.meta.try_read().unwrap();
            (meta.txid as usize, meta.pgid, meta.freelist)
        };

        let (freelist_size, page_size) = {
            // Free the freelist and allocate new pages for it. This will overestimate
            // the size of the freelist but not underestimate the size (which would be bad).
            let page = db.page(freelist_pgid);
            let mut freelist = db.0.freelist.try_write().unwrap();
            freelist.free(txid as u64, &page)?;

            let freelist_size = freelist.size();
            let page_size = db.page_size();

            (freelist_size, page_size)
        };

        {
            let page = self.allocate((freelist_size / page_size) as u64 + 1);
            if let Err(e) = page {
                self.rollback()?;
                return Err(e);
            }
            let page = page?;
            // SAFETY(review): pointer was just produced by `allocate`, which
            // stores the backing OwnedPage in `self.0.pages`; it stays valid
            // while the pages map is not mutated — confirm no reallocation
            // of that entry happens before this borrow ends.
            let page = unsafe { &mut *page };

            db.0.freelist.try_write().unwrap().write(page);
            self.0.meta.try_write().unwrap().freelist = page.id;

            // If the high water mark has moved up then attempt to grow the database.
            if self.pgid() > tx_pgid {
                if let Err(e) = db.grow((tx_pgid + 1) * page_size as u64) {
                    self.rollback()?;
                    return Err(e);
                }
            }

            // Write dirty pages to disk.
            let write_start_time = SystemTime::now();
            if let Err(e) = self.write() {
                self.rollback()?;
                return Err(e);
            }

            // If strict mode is enabled then perform a consistency check.
            // Only the first consistency error is reported in the panic.
            if self.0.check.swap(false, Ordering::AcqRel) {
                let strict = db.0.check_mode.contains(CheckMode::STRICT);
                if let Err(e) = self.check_sync() {
                    if strict {
                        self.rollback()?;
                        return Err(e);
                    } else {
                        println!("{e}");
                    }
                }
            };

            // Write meta to disk.
            if let Err(e) = self.write_meta() {
                self.0.check.store(false, Ordering::Release);
                self.rollback()?;
                return Err(e);
            };

            self.0.stats.try_lock().unwrap().write_time +=
                SystemTime::now().duration_since(write_start_time)?;
        };

        // Finalize the transaction.
        self.close()?;

        {
            for h in &*self.0.commit_handlers.try_lock().unwrap() {
                h();
            }
        }

        Ok(())
    }

    /// Closes the transaction and ignores all previous updates. Read-only
    /// transactions must be rolled back and not committed.
    pub fn rollback(&self) -> Result<(), Error> {
        if self.0.managed.load(Ordering::Acquire) {
            return Err(Error::TxManaged);
        };
        self.__rollback()?;
        Ok(())
    }

    /// Rollback implementation shared by `rollback` and internal callers
    /// (skips the `managed` guard). Optionally runs the consistency check,
    /// then restores the freelist to its pre-transaction state and closes.
    pub(crate) fn __rollback(&self) -> Result<(), Error> {
        let db = self.db()?;
        if self.0.check.swap(false, Ordering::AcqRel) {
            let strict = db.0.check_mode.contains(CheckMode::STRICT);
            if let Err(e) = self.check_sync() {
                if strict {
                    return Err(e);
                } else {
                    println!("{e}");
                }
            }
        };
        if self.writable() {
            // Undo this txid's pending frees and reload the on-disk freelist.
            let txid = self.id();
            let mut freelist = db.0.freelist.write();
            freelist.rollback(txid);
            let freelist_id = db.meta()?.freelist;
            let freelist_page = db.page(freelist_id);
            freelist.reload(&freelist_page);
        };
        self.close()?;
        Ok(())
    }

    /// Sync version of check()
    ///
    /// In case of checking thread panic will also return Error
    pub fn check_sync(&self) -> Result<(), Error> {
        let (sender, ch) = mpsc::channel::<String>();
        let tx = self.clone();
        let handle = thread::spawn(move || tx.__check(sender));

        // Drain until the checker drops its sender, collecting every report.
        let mut errs = vec![];
        for err in ch {
            errs.push(err);
        }

        if let Err(e) = handle.join() {
            let estr = e.downcast_ref::<String>();
            if let Some(estr) = estr {
                errs.push(estr.clone());
            } else {
                errs.push(format!("check thread panicked: {e:?}"));
            }
        }

        if !errs.is_empty() {
            return Err(Error::CheckFail(errs));
        };

        Ok(())
    }

    // Performs several consistency checks on the database for this transaction.
    // An error is returned if any inconsistency is found.
    //
    // It can be safely run concurrently on a writable transaction. However, this
    // incurs a high cost for large databases and databases with a lot of subbuckets
    // because of caching. This overhead can be removed if running on a read-only
    // transaction, however, it is not safe to execute other writer transactions at
    // the same time.
    pub fn check(&self) -> mpsc::Receiver<String> {
        let (sender, receiver) = mpsc::channel::<String>();
        let tx = self.clone();
        thread::spawn(move || tx.__check(sender));
        receiver
    }

    /// Builds the set of freed page ids from the freelist.
    /// Errors if the freelist reports the same page id twice.
    pub fn freed(&self) -> Result<HashMap<PGID, bool>, Error> {
        let mut freed = HashMap::<PGID, bool>::new();
        let all_pgids = self
            .db()
            .unwrap()
            .0
            .freelist
            .try_read()
            .unwrap()
            .get_pgids();

        for id in &all_pgids {
            // A duplicate id in the freelist means a double-free.
            if freed.contains_key(id) {
                return Err(format!("page {id}: already freed").into());
            }
            freed.insert(*id, true);
        }

        Ok(freed)
    }

    /// Consistency-check worker: streams each problem found as a String
    /// through `ch`; closing the channel signals completion.
    pub(super) fn __check(&self, ch: mpsc::Sender<String>) {
        let freed = self.freed();
        if let Err(e) = freed {
            ch.send(e.to_string()).unwrap();
            return;
        }

        let freed = freed.unwrap();

        // Pages 0 and 1 (the meta pages) are always reachable.
        let mut reachable = HashMap::new();
        reachable.insert(0, true);
        reachable.insert(1, true);
        let freelist_pgid = self
            .0
            .meta
            .try_read_for(Duration::from_secs(10))
            .unwrap()
            .freelist;
        // SAFETY(review): raw page pointer from `self.page` is read
        // immediately; valid while the tx holds its page cache/mmap.
        let freelist_overflow = unsafe { &*self.page(freelist_pgid).unwrap() }.overflow;
        for i in 0..=freelist_overflow {
            reachable.insert(freelist_pgid + u64::from(i), true);
        }

        // Walk the bucket tree, marking every reachable page.
        self.check_bucket(
            &self.0.root.try_read().unwrap(),
            &mut reachable,
            &freed,
            &ch,
        );

        // Every page below the high-water mark must be reachable or freed.
        for i in 0..self.0.meta.try_read().unwrap().pgid {
            let is_reachable = reachable.contains_key(&i);
            let is_freed = freed.contains_key(&i);
            if !is_reachable && !is_freed {
                ch.send(format!("page {i}: unreachable unfreed")).unwrap();
            };
        }
    }

    /// Recursively verifies one bucket: every page it references must be
    /// in-bounds, referenced exactly once, not freed, and of a valid type.
    fn check_bucket(
        &self,
        b: &Bucket,
        reachable: &mut HashMap<PGID, bool>,
        freed: &HashMap<PGID, bool>,
        ch: &mpsc::Sender<String>,
    ) {
        // Inline buckets (root == 0) have no pages of their own.
        if b.bucket.root == 0 {
            return;
        }

        let meta_pgid = self.pgid();

        let handler = Box::new(|p: &Page, _pgid: usize| {
            if p.id > meta_pgid {
                ch.send(format!("page {}: out of bounds: {}", p.id, meta_pgid))
                    .unwrap();
            }

            // Mark the page and all of its overflow pages as reachable,
            // reporting double-references.
            for i in 0..=p.overflow {
                let id = p.id + u64::from(i);
                if reachable.contains_key(&id) {
                    ch.send(format!("page {id}: multiple references"))
                        .unwrap();
                }
                reachable.insert(id, true);
            }

            let page_type_is_valid = matches!(p.flags, Flags::BRANCHES | Flags::LEAVES);

            if freed.contains_key(&p.id) {
                ch.send(format!("page {}: reachable freed", p.id)).unwrap();
            } else if !page_type_is_valid {
                ch.send(format!("page {}: invalid type: {}", p.id, p.flags))
                    .unwrap();
            }
        });

        b.tx().unwrap().for_each_page(b.bucket.root, 0, handler);

        // Recurse into child buckets.
        b.for_each(Box::new(|k, _v| -> Result<(), String> {
            let child = b.bucket(k);
            if let Some(child) = child {
                self.check_bucket(child, reachable, freed, ch);
            };
            Ok(())
        }))
        .unwrap();
    }

    /// Returns a contiguous block of memory starting at a given page.
    /// The page is registered in the dirty-page cache; the returned raw
    /// pointer stays valid until that cache entry is moved or dropped.
    pub(crate) fn allocate(&mut self, count: u64) -> Result<*mut Page, Error> {
        let mut db = match self.db() {
            Err(_) => return Err(Error::TxClosed),
            Ok(db) => db,
        };

        let mut page = db.allocate(count, self)?;
        let page_id = page.id;
        let page_ptr = &mut *page as *mut Page;

        self.0.pages.try_write().unwrap().insert(page_id, page);

        // Update statistics.
        {
            let mut stats = self.0.stats.lock();
            stats.page_count += 1;
            stats.page_alloc += count as usize * self.db()?.page_size();
        }

        Ok(page_ptr)
    }

    /// Writes any dirty pages to disk, in ascending page-id order, then
    /// recycles zeroed single pages back into the DB's page pool.
    pub(crate) fn write(&mut self) -> Result<(), Error> {
        // Drain the dirty-page cache into a sortable (id, page) list.
        let mut pages: Vec<_> = self
            .0
            .pages
            .write()
            .drain()
            .map(|x| {
                let pgid = x.1.id;
                (pgid, x.1)
            })
            .collect();
        pages.sort_by(|a, b| a.0.cmp(&b.0));
        let mut db = self.db()?;

        let page_size = db.page_size();
        for (id, p) in &pages {
            // A page plus its overflow pages are written as one contiguous blob.
            let size = (p.overflow + 1) as usize * page_size;
            let offset = *id * page_size as u64;

            // SAFETY(review): `p.as_ptr()` backs an OwnedPage of
            // `overflow + 1` pages; `size` must not exceed its allocation —
            // confirm OwnedPage guarantees this.
            let buf = unsafe { std::slice::from_raw_parts(p.as_ptr(), size) };
            let cursor = IOCursor::new(buf);

            db.write_at(offset, cursor)?;
        }

        if !db.0.no_sync || IGNORE_NOSYNC {
            db.sync()?;
        }

        {
            // Return zeroed single-page buffers to the pool for reuse;
            // multi-page (overflow) buffers are simply dropped.
            let mut page_pool = db.0.page_pool.lock();
            let mut i = 0;
            while i != pages.len() {
                if pages[i].1.size() == page_size {
                    let mut page = pages.remove(i).1;
                    for i in page.buf_mut() {
                        *i = 0;
                    }
                    page_pool.push(page);
                } else {
                    i += 1;
                }
            }
        }

        Ok(())
    }

    /// Writes the meta to the disk.
    pub(crate) fn write_meta(&mut self) -> Result<(), Error> {
        let mut db = self.db()?;

        let mut buf = vec![0u8; db.page_size()];
        let page = Page::from_buf_mut(&mut buf);
        self.0.meta.try_write().unwrap().write(page)?;

        // NOTE(review): always writes at offset 0; meta-page alternation
        // (page 0 vs page 1 by txid parity) is presumably handled inside
        // `Meta::write` — confirm.
        db.write_at(0, IOCursor::new(buf))?;

        if !db.0.no_sync || IGNORE_NOSYNC {
            db.sync()?;
        }

        self.0.stats.lock().write += 1;
        Ok(())
    }

    /// Returns a reference to the page with a given id.
    /// If page has been written to then a temporary buffered page is returned.
    /// The pointer is valid only while the dirty-page cache / mmap is stable.
    pub(crate) fn page(&self, id: PGID) -> Result<*const Page, Error> {
        // Check the dirty pages first.
        {
            let pages = self.0.pages.try_read().unwrap();
            if let Some(p) = pages.get(&id) {
                return Ok(&**p);
            }
        }

        // Otherwise return directly from the mmap.
        Ok(&*self.db()?.page(id))
    }

    /// Iterates over every page within a given page and executes a function.
    /// Depth-first: visits `pgid` itself, then recurses into branch children.
    pub(crate) fn for_each_page<'a>(
        &self,
        pgid: PGID,
        depth: usize,
        mut func: Box<dyn FnMut(&Page, usize) + 'a>,
    ) {
        // SAFETY(review): raw pointer from `self.page` dereferenced while the
        // tx is live; no page-cache mutation happens during this traversal.
        let p = unsafe { &*self.page(pgid).unwrap() };

        // Execute function.
        func(p, depth);

        // Recursively loop over children.
        let flags = p.flags;
        if flags != Flags::BRANCHES {
            return;
        }
        let count = p.count as usize;
        for i in 0..count {
            let el = p.branch_page_element(i);
            self.for_each_page(el.pgid, depth + 1, Box::new(|p, d| func(p, d)));
        }
    }

    /// Returns page information for a given page number.
    /// This is only safe for concurrent use when used by a writable transaction.
    pub fn page_info(&self, id: usize) -> Result<Option<PageInfo>, Error> {
        if !self.opened() {
            return Err(Error::TxClosed);
        };
        if id >= self.pgid() as usize {
            return Ok(None);
        };

        let db = self.db()?;

        // Build the page info.
        let p = db.page(id as u64);
        let mut info = PageInfo {
            id: id as isize,
            // Default to FREELIST; overwritten below unless the page is free.
            ptype: Flags::FREELIST,
            count: p.count as usize,
            overflow_count: p.overflow as usize,
        };

        // Determine the type (or if it's free).
        if !db.0.freelist.try_read().unwrap().freed(id as u64) {
            info.ptype = p.flags
        }

        Ok(Some(info))
    }
}

impl Drop for Tx {
    /// Finalizes the transaction when the last user handle is dropped:
    /// writable transactions are committed, read-only ones rolled back.
    fn drop(&mut self) {
        let count = Arc::strong_count(&self.0);
        // one for the reference, and one that is owned by db
        if count > 2 {
            return;
        };
        if let Ok(_db) = self.db() {
            if self.0.writable {
                // NOTE(review): `unwrap()` panics on commit/rollback failure
                // inside drop; if this drop runs during unwinding, the panic
                // aborts the process — confirm this is intended.
                self.commit().unwrap();
            } else {
                self.rollback().unwrap();
            }
        }
    }
}

/// Weak (non-owning) handle to a transaction.
/// Used where a back-reference must not keep the transaction alive.
#[derive(Clone)]
pub(crate) struct WeakTx(Weak<TxInner>);

impl WeakTx {
    pub(crate) fn new() -> Self {
        Self(Weak::new())
    }

    pub(crate) fn upgrade(&self) -> Option<Tx> {
        self.0.upgrade().map(Tx)
    }

    pub(crate) fn from(tx: &Tx) -> Self {
        Self(Arc::downgrade(&tx.0))
    }
}