forest/db/parity_db/
gc.rs1use super::*;
5use crate::db::{BlockstoreWriteOpsSubscribable, HeaviestTipsetKeyProvider};
6use parking_lot::RwLock;
7use std::time::{Duration, Instant};
8
9#[auto_impl::auto_impl(&, Arc)]
11pub trait GarbageCollectableDb {
12 fn reset_gc_columns(&self) -> anyhow::Result<()>;
13}
14
15pub struct GarbageCollectableParityDb {
17 options: Options,
18 db: RwLock<ParityDb>,
19}
20
21impl GarbageCollectableParityDb {
22 pub fn new(options: Options) -> anyhow::Result<Self> {
23 let db = RwLock::new(ParityDb::open_with_options(&options)?);
24 Ok(Self { options, db })
25 }
26
27 pub fn reset_gc_columns(&self) -> anyhow::Result<()> {
28 let mut guard = self.db.write();
29 let tmp_db_dir = tempfile::tempdir()?;
31 let tmp = ParityDb::open(tmp_db_dir.path(), &ParityDbConfig::default())?;
32 drop(std::mem::replace(&mut *guard, tmp));
34 let result = self.reset_gc_columns_inner();
35 *guard = ParityDb::open_with_options(&self.options)
37 .with_context(|| {
38 format!(
39 "failed to reopen parity-db at {}",
40 self.options.path.display()
41 )
42 })
43 .expect("unexpected fatal error");
44 result
45 }
46
47 fn reset_gc_columns_inner(&self) -> anyhow::Result<()> {
48 const GC_COLUMNS: [u8; 2] = [
49 DbColumn::GraphDagCborBlake2b256 as u8,
50 DbColumn::GraphFull as u8,
51 ];
52
53 let mut options = self.options.clone();
54 for col in GC_COLUMNS {
55 let start = Instant::now();
56 tracing::info!("pruning parity-db column {col}...");
57 const MAX_ATTEMPTS: usize = 3;
60 for i in 1..=MAX_ATTEMPTS {
61 match parity_db::Db::reset_column(&mut options, col, None) {
62 Ok(_) => break,
63 Err(_) if i < MAX_ATTEMPTS => {
64 tracing::warn!("retry pruning parity-db column {col} in 1s...");
65 std::thread::sleep(Duration::from_secs(1));
66 }
67 Err(e) => anyhow::bail!(
68 "failed to reset parity-db column {col} after {MAX_ATTEMPTS} attempts: {e}"
69 ),
70 }
71 }
72 tracing::info!(
73 "pruned parity-db column {col}, took {}",
74 humantime::format_duration(start.elapsed())
75 );
76 }
77 Ok(())
78 }
79}
80
81impl GarbageCollectableDb for GarbageCollectableParityDb {
82 fn reset_gc_columns(&self) -> anyhow::Result<()> {
83 self.reset_gc_columns()
84 }
85}
86
87impl Blockstore for GarbageCollectableParityDb {
88 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
89 Blockstore::get(&*self.db.read(), k)
90 }
91
92 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
93 Blockstore::put_keyed(&*self.db.read(), k, block)
94 }
95
96 fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
97 where
98 Self: Sized,
99 D: AsRef<[u8]>,
100 I: IntoIterator<Item = (Cid, D)>,
101 {
102 Blockstore::put_many_keyed(&*self.db.read(), blocks)
103 }
104}
105
106impl HeaviestTipsetKeyProvider for GarbageCollectableParityDb {
107 fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
108 HeaviestTipsetKeyProvider::heaviest_tipset_key(&*self.db.read())
109 }
110
111 fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
112 HeaviestTipsetKeyProvider::set_heaviest_tipset_key(&*self.db.read(), tsk)
113 }
114}
115
116impl SettingsStore for GarbageCollectableParityDb {
117 fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
118 SettingsStore::read_bin(&*self.db.read(), key)
119 }
120
121 fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
122 SettingsStore::write_bin(&*self.db.read(), key, value)
123 }
124
125 fn exists(&self, key: &str) -> anyhow::Result<bool> {
126 SettingsStore::exists(&*self.db.read(), key)
127 }
128
129 fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
130 SettingsStore::setting_keys(&*self.db.read())
131 }
132}
133
134impl EthMappingsStore for GarbageCollectableParityDb {
135 fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
136 EthMappingsStore::read_bin(&*self.db.read(), key)
137 }
138
139 fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
140 EthMappingsStore::write_bin(&*self.db.read(), key, value)
141 }
142
143 fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
144 EthMappingsStore::exists(&*self.db.read(), key)
145 }
146
147 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
148 EthMappingsStore::get_message_cids(&*self.db.read())
149 }
150
151 fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
152 EthMappingsStore::delete(&*self.db.read(), keys)
153 }
154
155 fn tipset_key_by_epoch(&self, epoch: i64) -> anyhow::Result<Option<TipsetKey>> {
156 EthMappingsStore::tipset_key_by_epoch(&*self.db.read(), epoch)
157 }
158
159 fn set_tipset_key_at_epoch(&self, ts: &Tipset) -> anyhow::Result<()> {
160 EthMappingsStore::set_tipset_key_at_epoch(&*self.db.read(), ts)
161 }
162}
163
164impl PersistentStore for GarbageCollectableParityDb {
165 fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
166 PersistentStore::put_keyed_persistent(&*self.db.read(), k, block)
167 }
168}
169
170impl BitswapStoreRead for GarbageCollectableParityDb {
171 fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
172 BitswapStoreRead::contains(&*self.db.read(), cid)
173 }
174
175 fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
176 BitswapStoreRead::get(&*self.db.read(), cid)
177 }
178}
179
180impl BitswapStoreReadWrite for GarbageCollectableParityDb {
181 type Hashes = <ParityDb as BitswapStoreReadWrite>::Hashes;
182
183 fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
184 BitswapStoreReadWrite::insert(&*self.db.read(), block)
185 }
186}
187
188impl DBStatistics for GarbageCollectableParityDb {
189 fn get_statistics(&self) -> Option<String> {
190 DBStatistics::get_statistics(&*self.db.read())
191 }
192}
193
194impl BlockstoreWriteOpsSubscribable for GarbageCollectableParityDb {
195 fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<Vec<(Cid, bytes::Bytes)>> {
196 BlockstoreWriteOpsSubscribable::subscribe_write_ops(&*self.db.read())
197 }
198
199 fn unsubscribe_write_ops(&self) {
200 BlockstoreWriteOpsSubscribable::unsubscribe_write_ops(&*self.db.read())
201 }
202}
203
204#[cfg(test)]
205mod tests {
206 use super::*;
207 use crate::utils::db::car_stream::CarBlock;
208 use quickcheck_macros::quickcheck;
209
210 #[quickcheck]
211 fn test_reset_gc_columns(blocks: Vec<CarBlock>) -> anyhow::Result<()> {
212 let db_path = tempfile::tempdir()?;
213 let options = ParityDb::to_options(db_path.path(), &ParityDbConfig::default());
214 let db = GarbageCollectableParityDb::new(options)?;
215 for b in &blocks {
217 db.put_keyed(&b.cid, &b.data)?;
218 }
219 for b in &blocks {
221 assert_eq!(
222 Blockstore::get(&db, &b.cid)?.map(Bytes::from).as_ref(),
223 Some(&b.data)
224 );
225 }
226 db.reset_gc_columns()?;
228 for b in &blocks {
230 assert_eq!(Blockstore::get(&db, &b.cid)?, None);
231 }
232 for b in &blocks {
234 db.put_keyed(&b.cid, &b.data)?;
235 }
236 for b in &blocks {
238 assert_eq!(
239 Blockstore::get(&db, &b.cid)?.map(Bytes::from).as_ref(),
240 Some(&b.data)
241 );
242 }
243 Ok(())
244 }
245}