ipfs_sqlite_block_store/cache/
sqlite_tracker.rs

1use super::{BlockInfo, CacheTracker};
2use crate::error::Context;
3use fnv::{FnvHashMap, FnvHashSet};
4use parking_lot::Mutex;
5use rusqlite::{Connection, Transaction};
6use std::{
7    fmt::Debug,
8    ops::DerefMut,
9    path::Path,
10    sync::Arc,
11    time::{Instant, SystemTime},
12};
13use tracing::*;
14
15/// A cache tracker that uses a sqlite database as persistent storage
16pub struct SqliteCacheTracker<F> {
17    conn: Arc<Mutex<Connection>>,
18    mk_cache_entry: F,
19}
20
21impl<F> Debug for SqliteCacheTracker<F> {
22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23        f.debug_struct("SqliteCacheTracker").finish()
24    }
25}
26
27const INIT: &str = r#"
28PRAGMA journal_mode = WAL;
29PRAGMA synchronous = OFF;
30CREATE TABLE IF NOT EXISTS accessed (
31    id INTEGER PRIMARY KEY,
32    time INTEGER
33);
34"#;
35
36fn init_db(conn: &mut Connection) -> crate::Result<()> {
37    conn.execute_batch(INIT).ctx("initialising CT DB")?;
38    Ok(())
39}
40
41/// execute a statement in a readonly transaction
42/// nested transactions are not allowed here.
43pub(crate) fn in_ro_txn<T>(
44    conn: &mut Connection,
45    f: impl FnOnce(&Transaction) -> crate::Result<T>,
46) -> crate::Result<T> {
47    let txn = conn.transaction().ctx("beginning CT ro transaction")?;
48    f(&txn)
49}
50
51fn attempt_txn<T>(
52    mut conn: impl DerefMut<Target = Connection>,
53    f: impl FnOnce(&Transaction) -> crate::Result<T>,
54) {
55    let result = conn
56        .transaction()
57        .ctx("beginning CT transaction")
58        .and_then(|txn| {
59            f(&txn)?;
60            Ok(txn)
61        })
62        .and_then(|txn| txn.commit().ctx("committing CT transaction"));
63    if let Err(cause) = result {
64        tracing::warn!("Unable to execute transaction: {}", cause);
65    }
66}
67
68fn attempt_ro_txn<T>(
69    mut conn: impl DerefMut<Target = Connection>,
70    f: impl FnOnce(&Transaction) -> crate::Result<T>,
71) {
72    let result = in_ro_txn(&mut conn, f);
73    if let Err(cause) = result {
74        tracing::warn!("Unable to execute readonly transaction {}", cause);
75    }
76}
77
78fn set_accessed(txn: &Transaction, id: i64, accessed: i64) -> crate::Result<()> {
79    txn.prepare_cached("REPLACE INTO accessed (id, time) VALUES (?, ?)")
80        .ctx("setting accessed (prep)")?
81        .execute([id, accessed])
82        .ctx("setting accessed")?;
83    Ok(())
84}
85
86fn get_accessed_bulk(
87    txn: &Transaction,
88    result: &mut FnvHashMap<i64, Option<i64>>,
89) -> crate::Result<()> {
90    let mut stmt = txn
91        .prepare_cached("SELECT id, time FROM accessed")
92        .ctx("getting accessed (prep)")?;
93    let accessed = stmt
94        .query_map([], |row| {
95            let id: i64 = row.get(0)?;
96            let time: i64 = row.get(1)?;
97            Ok((id, time))
98        })
99        .ctx("getting accessed")?;
100    // we have no choice but to run through all values in accessed.
101    for row in accessed.flatten() {
102        // only add if a row already exists
103        let (id, time) = row;
104        if let Some(value) = result.get_mut(&id) {
105            *value = Some(time);
106        }
107    }
108    Ok(())
109}
110
111fn delete_id(txn: &Transaction, id: i64) -> crate::Result<()> {
112    txn.prepare_cached("DELETE FROM accessed WHERE id = ?")
113        .ctx("deleting from CT (prep)")?
114        .execute([id])
115        .ctx("deleting from CT")?;
116    Ok(())
117}
118
119fn get_ids(txn: &Transaction) -> crate::Result<Vec<i64>> {
120    let ids = txn
121        .prepare_cached("SELECT id FROM accessed")
122        .ctx("getting IDs (prep)")?
123        .query_map([], |row| row.get(0))
124        .ctx("getting IDs")?
125        .collect::<rusqlite::Result<Vec<i64>>>()
126        .ctx("getting IDs (transform)")?;
127    Ok(ids)
128}
129
130impl<F> SqliteCacheTracker<F>
131where
132    F: Fn(i64, BlockInfo) -> Option<i64>,
133{
134    pub fn memory(mk_cache_entry: F) -> crate::Result<Self> {
135        let mut conn = Connection::open_in_memory().ctx("opening in-memory CT DB")?;
136        init_db(&mut conn)?;
137        Ok(Self {
138            conn: Arc::new(Mutex::new(conn)),
139            mk_cache_entry,
140        })
141    }
142
143    pub fn open(path: impl AsRef<Path>, mk_cache_entry: F) -> crate::Result<Self> {
144        let mut conn = Connection::open(path).ctx("opening CT DB")?;
145        init_db(&mut conn)?;
146        Ok(Self {
147            conn: Arc::new(Mutex::new(conn)),
148            mk_cache_entry,
149        })
150    }
151}
152
/// Ordering key used to sort block ids by access time.
///
/// The derived ordering compares the fields in declaration order: `time`
/// first, `id` as tie-breaker. Since `None` orders before every `Some(_)`,
/// ids without a recorded access time sort in front of all ids that have one.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct SortKey {
    /// Recorded access time, if any. Must remain the first field so that it
    /// dominates the derived comparison.
    time: Option<i64>,
    /// Block id, used to order keys with equal `time`.
    id: i64,
}

