1use futures::channel::mpsc;
2pub use ipfs_sqlite_block_store::TempPin;
3use ipfs_sqlite_block_store::{
4 cache::{BlockInfo, CacheTracker, SqliteCacheTracker},
5 BlockStore, Config, SizeTargets, Synchronous,
6};
7use lazy_static::lazy_static;
8use libipld::codec::References;
9use libipld::store::StoreParams;
10use libipld::{Block, Cid, Ipld, Result};
11use parking_lot::Mutex;
12use prometheus::core::{Collector, Desc};
13use prometheus::proto::MetricFamily;
14use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGauge, Opts, Registry};
15use std::future::Future;
16use std::marker::PhantomData;
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::time::Duration;
20
/// Settings used to open a [`StorageService`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageConfig {
    /// Location of the store on disk; `None` opens an in-memory store instead.
    pub path: Option<PathBuf>,
    /// Block-count target; first argument handed to `SizeTargets::new` when the
    /// store is opened.
    pub cache_size_blocks: u64,
    /// Byte-size target; second argument handed to `SizeTargets::new` when the
    /// store is opened.
    pub cache_size_bytes: u64,
    /// Pacing of the background gc loop: the loop sleeps for half of this
    /// interval between its phases.
    pub gc_interval: Duration,
    /// First argument passed to the store's incremental gc and incremental
    /// orphan-deletion calls.
    pub gc_min_blocks: usize,
    /// Second argument passed to the store's incremental gc and incremental
    /// orphan-deletion calls.
    pub gc_target_duration: Duration,
}

