forest/db/ttl/
mod.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::eth::EthChainId;
5use crate::message::ChainMessage;
6use crate::rpc::eth::{eth_tx_from_signed_eth_message, types::EthHash};
7use crate::shim::clock::{ChainEpoch, EPOCH_DURATION_SECONDS};
8use fvm_ipld_blockstore::Blockstore;
9use std::sync::Arc;
10use std::time::Duration;
11
12use super::EthMappingsStore;
13
14pub struct EthMappingCollector<DB> {
15    db: Arc<DB>,
16    eth_chain_id: EthChainId,
17    ttl: std::time::Duration,
18}
19
20impl<DB: Blockstore + EthMappingsStore + Sync + Send + 'static> EthMappingCollector<DB> {
21    /// Creates a `TTL` collector for the Ethereum mapping.
22    ///
23    pub fn new(db: Arc<DB>, eth_chain_id: EthChainId, retention_epochs: ChainEpoch) -> Self {
24        // Convert retention_epochs to number of seconds
25        let secs = EPOCH_DURATION_SECONDS * retention_epochs;
26        Self {
27            db,
28            eth_chain_id,
29            ttl: Duration::from_secs(secs as u64),
30        }
31    }
32
33    /// Remove keys whose `(duration - timestamp) > TTL` from the database
34    /// where `duration` is the elapsed time since "UNIX timestamp".
35    fn ttl_workflow(&self, duration: Duration) -> anyhow::Result<()> {
36        let keys: Vec<EthHash> = self
37            .db
38            .get_message_cids()?
39            .iter()
40            .filter(|(_, timestamp)| {
41                duration.saturating_sub(Duration::from_secs(*timestamp)) > self.ttl
42            })
43            .filter_map(|(cid, _)| {
44                let message = crate::chain::get_chain_message(self.db.as_ref(), cid);
45                if let Ok(ChainMessage::Signed(smsg)) = message {
46                    let result = eth_tx_from_signed_eth_message(&smsg, self.eth_chain_id);
47                    if let Ok((_, tx)) = result {
48                        tx.eth_hash().ok().map(EthHash)
49                    } else {
50                        None
51                    }
52                } else {
53                    None
54                }
55            })
56            .collect();
57
58        for h in keys.iter() {
59            tracing::trace!("Marked {} for deletion", h);
60        }
61        let count = keys.len();
62        self.db.delete(keys)?;
63
64        tracing::debug!(
65            "Found and deleted {count} mappings older than {:?}",
66            self.ttl
67        );
68
69        Ok(())
70    }
71
72    pub async fn run(&mut self) -> anyhow::Result<()> {
73        loop {
74            tokio::time::sleep(self.ttl).await;
75
76            let duration = Duration::from_secs(chrono::Utc::now().timestamp() as u64);
77            self.ttl_workflow(duration)?;
78        }
79    }
80}
81
#[cfg(test)]
mod test {
    use std::convert::TryFrom;

    use chrono::{DateTime, TimeZone, Utc};
    use cid::Cid;

    use crate::chain_sync::TipsetValidator;
    use crate::db::EthMappingsStore;
    use crate::db::EthMappingsStoreExt;
    use crate::db::MemoryDB;
    use crate::networks::calibnet::ETH_CHAIN_ID;
    use crate::test_utils::construct_eth_messages;

    use super::*;

    /// "Now" placed exactly at the Unix epoch.
    const ZERO_DURATION: Duration = Duration::from_secs(0);
    /// Slack added to the TTL so that an entry is strictly older than it.
    const EPS_DURATION: Duration = Duration::from_secs(1);
    /// Retention period, in epochs, given to the collector under test.
    const RETENTION_EPOCHS: i64 = 2;

    /// Stores two mappings, one stamped at the Unix epoch and one stamped two TTLs
    /// later, and checks that a pass just past one TTL removes only the first.
    #[tokio::test]
    async fn test_ttl() {
        let db = Arc::new(MemoryDB::default());

        let (bls0, secp0) = construct_eth_messages(0);
        let (bls1, secp1) = construct_eth_messages(1);
        let bls_msgs = [bls0, bls1];
        let secp_msgs = [secp0.clone(), secp1.clone()];

        crate::chain::persist_objects(&db, bls_msgs.iter()).unwrap();
        crate::chain::persist_objects(&db, secp_msgs.iter()).unwrap();

        // Sanity check on the fixtures: the message root must match the known value.
        let expected_root =
            Cid::try_from("bafy2bzacebqzqoow32yddtu746myprecdtblty77f3k6at6v2axkhvqd3iwvi")
                .unwrap();
        let root = TipsetValidator::compute_msg_root(&db, &bls_msgs, &secp_msgs)
            .expect("Computing message root should succeed");
        assert_eq!(root, expected_root);

        // Unix epoch corresponds to 1970-01-01 00:00:00 UTC
        let unix_epoch: DateTime<Utc> = Utc.timestamp_opt(0, 0).unwrap();
        let epoch_secs = unix_epoch.timestamp() as u64;
        let ttl_duration =
            Duration::from_secs(u64::try_from(RETENTION_EPOCHS * EPOCH_DURATION_SECONDS).unwrap());

        // First mapping: stamped at the Unix epoch.
        let (_, tx0) = eth_tx_from_signed_eth_message(&secp0, ETH_CHAIN_ID).unwrap();
        let key0 = tx0.eth_hash().unwrap().into();
        db.write_obj(&key0, &(secp0.cid(), epoch_secs)).unwrap();
        assert!(db.exists(&key0).unwrap());

        // Second mapping: stamped at the Unix epoch + 2 * ttl.
        let (_, tx1) = eth_tx_from_signed_eth_message(&secp1, ETH_CHAIN_ID).unwrap();
        let key1 = tx1.eth_hash().unwrap().into();
        db.write_obj(
            &key1,
            &(secp1.cid(), epoch_secs + 2 * ttl_duration.as_secs()),
        )
        .unwrap();
        assert!(db.exists(&key1).unwrap());

        let collector = EthMappingCollector::new(db.clone(), ETH_CHAIN_ID, RETENTION_EPOCHS);

        // At the epoch itself nothing has aged, so both mappings survive.
        collector.ttl_workflow(ZERO_DURATION).unwrap();
        assert!(db.exists(&key0).unwrap());
        assert!(db.exists(&key1).unwrap());

        // Just past one TTL only the mapping stamped at the epoch has expired.
        collector.ttl_workflow(ttl_duration + EPS_DURATION).unwrap();
        assert!(!db.exists(&key0).unwrap());
        assert!(db.exists(&key1).unwrap());
    }
}