impl SortKey {
    /// Bundles an optional access time and a block id into a key.
    fn new(time: Option<i64>, id: i64) -> Self {
        SortKey { time, id }
    }
}
164
165impl<F> CacheTracker for SqliteCacheTracker<F>
166where
167    F: Fn(i64, BlockInfo) -> Option<i64> + Send + Sync,
168{
169    #[allow(clippy::needless_collect)]
170    fn blocks_accessed(&self, blocks: Vec<BlockInfo>) {
171        let accessed = SystemTime::now()
172            .duration_since(SystemTime::UNIX_EPOCH)
173            .unwrap_or_default();
174        let nanos = accessed.as_nanos() as i64;
175        let items = blocks
176            .iter()
177            .filter_map(|block| (self.mk_cache_entry)(nanos, *block).map(|nanos| (block.id, nanos)))
178            .collect::<Vec<_>>();
179        if items.is_empty() {
180            return;
181        }
182        attempt_txn(self.conn.lock(), |txn| {
183            for (id, accessed) in items {
184                set_accessed(txn, id, accessed as i64)?;
185            }
186            Ok(())
187        });
188    }
189
190    fn blocks_deleted(&self, blocks: Vec<BlockInfo>) {
191        attempt_txn(self.conn.lock(), |txn| {
192            for block in blocks {
193                delete_id(txn, block.id)?;
194            }
195            Ok(())
196        });
197    }
198
199    fn retain_ids(&self, ids: &[i64]) {
200        let ids = ids.iter().cloned().collect::<FnvHashSet<i64>>();
201        attempt_txn(self.conn.lock(), move |txn| {
202            for id in get_ids(txn)? {
203                if !&ids.contains(&id) {
204                    delete_id(txn, id)?;
205                }
206            }
207            Ok(())
208        });
209    }
210
211    fn sort_ids(&self, ids: &mut [i64]) {
212        attempt_ro_txn(self.conn.lock(), |txn| {
213            let t0 = Instant::now();
214            let mut accessed = ids
215                .iter()
216                .map(|id| (*id, None))
217                .collect::<FnvHashMap<i64, Option<i64>>>();
218            get_accessed_bulk(txn, &mut accessed)?;
219            debug!("getting access times took {}", t0.elapsed().as_micros());
220            let t0 = Instant::now();
221            ids.sort_by_cached_key(|id| SortKey::new(accessed.get(id).cloned().flatten(), *id));
222            debug!("sorting ids took {}", t0.elapsed().as_micros());
223            Ok(())
224        });
225    }
226
227    fn has_persistent_state(&self) -> bool {
228        true
229    }
230}
231
/// A key without an access time must order before any key that has one, even
/// when its id is the largest possible and the other key is minimal in both
/// fields.
#[test]
fn sort_key_sort_order() {
    let never_accessed = SortKey::new(None, i64::max_value());
    let earliest_accessed = SortKey::new(Some(i64::min_value()), i64::min_value());
    assert!(never_accessed < earliest_accessed);
}