Skip to main content

exoware_qmdb/
immutable.rs

1use std::marker::PhantomData;
2
3use commonware_codec::{Codec, Decode, Encode, Read as CodecRead};
4use commonware_cryptography::Hasher;
5use commonware_storage::{mmr::Location, qmdb::immutable::Operation as ImmutableOperation};
6use commonware_utils::Array;
7use exoware_sdk::{SerializableReadSession, StoreClient};
8
9use crate::auth::AuthenticatedBackendNamespace;
10use crate::auth::{
11    compute_auth_root, decode_auth_immutable_update_location, load_auth_operation_at,
12    load_auth_operation_bytes_range, load_latest_auth_immutable_update_row,
13    read_latest_auth_watermark, require_published_auth_watermark,
14};
15use crate::codec::{mmr_size_for_watermark, UpdateRow};
16use crate::connect::OperationKv;
17use crate::core::retry_transient_post_ingest_query;
18use crate::error::QmdbError;
19use crate::proof::{OperationRangeCheckpoint, RawBatchMultiProof, VerifiedOperationRange};
20use crate::storage::AuthKvMmrStorage;
21use crate::VersionedValue;
22
23#[derive(Clone, Debug)]
24pub struct ImmutableClient<H: Hasher, K: AsRef<[u8]> + Codec, V: Codec + Send + Sync> {
25    client: StoreClient,
26    value_cfg: V::Cfg,
27    update_row_cfg: (K::Cfg, V::Cfg),
28    _marker: PhantomData<(H, K)>,
29}
30
31impl<H, K, V> ImmutableClient<H, K, V>
32where
33    H: Hasher,
34    K: Array + Codec + Clone + AsRef<[u8]>,
35    V: Codec + Clone + Send + Sync,
36    V::Cfg: Clone,
37    K::Cfg: Clone,
38    ImmutableOperation<K, V>: Encode + Decode<Cfg = V::Cfg> + Clone,
39{
40    pub fn new(url: &str, value_cfg: V::Cfg, update_row_cfg: (K::Cfg, V::Cfg)) -> Self {
41        Self::from_client(StoreClient::new(url), value_cfg, update_row_cfg)
42    }
43
44    pub fn from_client(
45        client: StoreClient,
46        value_cfg: V::Cfg,
47        update_row_cfg: (K::Cfg, V::Cfg),
48    ) -> Self {
49        Self {
50            client,
51            value_cfg,
52            update_row_cfg,
53            _marker: PhantomData,
54        }
55    }
56
57    pub(crate) fn store_client(&self) -> &StoreClient {
58        &self.client
59    }
60
61    pub(crate) fn extract_operation_kv(
62        &self,
63        location: Location,
64        bytes: &[u8],
65    ) -> Result<OperationKv, QmdbError>
66    where
67        V: AsRef<[u8]>,
68    {
69        let op = ImmutableOperation::<K, V>::decode_cfg(bytes, &self.value_cfg).map_err(|e| {
70            QmdbError::CorruptData(format!(
71                "failed to decode immutable operation at location {location}: {e}"
72            ))
73        })?;
74        let key = op.key().map(|k| <K as AsRef<[u8]>>::as_ref(k).to_vec());
75        let value = match &op {
76            ImmutableOperation::Set(_, value) => Some(value.as_ref().to_vec()),
77            ImmutableOperation::Commit(Some(value)) => Some(value.as_ref().to_vec()),
78            ImmutableOperation::Commit(None) => None,
79        };
80        Ok((key, value))
81    }
82
83    pub async fn writer_location_watermark(&self) -> Result<Option<Location>, QmdbError> {
84        retry_transient_post_ingest_query(|| {
85            let session = self.client.create_session();
86            async move {
87                read_latest_auth_watermark(&session, AuthenticatedBackendNamespace::Immutable).await
88            }
89        })
90        .await
91    }
92
93    pub async fn root_at(&self, watermark: Location) -> Result<H::Digest, QmdbError> {
94        let namespace = AuthenticatedBackendNamespace::Immutable;
95        let session = self.client.create_session();
96        require_published_auth_watermark(&session, namespace, watermark).await?;
97        compute_auth_root::<H>(&session, namespace, watermark).await
98    }
99
100    pub async fn get_at(
101        &self,
102        key: &K,
103        watermark: Location,
104    ) -> Result<Option<VersionedValue<K, V>>, QmdbError> {
105        let namespace = AuthenticatedBackendNamespace::Immutable;
106        let session = self.client.create_session();
107        require_published_auth_watermark(&session, namespace, watermark).await?;
108        let Some((row_key, row_value)) =
109            load_latest_auth_immutable_update_row(&session, watermark, key.as_ref()).await?
110        else {
111            return Ok(None);
112        };
113        let location = decode_auth_immutable_update_location(&row_key)?;
114        let decoded =
115            <UpdateRow<K, V> as CodecRead>::read_cfg(&mut row_value.as_ref(), &self.update_row_cfg)
116                .map_err(|e| QmdbError::CorruptData(format!("update row decode: {e}")))?;
117        if <K as AsRef<[u8]>>::as_ref(&decoded.key) != key.as_ref() {
118            return Err(QmdbError::CorruptData(format!(
119                "authenticated immutable update row key mismatch at location {location}"
120            )));
121        }
122        let operation = load_auth_operation_at::<ImmutableOperation<K, V>>(
123            &session,
124            namespace,
125            location,
126            &self.value_cfg,
127        )
128        .await?;
129        match operation {
130            ImmutableOperation::Set(operation_key, value) if operation_key == *key => {
131                Ok(Some(VersionedValue {
132                    key: operation_key,
133                    location,
134                    value: Some(value),
135                }))
136            }
137            ImmutableOperation::Set(_, _) => Err(QmdbError::CorruptData(format!(
138                "authenticated immutable update row does not match operation key at location {location}"
139            ))),
140            ImmutableOperation::Commit(_) => Err(QmdbError::CorruptData(format!(
141                "authenticated immutable update row points at commit location {location}"
142            ))),
143        }
144    }
145
146    pub async fn operation_range_checkpoint(
147        &self,
148        watermark: Location,
149        start_location: Location,
150        max_locations: u32,
151    ) -> Result<OperationRangeCheckpoint<H::Digest>, QmdbError> {
152        let session = self.client.create_session();
153        self.operation_range_checkpoint_in_session(
154            &session,
155            watermark,
156            start_location,
157            max_locations,
158        )
159        .await
160    }
161
162    pub(crate) async fn batch_multi_proof_with_read_floor(
163        &self,
164        read_floor_sequence: u64,
165        watermark: Location,
166        operations: Vec<(Location, Vec<u8>)>,
167    ) -> Result<RawBatchMultiProof<H::Digest>, QmdbError> {
168        let namespace = AuthenticatedBackendNamespace::Immutable;
169        let session = self
170            .client
171            .create_session_with_sequence(read_floor_sequence);
172        require_published_auth_watermark(&session, namespace, watermark).await?;
173        let storage = AuthKvMmrStorage {
174            session: &session,
175            namespace,
176            mmr_size: mmr_size_for_watermark(watermark)?,
177            _marker: PhantomData::<H::Digest>,
178        };
179        let root = compute_auth_root::<H>(&session, namespace, watermark).await?;
180        crate::proof::build_batch_multi_proof::<H, _>(&storage, watermark, root, operations).await
181    }
182
183    async fn operation_range_checkpoint_in_session(
184        &self,
185        session: &SerializableReadSession,
186        watermark: Location,
187        start_location: Location,
188        max_locations: u32,
189    ) -> Result<OperationRangeCheckpoint<H::Digest>, QmdbError> {
190        let namespace = AuthenticatedBackendNamespace::Immutable;
191        require_published_auth_watermark(session, namespace, watermark).await?;
192        let end = crate::proof::resolve_range_bounds(watermark, start_location, max_locations)?;
193        let storage = AuthKvMmrStorage {
194            session,
195            namespace,
196            mmr_size: mmr_size_for_watermark(watermark)?,
197            _marker: PhantomData::<H::Digest>,
198        };
199        let root = compute_auth_root::<H>(session, namespace, watermark).await?;
200        let encoded_operations =
201            load_auth_operation_bytes_range(session, namespace, start_location, end).await?;
202        crate::proof::build_operation_range_checkpoint::<H, _>(
203            &storage,
204            watermark,
205            start_location,
206            end,
207            root,
208            encoded_operations,
209        )
210        .await
211    }
212
213    /// Verified contiguous range of operations.
214    pub async fn operation_range_proof(
215        &self,
216        watermark: Location,
217        start_location: Location,
218        max_locations: u32,
219    ) -> Result<VerifiedOperationRange<H::Digest, ImmutableOperation<K, V>>, QmdbError> {
220        let checkpoint = self
221            .operation_range_checkpoint(watermark, start_location, max_locations)
222            .await?;
223        let operations = checkpoint
224            .encoded_operations
225            .iter()
226            .enumerate()
227            .map(|(offset, bytes)| {
228                let location = checkpoint.start_location + offset as u64;
229                ImmutableOperation::<K, V>::decode_cfg(bytes.as_slice(), &self.value_cfg).map_err(
230                    |e| {
231                        QmdbError::CorruptData(format!(
232                            "failed to decode authenticated operation at location {location}: {e}"
233                        ))
234                    },
235                )
236            })
237            .collect::<Result<Vec<_>, _>>()?;
238        Ok(VerifiedOperationRange {
239            root: checkpoint.root,
240            start_location: checkpoint.start_location,
241            operations,
242        })
243    }
244}