1use crate::eth::EthChainId;
5use crate::message::ChainMessage;
6use crate::rpc::eth::{eth_tx_from_signed_eth_message, types::EthHash};
7use crate::shim::clock::{ChainEpoch, EPOCH_DURATION_SECONDS};
8use fvm_ipld_blockstore::Blockstore;
9use std::sync::Arc;
10use std::time::Duration;
11
12use super::EthMappingsStore;
13
14pub struct EthMappingCollector<DB> {
15 db: Arc<DB>,
16 eth_chain_id: EthChainId,
17 ttl: std::time::Duration,
18}
19
20impl<DB: Blockstore + EthMappingsStore + Sync + Send + 'static> EthMappingCollector<DB> {
21 pub fn new(db: Arc<DB>, eth_chain_id: EthChainId, retention_epochs: ChainEpoch) -> Self {
24 let secs = EPOCH_DURATION_SECONDS * retention_epochs;
26 Self {
27 db,
28 eth_chain_id,
29 ttl: Duration::from_secs(secs as u64),
30 }
31 }
32
33 fn ttl_workflow(&self, duration: Duration) -> anyhow::Result<()> {
36 let keys: Vec<EthHash> = self
37 .db
38 .get_message_cids()?
39 .iter()
40 .filter(|(_, timestamp)| {
41 duration.saturating_sub(Duration::from_secs(*timestamp)) > self.ttl
42 })
43 .filter_map(|(cid, _)| {
44 let message = crate::chain::get_chain_message(self.db.as_ref(), cid);
45 if let Ok(ChainMessage::Signed(smsg)) = message {
46 let result = eth_tx_from_signed_eth_message(&smsg, self.eth_chain_id);
47 if let Ok((_, tx)) = result {
48 tx.eth_hash().ok().map(EthHash)
49 } else {
50 None
51 }
52 } else {
53 None
54 }
55 })
56 .collect();
57
58 for h in keys.iter() {
59 tracing::trace!("Marked {} for deletion", h);
60 }
61 let count = keys.len();
62 self.db.delete(keys)?;
63
64 tracing::debug!(
65 "Found and deleted {count} mappings older than {:?}",
66 self.ttl
67 );
68
69 Ok(())
70 }
71
72 pub async fn run(&mut self) -> anyhow::Result<()> {
73 loop {
74 tokio::time::sleep(self.ttl).await;
75
76 let duration = Duration::from_secs(chrono::Utc::now().timestamp() as u64);
77 self.ttl_workflow(duration)?;
78 }
79 }
80}
81
82#[cfg(test)]
83mod test {
84 use std::convert::TryFrom;
85
86 use chrono::{DateTime, TimeZone, Utc};
87 use cid::Cid;
88
89 use crate::chain_sync::TipsetValidator;
90 use crate::db::EthMappingsStore;
91 use crate::db::EthMappingsStoreExt;
92 use crate::db::MemoryDB;
93 use crate::networks::calibnet::ETH_CHAIN_ID;
94 use crate::test_utils::construct_eth_messages;
95
96 const ZERO_DURATION: Duration = Duration::from_secs(0);
97 const EPS_DURATION: Duration = Duration::from_secs(1);
98 const RETENTION_EPOCHS: i64 = 2;
99
100 use super::*;
101
102 #[tokio::test]
103 async fn test_ttl() {
104 let blockstore = Arc::new(MemoryDB::default());
105
106 let (bls0, secp0) = construct_eth_messages(0);
107 let (bls1, secp1) = construct_eth_messages(1);
108
109 crate::chain::persist_objects(&blockstore, [bls0.clone(), bls1.clone()].iter()).unwrap();
110 crate::chain::persist_objects(&blockstore, [secp0.clone(), secp1.clone()].iter()).unwrap();
111
112 let expected_root =
113 Cid::try_from("bafy2bzacebqzqoow32yddtu746myprecdtblty77f3k6at6v2axkhvqd3iwvi")
114 .unwrap();
115
116 let root = TipsetValidator::compute_msg_root(
117 &blockstore,
118 &[bls0.clone(), bls1.clone()],
119 &[secp0.clone(), secp1.clone()],
120 )
121 .expect("Computing message root should succeed");
122 assert_eq!(root, expected_root);
123
124 let unix_timestamp: DateTime<Utc> = Utc.timestamp_opt(0, 0).unwrap();
126
127 let (_, tx0) = eth_tx_from_signed_eth_message(&secp0, ETH_CHAIN_ID).unwrap();
129 let key0 = tx0.eth_hash().unwrap().into();
130
131 let timestamp = unix_timestamp.timestamp() as u64;
132 blockstore
133 .write_obj(&key0, &(secp0.cid(), timestamp))
134 .unwrap();
135
136 assert!(blockstore.exists(&key0).unwrap());
137
138 let (_, tx1) = eth_tx_from_signed_eth_message(&secp1, ETH_CHAIN_ID).unwrap();
140 let key1 = tx1.eth_hash().unwrap().into();
141
142 let ttl_duration = Duration::from_secs(
143 (RETENTION_EPOCHS * EPOCH_DURATION_SECONDS)
144 .try_into()
145 .unwrap(),
146 );
147
148 blockstore
149 .write_obj(
150 &key1,
151 &(
152 secp1.cid(),
153 unix_timestamp.timestamp() as u64 + 2 * ttl_duration.as_secs(),
154 ),
155 )
156 .unwrap();
157
158 assert!(blockstore.exists(&key1).unwrap());
159
160 let collector =
161 EthMappingCollector::new(blockstore.clone(), ETH_CHAIN_ID, RETENTION_EPOCHS);
162
163 collector.ttl_workflow(ZERO_DURATION).unwrap();
164
165 assert!(blockstore.exists(&key0).unwrap());
166 assert!(blockstore.exists(&key1).unwrap());
167
168 collector.ttl_workflow(ttl_duration + EPS_DURATION).unwrap();
169
170 assert!(!blockstore.exists(&key0).unwrap());
171 assert!(blockstore.exists(&key1).unwrap());
172 }
173}