ipfs_sqlite_block_store/cache/
sqlite_tracker.rs1use super::{BlockInfo, CacheTracker};
2use crate::error::Context;
3use fnv::{FnvHashMap, FnvHashSet};
4use parking_lot::Mutex;
5use rusqlite::{Connection, Transaction};
6use std::{
7 fmt::Debug,
8 ops::DerefMut,
9 path::Path,
10 sync::Arc,
11 time::{Instant, SystemTime},
12};
13use tracing::*;
14
15pub struct SqliteCacheTracker<F> {
17 conn: Arc<Mutex<Connection>>,
18 mk_cache_entry: F,
19}
20
21impl<F> Debug for SqliteCacheTracker<F> {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 f.debug_struct("SqliteCacheTracker").finish()
24 }
25}
26
/// SQL batch executed whenever a tracker database is opened.
///
/// Sets the journal mode and synchronous pragmas and creates the `accessed` table
/// (block id -> access time) if it does not exist yet.
const INIT: &str = r#"
PRAGMA journal_mode = WAL;
PRAGMA synchronous = OFF;
CREATE TABLE IF NOT EXISTS accessed (
    id INTEGER PRIMARY KEY,
    time INTEGER
);
"#;
35
36fn init_db(conn: &mut Connection) -> crate::Result<()> {
37 conn.execute_batch(INIT).ctx("initialising CT DB")?;
38 Ok(())
39}
40
41pub(crate) fn in_ro_txn<T>(
44 conn: &mut Connection,
45 f: impl FnOnce(&Transaction) -> crate::Result<T>,
46) -> crate::Result<T> {
47 let txn = conn.transaction().ctx("beginning CT ro transaction")?;
48 f(&txn)
49}
50
51fn attempt_txn<T>(
52 mut conn: impl DerefMut<Target = Connection>,
53 f: impl FnOnce(&Transaction) -> crate::Result<T>,
54) {
55 let result = conn
56 .transaction()
57 .ctx("beginning CT transaction")
58 .and_then(|txn| {
59 f(&txn)?;
60 Ok(txn)
61 })
62 .and_then(|txn| txn.commit().ctx("committing CT transaction"));
63 if let Err(cause) = result {
64 tracing::warn!("Unable to execute transaction: {}", cause);
65 }
66}
67
68fn attempt_ro_txn<T>(
69 mut conn: impl DerefMut<Target = Connection>,
70 f: impl FnOnce(&Transaction) -> crate::Result<T>,
71) {
72 let result = in_ro_txn(&mut conn, f);
73 if let Err(cause) = result {
74 tracing::warn!("Unable to execute readonly transaction {}", cause);
75 }
76}
77
78fn set_accessed(txn: &Transaction, id: i64, accessed: i64) -> crate::Result<()> {
79 txn.prepare_cached("REPLACE INTO accessed (id, time) VALUES (?, ?)")
80 .ctx("setting accessed (prep)")?
81 .execute([id, accessed])
82 .ctx("setting accessed")?;
83 Ok(())
84}
85
86fn get_accessed_bulk(
87 txn: &Transaction,
88 result: &mut FnvHashMap<i64, Option<i64>>,
89) -> crate::Result<()> {
90 let mut stmt = txn
91 .prepare_cached("SELECT id, time FROM accessed")
92 .ctx("getting accessed (prep)")?;
93 let accessed = stmt
94 .query_map([], |row| {
95 let id: i64 = row.get(0)?;
96 let time: i64 = row.get(1)?;
97 Ok((id, time))
98 })
99 .ctx("getting accessed")?;
100 for row in accessed.flatten() {
102 let (id, time) = row;
104 if let Some(value) = result.get_mut(&id) {
105 *value = Some(time);
106 }
107 }
108 Ok(())
109}
110
111fn delete_id(txn: &Transaction, id: i64) -> crate::Result<()> {
112 txn.prepare_cached("DELETE FROM accessed WHERE id = ?")
113 .ctx("deleting from CT (prep)")?
114 .execute([id])
115 .ctx("deleting from CT")?;
116 Ok(())
117}
118
119fn get_ids(txn: &Transaction) -> crate::Result<Vec<i64>> {
120 let ids = txn
121 .prepare_cached("SELECT id FROM accessed")
122 .ctx("getting IDs (prep)")?
123 .query_map([], |row| row.get(0))
124 .ctx("getting IDs")?
125 .collect::<rusqlite::Result<Vec<i64>>>()
126 .ctx("getting IDs (transform)")?;
127 Ok(ids)
128}
129
130impl<F> SqliteCacheTracker<F>
131where
132 F: Fn(i64, BlockInfo) -> Option<i64>,
133{
134 pub fn memory(mk_cache_entry: F) -> crate::Result<Self> {
135 let mut conn = Connection::open_in_memory().ctx("opening in-memory CT DB")?;
136 init_db(&mut conn)?;
137 Ok(Self {
138 conn: Arc::new(Mutex::new(conn)),
139 mk_cache_entry,
140 })
141 }
142
143 pub fn open(path: impl AsRef<Path>, mk_cache_entry: F) -> crate::Result<Self> {
144 let mut conn = Connection::open(path).ctx("opening CT DB")?;
145 init_db(&mut conn)?;
146 Ok(Self {
147 conn: Arc::new(Mutex::new(conn)),
148 mk_cache_entry,
149 })
150 }
151}
152
/// Ordering key used when sorting block ids by access time.
///
/// The derived ordering compares `time` first and `id` second, and `None` orders before
/// every `Some(_)`. The field order is therefore significant.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct SortKey {
    /// Recorded access time, or `None` when no time is known for the id.
    time: Option<i64>,
    /// Block id, used as the tie-breaker.
    id: i64,
}

