Skip to main content

forest/db/parity_db/
gc.rs

// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
3
4use super::*;
5use crate::db::{BlockstoreWriteOpsSubscribable, HeaviestTipsetKeyProvider};
6use parking_lot::RwLock;
7use std::time::{Duration, Instant};
8
9/// A trait for databases that support garbage collection by resetting specific columns.
10#[auto_impl::auto_impl(&, Arc)]
11pub trait GarbageCollectableDb {
12    fn reset_gc_columns(&self) -> anyhow::Result<()>;
13}
14
15/// A wrapper around `ParityDb` that provides a method to reset the columns used for garbage collection.
16pub struct GarbageCollectableParityDb {
17    options: Options,
18    db: RwLock<ParityDb>,
19}
20
21impl GarbageCollectableParityDb {
22    pub fn new(options: Options) -> anyhow::Result<Self> {
23        let db = RwLock::new(ParityDb::open_with_options(&options)?);
24        Ok(Self { options, db })
25    }
26
27    pub fn reset_gc_columns(&self) -> anyhow::Result<()> {
28        let mut guard = self.db.write();
29        // Close the database before resetting the columns, otherwise parity-db will fail to reset them.
30        let tmp_db_dir = tempfile::tempdir()?;
31        let tmp = ParityDb::open(tmp_db_dir.path(), &ParityDbConfig::default())?;
32        // Close the database by dropping it, and replace it with a temporary one to avoid holding the file handles of the original database.
33        drop(std::mem::replace(&mut *guard, tmp));
34        let result = self.reset_gc_columns_inner();
35        // Reopen the database no matter whether resetting columns succeeds or not
36        *guard = ParityDb::open_with_options(&self.options)
37            .with_context(|| {
38                format!(
39                    "failed to reopen parity-db at {}",
40                    self.options.path.display()
41                )
42            })
43            .expect("unexpected fatal error");
44        result
45    }
46
47    fn reset_gc_columns_inner(&self) -> anyhow::Result<()> {
48        const GC_COLUMNS: [u8; 2] = [
49            DbColumn::GraphDagCborBlake2b256 as u8,
50            DbColumn::GraphFull as u8,
51        ];
52
53        let mut options = self.options.clone();
54        for col in GC_COLUMNS {
55            let start = Instant::now();
56            tracing::info!("pruning parity-db column {col}...");
57            // Allow up to 3 attempts with 1s interval in case parity-db is still holding some file handles to the column.
58            // Note: retry should no longer be needed with the current logic, keeping it for now just for safety.
59            const MAX_ATTEMPTS: usize = 3;
60            for i in 1..=MAX_ATTEMPTS {
61                match parity_db::Db::reset_column(&mut options, col, None) {
62                    Ok(_) => break,
63                    Err(_) if i < MAX_ATTEMPTS => {
64                        tracing::warn!("retry pruning parity-db column {col} in 1s...");
65                        std::thread::sleep(Duration::from_secs(1));
66                    }
67                    Err(e) => anyhow::bail!(
68                        "failed to reset parity-db column {col} after {MAX_ATTEMPTS} attempts: {e}"
69                    ),
70                }
71            }
72            tracing::info!(
73                "pruned parity-db column {col}, took {}",
74                humantime::format_duration(start.elapsed())
75            );
76        }
77        Ok(())
78    }
79}
80
81impl GarbageCollectableDb for GarbageCollectableParityDb {
82    fn reset_gc_columns(&self) -> anyhow::Result<()> {
83        self.reset_gc_columns()
84    }
85}
86
87impl Blockstore for GarbageCollectableParityDb {
88    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
89        Blockstore::get(&*self.db.read(), k)
90    }
91
92    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
93        Blockstore::put_keyed(&*self.db.read(), k, block)
94    }
95
96    fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
97    where
98        Self: Sized,
99        D: AsRef<[u8]>,
100        I: IntoIterator<Item = (Cid, D)>,
101    {
102        Blockstore::put_many_keyed(&*self.db.read(), blocks)
103    }
104}
105
106impl HeaviestTipsetKeyProvider for GarbageCollectableParityDb {
107    fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
108        HeaviestTipsetKeyProvider::heaviest_tipset_key(&*self.db.read())
109    }
110
111    fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
112        HeaviestTipsetKeyProvider::set_heaviest_tipset_key(&*self.db.read(), tsk)
113    }
114}
115
116impl SettingsStore for GarbageCollectableParityDb {
117    fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
118        SettingsStore::read_bin(&*self.db.read(), key)
119    }
120
121    fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
122        SettingsStore::write_bin(&*self.db.read(), key, value)
123    }
124
125    fn exists(&self, key: &str) -> anyhow::Result<bool> {
126        SettingsStore::exists(&*self.db.read(), key)
127    }
128
129    fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
130        SettingsStore::setting_keys(&*self.db.read())
131    }
132}
133
134impl EthMappingsStore for GarbageCollectableParityDb {
135    fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
136        EthMappingsStore::read_bin(&*self.db.read(), key)
137    }
138
139    fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
140        EthMappingsStore::write_bin(&*self.db.read(), key, value)
141    }
142
143    fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
144        EthMappingsStore::exists(&*self.db.read(), key)
145    }
146
147    fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
148        EthMappingsStore::get_message_cids(&*self.db.read())
149    }
150
151    fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
152        EthMappingsStore::delete(&*self.db.read(), keys)
153    }
154}
155
156impl PersistentStore for GarbageCollectableParityDb {
157    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
158        PersistentStore::put_keyed_persistent(&*self.db.read(), k, block)
159    }
160}
161
162impl BitswapStoreRead for GarbageCollectableParityDb {
163    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
164        BitswapStoreRead::contains(&*self.db.read(), cid)
165    }
166
167    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
168        BitswapStoreRead::get(&*self.db.read(), cid)
169    }
170}
171
172impl BitswapStoreReadWrite for GarbageCollectableParityDb {
173    type Hashes = <ParityDb as BitswapStoreReadWrite>::Hashes;
174
175    fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
176        BitswapStoreReadWrite::insert(&*self.db.read(), block)
177    }
178}
179
180impl DBStatistics for GarbageCollectableParityDb {
181    fn get_statistics(&self) -> Option<String> {
182        DBStatistics::get_statistics(&*self.db.read())
183    }
184}
185
186impl BlockstoreWriteOpsSubscribable for GarbageCollectableParityDb {
187    fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<Vec<(Cid, bytes::Bytes)>> {
188        BlockstoreWriteOpsSubscribable::subscribe_write_ops(&*self.db.read())
189    }
190
191    fn unsubscribe_write_ops(&self) {
192        BlockstoreWriteOpsSubscribable::unsubscribe_write_ops(&*self.db.read())
193    }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::db::car_stream::CarBlock;
    use quickcheck_macros::quickcheck;

    /// Resetting the GC columns must drop every block and keep data stored outside those
    /// columns (here: a setting). It must also leave the database usable afterwards.
    #[quickcheck]
    fn test_reset_gc_columns(blocks: Vec<CarBlock>) -> anyhow::Result<()> {
        const SETTING_KEY: &str = "gc_test_setting";
        const SETTING_VALUE: &[u8] = b"survives-gc";

        let db_path = tempfile::tempdir()?;
        let options = ParityDb::to_options(db_path.path(), &ParityDbConfig::default());
        let db = GarbageCollectableParityDb::new(options)?;
        // insert blocks, plus a setting that lives outside the GC columns
        for b in &blocks {
            db.put_keyed(&b.cid, &b.data)?;
        }
        SettingsStore::write_bin(&db, SETTING_KEY, SETTING_VALUE)?;
        // check blocks are present
        for b in &blocks {
            assert_eq!(Blockstore::get(&db, &b.cid)?.as_ref(), Some(&b.data));
        }
        // reset gc columns
        db.reset_gc_columns()?;
        // check blocks are gone...
        for b in &blocks {
            assert_eq!(Blockstore::get(&db, &b.cid)?, None);
        }
        // ...while data outside the GC columns is preserved
        assert_eq!(
            SettingsStore::read_bin(&db, SETTING_KEY)?.as_deref(),
            Some(SETTING_VALUE)
        );
        // insert blocks again
        for b in &blocks {
            db.put_keyed(&b.cid, &b.data)?;
        }
        // check blocks are present
        for b in &blocks {
            assert_eq!(Blockstore::get(&db, &b.cid)?.as_ref(), Some(&b.data));
        }
        Ok(())
    }
}