fuel_core/service/adapters/
compression_adapters.rs

1use super::import_result_provider::{
2    self,
3};
4use crate::{
5    database::{
6        Database,
7        database_description::{
8            compression::CompressionDatabase,
9            on_chain::OnChain,
10        },
11    },
12    service::adapters::BlockImporterAdapter,
13};
14use fuel_core_compression_service::{
15    config,
16    ports::{
17        block_source::{
18            self,
19            BlockAt,
20        },
21        canonical_height,
22        compression_storage,
23        configuration,
24    },
25};
26use fuel_core_storage::transactional::{
27    AtomicView,
28    HistoricalView,
29};
30use fuel_core_types::{
31    blockchain::block::Block,
32    fuel_types::ChainId,
33    services::block_importer::SharedImportResult,
34};
35
36/// Provides the necessary functionality for accessing latest and historical block data.
37pub struct CompressionBlockDBAdapter {
38    block_importer: BlockImporterAdapter,
39    db: Database<OnChain>,
40}
41
42impl CompressionBlockDBAdapter {
43    pub fn new(block_importer: BlockImporterAdapter, db: Database<OnChain>) -> Self {
44        Self { block_importer, db }
45    }
46}
47
48impl From<BlockAt> for import_result_provider::BlockAt {
49    fn from(value: BlockAt) -> Self {
50        match value {
51            BlockAt::Genesis => Self::Genesis,
52            BlockAt::Specific(h) => Self::Specific(h.into()),
53        }
54    }
55}
56
57impl block_source::BlockSource for CompressionBlockDBAdapter {
58    fn subscribe(&self) -> fuel_core_services::stream::BoxStream<SharedImportResult> {
59        self.block_importer.events_shared_result()
60    }
61
62    fn get_block(&self, height: BlockAt) -> anyhow::Result<Block> {
63        let latest_view = self.db.latest_view()?;
64        let height = match height {
65            BlockAt::Genesis => latest_view.genesis_height()?.ok_or_else(|| {
66                anyhow::anyhow!("Genesis block not found in the database")
67            })?,
68            BlockAt::Specific(h) => h.into(),
69        };
70        let block = latest_view
71            .get_sealed_block_by_height(&height)?
72            .map(|sealed_block| sealed_block.entity)
73            .ok_or(anyhow::anyhow!("Block not found at height: {:?}", height))?;
74        Ok(block)
75    }
76}
77
78impl configuration::CompressionConfigProvider
79    for crate::service::config::DaCompressionConfig
80{
81    fn config(&self, chain_id: ChainId) -> config::CompressionConfig {
82        config::CompressionConfig::new(
83            self.retention_duration,
84            self.starting_height,
85            self.metrics,
86            chain_id,
87        )
88    }
89}
90
91impl compression_storage::LatestHeight for Database<CompressionDatabase> {
92    fn latest_height(&self) -> Option<u32> {
93        HistoricalView::latest_height(self).map(Into::into)
94    }
95}
96
97impl canonical_height::CanonicalHeight for Database<OnChain> {
98    fn get(&self) -> Option<u32> {
99        HistoricalView::latest_height(self).map(Into::into)
100    }
101}
102
103pub struct CompressionServiceAdapter {
104    db: Database<CompressionDatabase>,
105}
106
107impl CompressionServiceAdapter {
108    pub fn new(db: Database<CompressionDatabase>) -> Self {
109        Self { db }
110    }
111
112    pub fn storage(&self) -> &Database<CompressionDatabase> {
113        &self.db
114    }
115}