fuel_core/service/adapters/
compression_adapters.rs1use super::import_result_provider::{
2 self,
3};
4use crate::{
5 database::{
6 Database,
7 database_description::{
8 compression::CompressionDatabase,
9 on_chain::OnChain,
10 },
11 },
12 service::adapters::BlockImporterAdapter,
13};
14use fuel_core_compression_service::{
15 config,
16 ports::{
17 block_source::{
18 self,
19 BlockAt,
20 },
21 canonical_height,
22 compression_storage,
23 configuration,
24 },
25};
26use fuel_core_storage::transactional::{
27 AtomicView,
28 HistoricalView,
29};
30use fuel_core_types::{
31 blockchain::block::Block,
32 fuel_types::ChainId,
33 services::block_importer::SharedImportResult,
34};
35
36pub struct CompressionBlockDBAdapter {
38 block_importer: BlockImporterAdapter,
39 db: Database<OnChain>,
40}
41
42impl CompressionBlockDBAdapter {
43 pub fn new(block_importer: BlockImporterAdapter, db: Database<OnChain>) -> Self {
44 Self { block_importer, db }
45 }
46}
47
48impl From<BlockAt> for import_result_provider::BlockAt {
49 fn from(value: BlockAt) -> Self {
50 match value {
51 BlockAt::Genesis => Self::Genesis,
52 BlockAt::Specific(h) => Self::Specific(h.into()),
53 }
54 }
55}
56
57impl block_source::BlockSource for CompressionBlockDBAdapter {
58 fn subscribe(&self) -> fuel_core_services::stream::BoxStream<SharedImportResult> {
59 self.block_importer.events_shared_result()
60 }
61
62 fn get_block(&self, height: BlockAt) -> anyhow::Result<Block> {
63 let latest_view = self.db.latest_view()?;
64 let height = match height {
65 BlockAt::Genesis => latest_view.genesis_height()?.ok_or_else(|| {
66 anyhow::anyhow!("Genesis block not found in the database")
67 })?,
68 BlockAt::Specific(h) => h.into(),
69 };
70 let block = latest_view
71 .get_sealed_block_by_height(&height)?
72 .map(|sealed_block| sealed_block.entity)
73 .ok_or(anyhow::anyhow!("Block not found at height: {:?}", height))?;
74 Ok(block)
75 }
76}
77
78impl configuration::CompressionConfigProvider
79 for crate::service::config::DaCompressionConfig
80{
81 fn config(&self, chain_id: ChainId) -> config::CompressionConfig {
82 config::CompressionConfig::new(
83 self.retention_duration,
84 self.starting_height,
85 self.metrics,
86 chain_id,
87 )
88 }
89}
90
91impl compression_storage::LatestHeight for Database<CompressionDatabase> {
92 fn latest_height(&self) -> Option<u32> {
93 HistoricalView::latest_height(self).map(Into::into)
94 }
95}
96
97impl canonical_height::CanonicalHeight for Database<OnChain> {
98 fn get(&self) -> Option<u32> {
99 HistoricalView::latest_height(self).map(Into::into)
100 }
101}
102
103pub struct CompressionServiceAdapter {
104 db: Database<CompressionDatabase>,
105}
106
107impl CompressionServiceAdapter {
108 pub fn new(db: Database<CompressionDatabase>) -> Self {
109 Self { db }
110 }
111
112 pub fn storage(&self) -> &Database<CompressionDatabase> {
113 &self.db
114 }
115}