fuel_core_compression/
compress.rs

1use crate::{
2    config::Config,
3    eviction_policy::CacheEvictor,
4    ports::{
5        EvictorDb,
6        TemporalRegistry,
7        UtxoIdToPointer,
8    },
9    registry::{
10        EvictorDbAll,
11        PerRegistryKeyspace,
12        RegistrationsPerTable,
13        TemporalRegistryAll,
14    },
15    VersionedCompressedBlock,
16};
17use anyhow::Context;
18use fuel_core_types::{
19    blockchain::block::Block,
20    fuel_compression::{
21        CompressibleBy,
22        ContextError,
23        RegistryKey,
24    },
25    fuel_tx::{
26        input::PredicateCode,
27        CompressedUtxoId,
28        ScriptCode,
29        TxPointer,
30        UtxoId,
31    },
32    fuel_types::{
33        Address,
34        AssetId,
35        ContractId,
36    },
37    tai64::Tai64,
38};
39use std::collections::{
40    HashMap,
41    HashSet,
42};
43
44pub trait CompressDb: TemporalRegistryAll + EvictorDbAll + UtxoIdToPointer {}
45impl<T> CompressDb for T where T: TemporalRegistryAll + EvictorDbAll + UtxoIdToPointer {}
46
47/// This must be called for all new blocks in sequence, otherwise the result will be garbage, since
48/// the registry is valid for only the current block height. On any other height you could be
49/// referring to keys that have already been overwritten, or have not been written to yet.
50pub async fn compress<D>(
51    config: Config,
52    mut db: D,
53    block: &Block,
54) -> anyhow::Result<VersionedCompressedBlock>
55where
56    D: CompressDb,
57{
58    let target = block.transactions_vec();
59
60    let mut prepare_ctx = PrepareCtx {
61        config,
62        timestamp: block.header().time(),
63        db: &mut db,
64        accessed_keys: Default::default(),
65    };
66    let _ = target.compress_with(&mut prepare_ctx).await?;
67
68    let mut ctx = prepare_ctx.into_compression_context()?;
69    let transactions = target.compress_with(&mut ctx).await?;
70    let registrations: RegistrationsPerTable = ctx.finalize()?;
71
72    Ok(VersionedCompressedBlock::new(
73        block.header(),
74        registrations,
75        transactions,
76    ))
77}
78
79/// Preparation pass through the block to collect all keys accessed during compression.
80/// Returns dummy values. The resulting "compressed block" should be discarded.
81struct PrepareCtx<D> {
82    config: Config,
83    /// Current timestamp
84    timestamp: Tai64,
85    /// Database handle
86    db: D,
87    /// Keys accessed during the compression.
88    accessed_keys: PerRegistryKeyspace<HashSet<RegistryKey>>,
89}
90
91impl<D> ContextError for PrepareCtx<D> {
92    type Error = anyhow::Error;
93}
94
95impl<D> CompressibleBy<PrepareCtx<D>> for UtxoId
96where
97    D: CompressDb,
98{
99    async fn compress_with(
100        &self,
101        _ctx: &mut PrepareCtx<D>,
102    ) -> anyhow::Result<CompressedUtxoId> {
103        Ok(CompressedUtxoId {
104            tx_pointer: TxPointer::default(),
105            output_index: 0,
106        })
107    }
108}
109
110#[derive(Debug)]
111struct CompressCtxKeyspace<T> {
112    /// Cache evictor state for this keyspace
113    cache_evictor: CacheEvictor<T>,
114    /// Changes to the temporary registry, to be included in the compressed block header
115    changes: HashMap<RegistryKey, T>,
116    /// Reverse lookup into changes
117    changes_lookup: HashMap<T, RegistryKey>,
118}
119
// Generates the registry-backed compression machinery from a list of
// `keyspace_field: ValueType` pairs:
//   * the `CompressCtx` struct, holding one `CompressCtxKeyspace` per pair;
//   * `PrepareCtx::into_compression_context` and `CompressCtx::finalize`;
//   * `CompressibleBy` impls of every value type for both contexts.
macro_rules! compression {
    ($($ident:ty: $type:ty),*) => { paste::paste! {
        /// Context of the real compression pass, with one keyspace state per registered type.
        pub struct CompressCtx<D> {
            config: Config,
            timestamp: Tai64,
            db: D,
            $($ident: CompressCtxKeyspace<$type>,)*
        }

        impl<D> PrepareCtx<D> where D: CompressDb {
            /// Turns the preparation context into a [`CompressCtx`].
            ///
            /// The keys accessed during preparation are handed to the cache evictors so
            /// that they are not evicted during compression. The evictors are initialized
            /// from the database, which may fail.
            pub fn into_compression_context(self) -> anyhow::Result<CompressCtx<D>> {
                let PrepareCtx { config, timestamp, mut db, accessed_keys } = self;
                Ok(CompressCtx {
                    $(
                        $ident: CompressCtxKeyspace {
                            changes: Default::default(),
                            changes_lookup: Default::default(),
                            cache_evictor: CacheEvictor::new_from_db(&mut db, accessed_keys.$ident.into())?,
                        },
                    )*
                    config,
                    timestamp,
                    db,
                })
            }
        }

        impl<D> CompressCtx<D> where D: CompressDb {
            /// Consumes the context and returns the registry changes it accumulated.
            ///
            /// The cache evictor states and the registrations are committed to the database
            /// along the way.
            fn finalize(mut self) -> anyhow::Result<RegistrationsPerTable> {
                let mut per_table = RegistrationsPerTable::default();
                $(
                    self.$ident.cache_evictor.commit(&mut self.db)?;
                    for entry in self.$ident.changes {
                        per_table.$ident.push(entry);
                    }
                )*
                per_table.write_to_registry(&mut self.db, self.timestamp)?;
                Ok(per_table)
            }
        }

        $(
            impl<D> CompressibleBy<PrepareCtx<D>> for $type
            where
                D: TemporalRegistry<$type> + EvictorDb<$type>
            {
                /// Preparation pass: records the key of an already registered, still
                /// accessible value as accessed. The returned key is always the
                /// `RegistryKey::ZERO` placeholder.
                async fn compress_with(
                    &self,
                    ctx: &mut PrepareCtx<D>,
                ) -> anyhow::Result<RegistryKey> {
                    // The default value is never looked up in the registry.
                    if *self == <$type>::default() {
                        return Ok(RegistryKey::ZERO);
                    }
                    let Some(existing) = ctx.db.registry_index_lookup(self)? else {
                        return Ok(RegistryKey::ZERO);
                    };
                    // Already recorded: skip the timestamp read.
                    if ctx.accessed_keys.$ident.contains(&existing) {
                        return Ok(RegistryKey::ZERO);
                    }
                    let registered_at = ctx.db.read_timestamp(&existing)
                        .context("Database invariant violated: no timestamp stored but key found")?;
                    if ctx.config.is_timestamp_accessible(ctx.timestamp, registered_at)? {
                        ctx.accessed_keys.$ident.insert(existing);
                    }
                    Ok(RegistryKey::ZERO)
                }
            }

            impl<D> CompressibleBy<CompressCtx<D>> for $type
            where
                D: TemporalRegistry<$type> + EvictorDb<$type>
            {
                /// Real pass: resolves the value to a registry key, registering it under a
                /// fresh key when no usable one exists.
                ///
                /// Lookup order: default value, registrations made earlier in this block,
                /// then the database (only if the stored key's timestamp is accessible).
                async fn compress_with(
                    &self,
                    ctx: &mut CompressCtx<D>,
                ) -> anyhow::Result<RegistryKey> {
                    if *self == <$type>::default() {
                        return Ok(RegistryKey::DEFAULT_VALUE);
                    }
                    if let Some(&pending) = ctx.$ident.changes_lookup.get(self) {
                        return Ok(pending);
                    }
                    if let Some(existing) = ctx.db.registry_index_lookup(self)? {
                        let registered_at = ctx.db.read_timestamp(&existing)
                            .context("Database invariant violated: no timestamp stored but key found")?;
                        if ctx.config.is_timestamp_accessible(ctx.timestamp, registered_at)? {
                            return Ok(existing);
                        }
                    }

                    // No usable key: take the next one from the evictor and remember the
                    // registration in both directions.
                    let keyspace = &mut ctx.$ident;
                    let fresh_key = keyspace.cache_evictor.next_key();
                    let displaced = keyspace.changes.insert(fresh_key, self.clone());
                    let displaced_rev = keyspace.changes_lookup.insert(self.clone(), fresh_key);
                    debug_assert!(displaced.is_none(), "Key collision in registry substitution");
                    debug_assert!(displaced_rev.is_none(), "Key collision in registry substitution");
                    Ok(fresh_key)
                }
            }
        )*
    }};
}
223
224compression!(
225    address: Address,
226    asset_id: AssetId,
227    contract_id: ContractId,
228    script_code: ScriptCode,
229    predicate_code: PredicateCode
230);
231
232impl<D> ContextError for CompressCtx<D> {
233    type Error = anyhow::Error;
234}
235
236impl<D> CompressibleBy<CompressCtx<D>> for UtxoId
237where
238    D: CompressDb,
239{
240    async fn compress_with(
241        &self,
242        ctx: &mut CompressCtx<D>,
243    ) -> anyhow::Result<CompressedUtxoId> {
244        ctx.db.lookup(*self)
245    }
246}