forest/db/parity_db/
gc.rs1use super::*;
5use crate::db::{BlockstoreWriteOpsSubscribable, HeaviestTipsetKeyProvider};
6use parking_lot::RwLock;
7use std::time::{Duration, Instant};
8
9#[auto_impl::auto_impl(&, Arc)]
11pub trait GarbageCollectableDb {
12 fn reset_gc_columns(&self) -> anyhow::Result<()>;
13}
14
15pub struct GarbageCollectableParityDb {
17 options: Options,
18 db: RwLock<ParityDb>,
19}
20
21impl GarbageCollectableParityDb {
22 pub fn new(options: Options) -> anyhow::Result<Self> {
23 let db = RwLock::new(ParityDb::open_with_options(&options)?);
24 Ok(Self { options, db })
25 }
26
27 pub fn reset_gc_columns(&self) -> anyhow::Result<()> {
28 let mut guard = self.db.write();
29 let tmp_db_dir = tempfile::tempdir()?;
31 let tmp = ParityDb::open(tmp_db_dir.path(), &ParityDbConfig::default())?;
32 drop(std::mem::replace(&mut *guard, tmp));
34 let result = self.reset_gc_columns_inner();
35 *guard = ParityDb::open_with_options(&self.options)
37 .with_context(|| {
38 format!(
39 "failed to reopen parity-db at {}",
40 self.options.path.display()
41 )
42 })
43 .expect("unexpected fatal error");
44 result
45 }
46
47 fn reset_gc_columns_inner(&self) -> anyhow::Result<()> {
48 const GC_COLUMNS: [u8; 2] = [
49 DbColumn::GraphDagCborBlake2b256 as u8,
50 DbColumn::GraphFull as u8,
51 ];
52
53 let mut options = self.options.clone();
54 for col in GC_COLUMNS {
55 let start = Instant::now();
56 tracing::info!("pruning parity-db column {col}...");
57 const MAX_ATTEMPTS: usize = 3;
60 for i in 1..=MAX_ATTEMPTS {
61 match parity_db::Db::reset_column(&mut options, col, None) {
62 Ok(_) => break,
63 Err(_) if i < MAX_ATTEMPTS => {
64 tracing::warn!("retry pruning parity-db column {col} in 1s...");
65 std::thread::sleep(Duration::from_secs(1));
66 }
67 Err(e) => anyhow::bail!(
68 "failed to reset parity-db column {col} after {MAX_ATTEMPTS} attempts: {e}"
69 ),
70 }
71 }
72 tracing::info!(
73 "pruned parity-db column {col}, took {}",
74 humantime::format_duration(start.elapsed())
75 );
76 }
77 Ok(())
78 }
79}
80
81impl GarbageCollectableDb for GarbageCollectableParityDb {
82 fn reset_gc_columns(&self) -> anyhow::Result<()> {
83 self.reset_gc_columns()
84 }
85}
86
87impl Blockstore for GarbageCollectableParityDb {
88 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
89 Blockstore::get(&*self.db.read(), k)
90 }
91
92 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
93 Blockstore::put_keyed(&*self.db.read(), k, block)
94 }
95
96 fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
97 where
98 Self: Sized,
99 D: AsRef<[u8]>,
100 I: IntoIterator<Item = (Cid, D)>,
101 {
102 Blockstore::put_many_keyed(&*self.db.read(), blocks)
103 }
104}
105
106impl HeaviestTipsetKeyProvider for GarbageCollectableParityDb {
107 fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
108 HeaviestTipsetKeyProvider::heaviest_tipset_key(&*self.db.read())
109 }
110
111 fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
112 HeaviestTipsetKeyProvider::set_heaviest_tipset_key(&*self.db.read(), tsk)
113 }
114}
115
116impl SettingsStore for GarbageCollectableParityDb {
117 fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
118 SettingsStore::read_bin(&*self.db.read(), key)
119 }
120
121 fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
122 SettingsStore::write_bin(&*self.db.read(), key, value)
123 }
124
125 fn exists(&self, key: &str) -> anyhow::Result<bool> {
126 SettingsStore::exists(&*self.db.read(), key)
127 }
128
129 fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
130 SettingsStore::setting_keys(&*self.db.read())
131 }
132}
133
134impl EthMappingsStore for GarbageCollectableParityDb {
135 fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
136 EthMappingsStore::read_bin(&*self.db.read(), key)
137 }
138
139 fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
140 EthMappingsStore::write_bin(&*self.db.read(), key, value)
141 }
142
143 fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
144 EthMappingsStore::exists(&*self.db.read(), key)
145 }
146
147 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
148 EthMappingsStore::get_message_cids(&*self.db.read())
149 }
150
151 fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
152 EthMappingsStore::delete(&*self.db.read(), keys)
153 }
154}
155
156impl PersistentStore for GarbageCollectableParityDb {
157 fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
158 PersistentStore::put_keyed_persistent(&*self.db.read(), k, block)
159 }
160}
161
162impl BitswapStoreRead for GarbageCollectableParityDb {
163 fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
164 BitswapStoreRead::contains(&*self.db.read(), cid)
165 }
166
167 fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
168 BitswapStoreRead::get(&*self.db.read(), cid)
169 }
170}
171
172impl BitswapStoreReadWrite for GarbageCollectableParityDb {
173 type Hashes = <ParityDb as BitswapStoreReadWrite>::Hashes;
174
175 fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
176 BitswapStoreReadWrite::insert(&*self.db.read(), block)
177 }
178}
179
180impl DBStatistics for GarbageCollectableParityDb {
181 fn get_statistics(&self) -> Option<String> {
182 DBStatistics::get_statistics(&*self.db.read())
183 }
184}
185
186impl BlockstoreWriteOpsSubscribable for GarbageCollectableParityDb {
187 fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<Vec<(Cid, bytes::Bytes)>> {
188 BlockstoreWriteOpsSubscribable::subscribe_write_ops(&*self.db.read())
189 }
190
191 fn unsubscribe_write_ops(&self) {
192 BlockstoreWriteOpsSubscribable::unsubscribe_write_ops(&*self.db.read())
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use super::*;
199 use crate::utils::db::car_stream::CarBlock;
200 use quickcheck_macros::quickcheck;
201
202 #[quickcheck]
203 fn test_reset_gc_columns(blocks: Vec<CarBlock>) -> anyhow::Result<()> {
204 let db_path = tempfile::tempdir()?;
205 let options = ParityDb::to_options(db_path.path(), &ParityDbConfig::default());
206 let db = GarbageCollectableParityDb::new(options)?;
207 for b in &blocks {
209 db.put_keyed(&b.cid, &b.data)?;
210 }
211 for b in &blocks {
213 assert_eq!(Blockstore::get(&db, &b.cid)?.as_ref(), Some(&b.data));
214 }
215 db.reset_gc_columns()?;
217 for b in &blocks {
219 assert_eq!(Blockstore::get(&db, &b.cid)?, None);
220 }
221 for b in &blocks {
223 db.put_keyed(&b.cid, &b.data)?;
224 }
225 for b in &blocks {
227 assert_eq!(Blockstore::get(&db, &b.cid)?.as_ref(), Some(&b.data));
228 }
229 Ok(())
230 }
231}