Skip to main content

forest/db/parity_db/
gc.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use super::*;
5use crate::db::{BlockstoreWriteOpsSubscribable, HeaviestTipsetKeyProvider};
6use parking_lot::RwLock;
7use std::time::{Duration, Instant};
8
/// A trait for databases that support garbage collection by resetting specific columns.
///
/// `auto_impl` also implements this trait for `&T` and `Arc<T>` where `T: GarbageCollectableDb`,
/// so shared handles can be passed wherever a `GarbageCollectableDb` is expected.
#[auto_impl::auto_impl(&, Arc)]
pub trait GarbageCollectableDb {
    /// Resets the columns this database uses for garbage-collectable data.
    ///
    /// Which columns are affected is decided by the implementation.
    ///
    /// # Errors
    ///
    /// Returns an error if the columns could not be reset.
    fn reset_gc_columns(&self) -> anyhow::Result<()>;
}
14
/// A wrapper around `ParityDb` that provides a method to reset the columns used for garbage collection.
pub struct GarbageCollectableParityDb {
    /// Options the database was opened with; kept so the same database can be reopened
    /// after its garbage-collectable columns have been reset.
    options: Options,
    /// The live database handle. Trait delegations take a read lock, while
    /// `reset_gc_columns` takes the write lock for the whole close/reset/reopen cycle.
    db: RwLock<ParityDb>,
}
20
21impl GarbageCollectableParityDb {
22    pub fn new(options: Options) -> anyhow::Result<Self> {
23        let db = RwLock::new(ParityDb::open_with_options(&options)?);
24        Ok(Self { options, db })
25    }
26
27    pub fn reset_gc_columns(&self) -> anyhow::Result<()> {
28        let mut guard = self.db.write();
29        // Close the database before resetting the columns, otherwise parity-db will fail to reset them.
30        let tmp_db_dir = tempfile::tempdir()?;
31        let tmp = ParityDb::open(tmp_db_dir.path(), &ParityDbConfig::default())?;
32        // Close the database by dropping it, and replace it with a temporary one to avoid holding the file handles of the original database.
33        drop(std::mem::replace(&mut *guard, tmp));
34        let result = self.reset_gc_columns_inner();
35        // Reopen the database no matter whether resetting columns succeeds or not
36        *guard = ParityDb::open_with_options(&self.options)
37            .with_context(|| {
38                format!(
39                    "failed to reopen parity-db at {}",
40                    self.options.path.display()
41                )
42            })
43            .expect("unexpected fatal error");
44        result
45    }
46
47    fn reset_gc_columns_inner(&self) -> anyhow::Result<()> {
48        const GC_COLUMNS: [u8; 2] = [
49            DbColumn::GraphDagCborBlake2b256 as u8,
50            DbColumn::GraphFull as u8,
51        ];
52
53        let mut options = self.options.clone();
54        for col in GC_COLUMNS {
55            let start = Instant::now();
56            tracing::info!("pruning parity-db column {col}...");
57            // Allow up to 3 attempts with 1s interval in case parity-db is still holding some file handles to the column.
58            // Note: retry should no longer be needed with the current logic, keeping it for now just for safety.
59            const MAX_ATTEMPTS: usize = 3;
60            for i in 1..=MAX_ATTEMPTS {
61                match parity_db::Db::reset_column(&mut options, col, None) {
62                    Ok(_) => break,
63                    Err(_) if i < MAX_ATTEMPTS => {
64                        tracing::warn!("retry pruning parity-db column {col} in 1s...");
65                        std::thread::sleep(Duration::from_secs(1));
66                    }
67                    Err(e) => anyhow::bail!(
68                        "failed to reset parity-db column {col} after {MAX_ATTEMPTS} attempts: {e}"
69                    ),
70                }
71            }
72            tracing::info!(
73                "pruned parity-db column {col}, took {}",
74                humantime::format_duration(start.elapsed())
75            );
76        }
77        Ok(())
78    }
79}
80
81impl GarbageCollectableDb for GarbageCollectableParityDb {
82    fn reset_gc_columns(&self) -> anyhow::Result<()> {
83        self.reset_gc_columns()
84    }
85}
86
87impl Blockstore for GarbageCollectableParityDb {
88    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
89        Blockstore::get(&*self.db.read(), k)
90    }
91
92    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
93        Blockstore::put_keyed(&*self.db.read(), k, block)
94    }
95
96    fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
97    where
98        Self: Sized,
99        D: AsRef<[u8]>,
100        I: IntoIterator<Item = (Cid, D)>,
101    {
102        Blockstore::put_many_keyed(&*self.db.read(), blocks)
103    }
104}
105
106impl HeaviestTipsetKeyProvider for GarbageCollectableParityDb {
107    fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
108        HeaviestTipsetKeyProvider::heaviest_tipset_key(&*self.db.read())
109    }
110
111    fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
112        HeaviestTipsetKeyProvider::set_heaviest_tipset_key(&*self.db.read(), tsk)
113    }
114}
115
116impl SettingsStore for GarbageCollectableParityDb {
117    fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
118        SettingsStore::read_bin(&*self.db.read(), key)
119    }
120
121    fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
122        SettingsStore::write_bin(&*self.db.read(), key, value)
123    }
124
125    fn exists(&self, key: &str) -> anyhow::Result<bool> {
126        SettingsStore::exists(&*self.db.read(), key)
127    }
128
129    fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
130        SettingsStore::setting_keys(&*self.db.read())
131    }
132}
133
134impl EthMappingsStore for GarbageCollectableParityDb {
135    fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
136        EthMappingsStore::read_bin(&*self.db.read(), key)
137    }
138
139    fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
140        EthMappingsStore::write_bin(&*self.db.read(), key, value)
141    }
142
143    fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
144        EthMappingsStore::exists(&*self.db.read(), key)
145    }
146
147    fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
148        EthMappingsStore::get_message_cids(&*self.db.read())
149    }
150
151    fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
152        EthMappingsStore::delete(&*self.db.read(), keys)
153    }
154
155    fn tipset_key_by_epoch(&self, epoch: i64) -> anyhow::Result<Option<TipsetKey>> {
156        EthMappingsStore::tipset_key_by_epoch(&*self.db.read(), epoch)
157    }
158
159    fn set_tipset_key_at_epoch(&self, ts: &Tipset) -> anyhow::Result<()> {
160        EthMappingsStore::set_tipset_key_at_epoch(&*self.db.read(), ts)
161    }
162}
163
164impl PersistentStore for GarbageCollectableParityDb {
165    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
166        PersistentStore::put_keyed_persistent(&*self.db.read(), k, block)
167    }
168}
169
170impl BitswapStoreRead for GarbageCollectableParityDb {
171    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
172        BitswapStoreRead::contains(&*self.db.read(), cid)
173    }
174
175    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
176        BitswapStoreRead::get(&*self.db.read(), cid)
177    }
178}
179
180impl BitswapStoreReadWrite for GarbageCollectableParityDb {
181    type Hashes = <ParityDb as BitswapStoreReadWrite>::Hashes;
182
183    fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
184        BitswapStoreReadWrite::insert(&*self.db.read(), block)
185    }
186}
187
188impl DBStatistics for GarbageCollectableParityDb {
189    fn get_statistics(&self) -> Option<String> {
190        DBStatistics::get_statistics(&*self.db.read())
191    }
192}
193
194impl BlockstoreWriteOpsSubscribable for GarbageCollectableParityDb {
195    fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<Vec<(Cid, bytes::Bytes)>> {
196        BlockstoreWriteOpsSubscribable::subscribe_write_ops(&*self.db.read())
197    }
198
199    fn unsubscribe_write_ops(&self) {
200        BlockstoreWriteOpsSubscribable::unsubscribe_write_ops(&*self.db.read())
201    }
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::db::car_stream::CarBlock;
    use quickcheck_macros::quickcheck;

    /// Asserts that every block in `blocks` can be read back from `db` with its original data.
    fn assert_blocks_present(
        db: &GarbageCollectableParityDb,
        blocks: &[CarBlock],
    ) -> anyhow::Result<()> {
        for b in blocks {
            assert_eq!(
                Blockstore::get(db, &b.cid)?.map(Bytes::from).as_ref(),
                Some(&b.data)
            );
        }
        Ok(())
    }

    #[quickcheck]
    fn test_reset_gc_columns(blocks: Vec<CarBlock>) -> anyhow::Result<()> {
        const SETTING_KEY: &str = "gc-test-setting";
        const SETTING_VALUE: &[u8] = b"survives-gc";

        let db_path = tempfile::tempdir()?;
        let options = ParityDb::to_options(db_path.path(), &ParityDbConfig::default());
        let db = GarbageCollectableParityDb::new(options)?;
        // store a value in a non-GC column; it must survive the close/reset/reopen cycle
        SettingsStore::write_bin(&db, SETTING_KEY, SETTING_VALUE)?;
        // insert blocks
        for b in &blocks {
            db.put_keyed(&b.cid, &b.data)?;
        }
        assert_blocks_present(&db, &blocks)?;
        // reset gc columns
        db.reset_gc_columns()?;
        // check blocks are gone
        for b in &blocks {
            assert_eq!(Blockstore::get(&db, &b.cid)?, None);
        }
        // check the non-GC column was preserved
        assert_eq!(
            SettingsStore::read_bin(&db, SETTING_KEY)?,
            Some(SETTING_VALUE.to_vec())
        );
        // insert blocks again; the reopened database must accept writes
        for b in &blocks {
            db.put_keyed(&b.cid, &b.data)?;
        }
        assert_blocks_present(&db, &blocks)?;
        Ok(())
    }
}