impl StorageConfig {
    /// Builds a configuration from the three commonly tuned settings.
    ///
    /// `cache_size` becomes `cache_size_blocks`. Every remaining knob is set to
    /// the largest value its type can represent: `cache_size_bytes` to
    /// `u64::MAX`, `gc_min_blocks` to `usize::MAX` and `gc_target_duration` to
    /// the longest representable [`Duration`].
    pub fn new(path: Option<PathBuf>, cache_size: u64, gc_interval: Duration) -> Self {
        let max_bytes = u64::MAX;
        let max_blocks = usize::MAX;
        // All seconds set plus the largest valid sub-second nanosecond count.
        let max_duration = Duration::new(u64::MAX, 1_000_000_000 - 1);
        StorageConfig {
            path,
            gc_interval,
            cache_size_blocks: cache_size,
            cache_size_bytes: max_bytes,
            gc_min_blocks: max_blocks,
            gc_target_duration: max_duration,
        }
    }
}
74
75#[derive(Clone, Debug, Eq, PartialEq)]
76pub enum StorageEvent {
77 Remove(Cid),
78}
79
80#[derive(Clone)]
81pub struct StorageService<S: StoreParams> {
82 _marker: PhantomData<S>,
83 store: Arc<Mutex<BlockStore>>,
84 gc_target_duration: Duration,
85 gc_min_blocks: usize,
86}
87
88impl<S: StoreParams> StorageService<S>
89where
90 Ipld: References<S::Codecs>,
91{
92 pub fn open(config: StorageConfig, tx: mpsc::UnboundedSender<StorageEvent>) -> Result<Self> {
93 let size = SizeTargets::new(config.cache_size_blocks, config.cache_size_bytes);
94 let store_config = Config::default()
95 .with_size_targets(size)
96 .with_pragma_synchronous(Synchronous::Normal);
97 let store = if let Some(path) = config.path {
98 let tracker = SqliteCacheTracker::open(&path, |access, _| Some(access))?;
99 let tracker = IpfsCacheTracker { tracker, tx };
100 BlockStore::open(path, store_config.with_cache_tracker(tracker))?
101 } else {
102 let tracker = SqliteCacheTracker::memory(|access, _| Some(access))?;
103 let tracker = IpfsCacheTracker { tracker, tx };
104 BlockStore::memory(store_config.with_cache_tracker(tracker))?
105 };
106 let store = Arc::new(Mutex::new(store));
107 let gc = store.clone();
108 let gc_interval = config.gc_interval;
109 let gc_min_blocks = config.gc_min_blocks;
110 let gc_target_duration = config.gc_target_duration;
111 async_global_executor::spawn(async_global_executor::spawn_blocking(move || {
112 std::thread::sleep(gc_interval / 2);
113 loop {
114 tracing::debug!("gc_loop running incremental gc");
115 gc.lock()
116 .incremental_gc(gc_min_blocks, gc_target_duration)
117 .ok();
118 std::thread::sleep(gc_interval / 2);
119 tracing::debug!("gc_loop running incremental delete orphaned");
120 gc.lock()
121 .incremental_delete_orphaned(gc_min_blocks, gc_target_duration)
122 .ok();
123 std::thread::sleep(gc_interval / 2);
124 }
125 }))
126 .detach();
127 Ok(Self {
128 _marker: PhantomData,
129 gc_target_duration: config.gc_target_duration,
130 gc_min_blocks: config.gc_min_blocks,
131 store,
132 })
133 }
134
135 pub fn create_temp_pin(&self) -> Result<TempPin> {
136 observe_query::<_, std::io::Error, _>("create_temp_pin", || {
137 Ok(self.store.lock().temp_pin())
138 })
139 }
140
141 pub fn temp_pin(
142 &self,
143 temp: &TempPin,
144 iter: impl IntoIterator<Item = Cid> + Send + 'static,
145 ) -> Result<()> {
146 observe_query("temp_pin", || {
147 self.store.lock().assign_temp_pin(&temp, iter)
148 })
149 }
150
151 pub fn iter(&self) -> Result<impl Iterator<Item = Cid>> {
152 let cids = observe_query("iter", || self.store.lock().get_block_cids::<Vec<Cid>>())?;
153 Ok(cids.into_iter())
154 }
155
156 pub fn contains(&self, cid: &Cid) -> Result<bool> {
157 observe_query("contains", || self.store.lock().has_block(cid))
158 }
159
160 pub fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
161 observe_query("get", || self.store.lock().get_block(cid))
162 }
163
164 pub fn insert(&self, block: &Block<S>) -> Result<()> {
165 observe_query("insert", || self.store.lock().put_block(block, None))
166 }
167
168 pub async fn evict(&self) -> Result<()> {
169 let store = self.store.clone();
170 let gc_min_blocks = self.gc_min_blocks;
171 let gc_target_duration = self.gc_target_duration;
172 async_global_executor::spawn_blocking(move || {
173 while !store
174 .lock()
175 .incremental_gc(gc_min_blocks, gc_target_duration)?
176 {}
177 while !store
178 .lock()
179 .incremental_delete_orphaned(gc_min_blocks, gc_target_duration)?
180 {}
181 Ok(())
182 })
183 .await
184 }
185
186 pub fn alias(&self, alias: &[u8], cid: Option<&Cid>) -> Result<()> {
187 observe_query("alias", || self.store.lock().alias(alias, cid))
188 }
189
190 pub fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>> {
191 observe_query("resolve", || self.store.lock().resolve(alias))
192 }
193
194 pub fn reverse_alias(&self, cid: &Cid) -> Result<Option<Vec<Vec<u8>>>> {
195 observe_query("reverse_alias", || self.store.lock().reverse_alias(cid))
196 }
197
198 pub fn missing_blocks(&self, cid: &Cid) -> Result<Vec<Cid>> {
199 observe_query("missing_blocks", || {
200 self.store.lock().get_missing_blocks(cid)
201 })
202 }
203
204 pub async fn flush(&self) -> Result<()> {
205 let store = self.store.clone();
206 let flush = async_global_executor::spawn_blocking(move || store.lock().flush());
207 observe_future("flush", flush).await
208 }
209
210 pub fn register_metrics(&self, registry: &Registry) -> Result<()> {
211 registry.register(Box::new(QUERIES_TOTAL.clone()))?;
212 registry.register(Box::new(QUERY_DURATION.clone()))?;
213 registry.register(Box::new(SqliteStoreCollector::new(self.store.clone())))?;
214 Ok(())
215 }
216}
217
218#[derive(Debug)]
219struct IpfsCacheTracker<T> {
220 tracker: T,
221 tx: mpsc::UnboundedSender<StorageEvent>,
222}
223
224impl<T: CacheTracker> CacheTracker for IpfsCacheTracker<T> {
225 fn blocks_accessed(&self, blocks: Vec<BlockInfo>) {
226 self.tracker.blocks_accessed(blocks)
227 }
228
229 fn blocks_deleted(&self, blocks: Vec<BlockInfo>) {
230 for block in &blocks {
231 self.tx
232 .unbounded_send(StorageEvent::Remove(*block.cid()))
233 .ok();
234 }
235 self.tracker.blocks_deleted(blocks)
236 }
237
238 fn retain_ids(&self, ids: &[i64]) {
239 self.tracker.retain_ids(ids)
240 }
241
242 fn sort_ids(&self, ids: &mut [i64]) {
243 self.tracker.sort_ids(ids)
244 }
245}
246
247lazy_static! {
248 pub static ref QUERIES_TOTAL: IntCounterVec = IntCounterVec::new(
249 Opts::new(
250 "block_store_queries_total",
251 "Number of block store requests labelled by type."
252 ),
253 &["type"],
254 )
255 .unwrap();
256 pub static ref QUERY_DURATION: HistogramVec = HistogramVec::new(
257 HistogramOpts::new(
258 "block_store_query_duration",
259 "Duration of store queries labelled by type.",
260 ),
261 &["type"],
262 )
263 .unwrap();
264}
265
266fn observe_query<T, E, F>(name: &'static str, query: F) -> Result<T>
267where
268 E: std::error::Error + Send + Sync + 'static,
269 F: FnOnce() -> Result<T, E>,
270{
271 QUERIES_TOTAL.with_label_values(&[name]).inc();
272 let timer = QUERY_DURATION.with_label_values(&[name]).start_timer();
273 let res = query();
274 if res.is_ok() {
275 timer.observe_duration();
276 } else {
277 timer.stop_and_discard();
278 }
279 Ok(res?)
280}
281
282async fn observe_future<T, E, F>(name: &'static str, query: F) -> Result<T>
283where
284 E: std::error::Error + Send + Sync + 'static,
285 F: Future<Output = Result<T, E>>,
286{
287 QUERIES_TOTAL.with_label_values(&[name]).inc();
288 let timer = QUERY_DURATION.with_label_values(&[name]).start_timer();
289 let res = query.await;
290 if res.is_ok() {
291 timer.observe_duration();
292 } else {
293 timer.stop_and_discard();
294 }
295 Ok(res?)
296}
297
298struct SqliteStoreCollector {
299 desc: Desc,
300 store: Arc<Mutex<BlockStore>>,
301}
302
303impl Collector for SqliteStoreCollector {
304 fn desc(&self) -> Vec<&Desc> {
305 vec![&self.desc]
306 }
307
308 fn collect(&self) -> Vec<MetricFamily> {
309 let mut family = vec![];
310
311 if let Ok(stats) = self.store.lock().get_store_stats() {
312 let store_block_count =
313 IntGauge::new("block_store_block_count", "Number of stored blocks").unwrap();
314 store_block_count.set(stats.count() as _);
315 family.push(store_block_count.collect()[0].clone());
316
317 let store_size =
318 IntGauge::new("block_store_size", "Size in bytes of stored blocks").unwrap();
319 store_size.set(stats.size() as _);
320 family.push(store_size.collect()[0].clone());
321 }
322
323 family
324 }
325}
326
327impl SqliteStoreCollector {
328 pub fn new(store: Arc<Mutex<BlockStore>>) -> Self {
329 let desc = Desc::new(
330 "block_store_stats".into(),
331 ".".into(),
332 Default::default(),
333 Default::default(),
334 )
335 .unwrap();
336 Self { store, desc }
337 }
338}
339
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream::StreamExt;
    use libipld::cbor::DagCborCodec;
    use libipld::multihash::Code;
    use libipld::store::DefaultParams;
    use libipld::{alias, ipld};

    /// Encodes `ipld` as a DAG-CBOR block hashed with Blake3-256.
    fn create_block(ipld: &Ipld) -> Block<DefaultParams> {
        Block::encode(DagCborCodec, Code::Blake3_256, ipld).unwrap()
    }

    // Asserts that `reverse_alias` returns `None` for the block.
    macro_rules! assert_evicted {
        ($store:expr, $block:expr) => {
            assert_eq!($store.reverse_alias($block.cid()).unwrap(), None);
        };
    }

    // Asserts that `reverse_alias` returns a non-empty alias list for the block.
    macro_rules! assert_pinned {
        ($store:expr, $block:expr) => {
            assert_eq!(
                $store
                    .reverse_alias($block.cid())
                    .unwrap()
                    .map(|a| !a.is_empty()),
                Some(true)
            );
        };
    }

    // Asserts that `reverse_alias` returns an empty alias list for the block.
    macro_rules! assert_unpinned {
        ($store:expr, $block:expr) => {
            assert_eq!(
                $store
                    .reverse_alias($block.cid())
                    .unwrap()
                    .map(|a| !a.is_empty()),
                Some(false)
            );
        };
    }

    /// Installs a tracing subscriber filtered by the environment; an error
    /// from `try_init` (e.g. a subscriber is already installed) is ignored.
    fn tracing_try_init() {
        tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init()
            .ok();
    }

    /// Opens an in-memory store with a cache size of two blocks and a gc
    /// interval of 100 seconds, returning it with the event receiver.
    fn create_store() -> (
        StorageService<DefaultParams>,
        mpsc::UnboundedReceiver<StorageEvent>,
    ) {
        let (tx, rx) = mpsc::unbounded();
        let config = StorageConfig::new(None, 2, Duration::from_secs(100));
        (StorageService::open(config, tx).unwrap(), rx)
    }

    // Checks which blocks survive explicit eviction and that a `Remove` event
    // is emitted for each evicted block, in eviction order.
    #[async_std::test]
    async fn test_store_evict() {
        tracing_try_init();
        let (store, mut rx) = create_store();
        let blocks = [
            create_block(&ipld!(0)),
            create_block(&ipld!(1)),
            create_block(&ipld!(2)),
            create_block(&ipld!(3)),
        ];
        // Two blocks match the configured cache size: nothing is evicted.
        store.insert(&blocks[0]).unwrap();
        store.insert(&blocks[1]).unwrap();
        store.flush().await.unwrap();
        store.evict().await.unwrap();
        assert_unpinned!(&store, &blocks[0]);
        assert_unpinned!(&store, &blocks[1]);
        // A third block exceeds the cache size: block 0 is expected to go.
        store.insert(&blocks[2]).unwrap();
        store.flush().await.unwrap();
        store.evict().await.unwrap();
        assert_evicted!(&store, &blocks[0]);
        assert_unpinned!(&store, &blocks[1]);
        assert_unpinned!(&store, &blocks[2]);
        // Block 1 is read before the next insert; afterwards block 2 is the one
        // evicted (presumably because the read refreshes block 1's access
        // record in the cache tracker).
        store.get(blocks[1].cid()).unwrap();
        store.insert(&blocks[3]).unwrap();
        store.flush().await.unwrap();
        store.evict().await.unwrap();
        assert_unpinned!(&store, &blocks[1]);
        assert_evicted!(&store, &blocks[2]);
        assert_unpinned!(&store, &blocks[3]);
        assert_eq!(
            rx.next().await,
            Some(StorageEvent::Remove(*blocks[0].cid()))
        );
        assert_eq!(
            rx.next().await,
            Some(StorageEvent::Remove(*blocks[2].cid()))
        );
    }

    // Block `a` is referenced by both `b` and `c`, each of which is aliased
    // separately; `a` must stay pinned until both aliases are removed.
    #[async_std::test]
    #[allow(clippy::many_single_char_names)]
    async fn test_store_unpin() {
        tracing_try_init();
        let (store, _) = create_store();
        let a = create_block(&ipld!({ "a": [] }));
        let b = create_block(&ipld!({ "b": [a.cid()] }));
        let c = create_block(&ipld!({ "c": [a.cid()] }));
        let x = alias!(x).as_bytes().to_vec();
        let y = alias!(y).as_bytes().to_vec();
        store.insert(&a).unwrap();
        store.insert(&b).unwrap();
        store.insert(&c).unwrap();
        store.alias(&x, Some(b.cid())).unwrap();
        store.alias(&y, Some(c.cid())).unwrap();
        store.flush().await.unwrap();
        assert_pinned!(&store, &a);
        assert_pinned!(&store, &b);
        assert_pinned!(&store, &c);
        // Removing alias `x` unpins `b` only; `a` is still reachable through `c`.
        store.alias(&x, None).unwrap();
        store.flush().await.unwrap();
        assert_pinned!(&store, &a);
        assert_unpinned!(&store, &b);
        assert_pinned!(&store, &c);
        // Removing alias `y` leaves nothing pinned.
        store.alias(&y, None).unwrap();
        store.flush().await.unwrap();
        assert_unpinned!(&store, &a);
        assert_unpinned!(&store, &b);
        assert_unpinned!(&store, &c);
    }

    // Two aliases point at the same block `b`; `a` and `b` must stay pinned
    // until the second alias is removed.
    #[async_std::test]
    #[allow(clippy::many_single_char_names)]
    async fn test_store_unpin2() {
        tracing_try_init();
        let (store, _) = create_store();
        let a = create_block(&ipld!({ "a": [] }));
        let b = create_block(&ipld!({ "b": [a.cid()] }));
        let x = alias!(x).as_bytes().to_vec();
        let y = alias!(y).as_bytes().to_vec();
        store.insert(&a).unwrap();
        store.insert(&b).unwrap();
        store.alias(&x, Some(b.cid())).unwrap();
        store.alias(&y, Some(b.cid())).unwrap();
        store.flush().await.unwrap();
        assert_pinned!(&store, &a);
        assert_pinned!(&store, &b);
        // One alias on `b` remains, so both blocks are still pinned.
        store.alias(&x, None).unwrap();
        store.flush().await.unwrap();
        assert_pinned!(&store, &a);
        assert_pinned!(&store, &b);
        store.alias(&y, None).unwrap();
        store.flush().await.unwrap();
        assert_unpinned!(&store, &a);
        assert_unpinned!(&store, &b);
    }
}