impl SortKey {
    /// Builds the key for block `id` with the given access `time`.
    fn new(time: Option<i64>, id: i64) -> Self {
        SortKey { time, id }
    }
}
164
165impl<F> CacheTracker for SqliteCacheTracker<F>
166where
167 F: Fn(i64, BlockInfo) -> Option<i64> + Send + Sync,
168{
169 #[allow(clippy::needless_collect)]
170 fn blocks_accessed(&self, blocks: Vec<BlockInfo>) {
171 let accessed = SystemTime::now()
172 .duration_since(SystemTime::UNIX_EPOCH)
173 .unwrap_or_default();
174 let nanos = accessed.as_nanos() as i64;
175 let items = blocks
176 .iter()
177 .filter_map(|block| (self.mk_cache_entry)(nanos, *block).map(|nanos| (block.id, nanos)))
178 .collect::<Vec<_>>();
179 if items.is_empty() {
180 return;
181 }
182 attempt_txn(self.conn.lock(), |txn| {
183 for (id, accessed) in items {
184 set_accessed(txn, id, accessed as i64)?;
185 }
186 Ok(())
187 });
188 }
189
190 fn blocks_deleted(&self, blocks: Vec<BlockInfo>) {
191 attempt_txn(self.conn.lock(), |txn| {
192 for block in blocks {
193 delete_id(txn, block.id)?;
194 }
195 Ok(())
196 });
197 }
198
199 fn retain_ids(&self, ids: &[i64]) {
200 let ids = ids.iter().cloned().collect::<FnvHashSet<i64>>();
201 attempt_txn(self.conn.lock(), move |txn| {
202 for id in get_ids(txn)? {
203 if !&ids.contains(&id) {
204 delete_id(txn, id)?;
205 }
206 }
207 Ok(())
208 });
209 }
210
211 fn sort_ids(&self, ids: &mut [i64]) {
212 attempt_ro_txn(self.conn.lock(), |txn| {
213 let t0 = Instant::now();
214 let mut accessed = ids
215 .iter()
216 .map(|id| (*id, None))
217 .collect::<FnvHashMap<i64, Option<i64>>>();
218 get_accessed_bulk(txn, &mut accessed)?;
219 debug!("getting access times took {}", t0.elapsed().as_micros());
220 let t0 = Instant::now();
221 ids.sort_by_cached_key(|id| SortKey::new(accessed.get(id).cloned().flatten(), *id));
222 debug!("sorting ids took {}", t0.elapsed().as_micros());
223 Ok(())
224 });
225 }
226
227 fn has_persistent_state(&self) -> bool {
228 true
229 }
230}
231
/// A key without an access time must order before any key that has one, whatever the ids are.
#[test]
fn sort_key_sort_order() {
    let never_accessed = SortKey::new(None, i64::MAX);
    let earliest_access = SortKey::new(Some(i64::MIN), i64::MIN);
    assert!(never_accessed < earliest_access);
}