Skip to main content

store_qmdb/
immutable.rs

1use std::marker::PhantomData;
2use std::sync::{atomic::AtomicU64, Arc};
3
4use commonware_codec::{Codec, Decode, Encode, Read as CodecRead};
5use commonware_cryptography::Hasher;
6use commonware_storage::{
7    mmr::{verification, Location, Position},
8    qmdb::immutable::Operation as ImmutableOperation,
9};
10use commonware_utils::Array;
11use exoware_sdk_rs::StoreClient;
12
13use crate::auth::AuthenticatedBackendNamespace;
14use crate::auth::{
15    append_auth_nodes_incrementally, build_auth_immutable_upload_rows, compute_auth_root,
16    decode_auth_immutable_update_location, encode_auth_presence_key, encode_auth_watermark_key,
17    load_auth_operation_at, load_auth_operation_range, load_latest_auth_immutable_update_row,
18    read_latest_auth_watermark, require_auth_uploaded_boundary, require_published_auth_watermark,
19};
20use crate::codec::{mmr_size_for_watermark, UpdateRow};
21use crate::core::{retry_transient_post_ingest_query, wait_until_query_visible_sequence};
22use crate::error::QmdbError;
23use crate::proof::AuthenticatedOperationRangeProof;
24use crate::storage::AuthKvMmrStorage;
25use crate::{UploadReceipt, VersionedValue};
26
27#[derive(Clone, Debug)]
28pub struct ImmutableClient<H: Hasher, K: AsRef<[u8]> + Codec, V: Codec + Send + Sync> {
29    client: StoreClient,
30    value_cfg: V::Cfg,
31    update_row_cfg: (K::Cfg, V::Cfg),
32    query_visible_sequence: Option<Arc<AtomicU64>>,
33    _marker: PhantomData<(H, K)>,
34}
35
36impl<H, K, V> ImmutableClient<H, K, V>
37where
38    H: Hasher,
39    K: Array + Codec + Clone + AsRef<[u8]>,
40    V: Codec + Clone + Send + Sync,
41    V::Cfg: Clone,
42    K::Cfg: Clone,
43    ImmutableOperation<K, V>: Encode + Decode<Cfg = V::Cfg> + Clone,
44{
45    pub fn new(url: &str, value_cfg: V::Cfg, update_row_cfg: (K::Cfg, V::Cfg)) -> Self {
46        Self::from_client(StoreClient::new(url), value_cfg, update_row_cfg)
47    }
48
49    pub fn from_client(
50        client: StoreClient,
51        value_cfg: V::Cfg,
52        update_row_cfg: (K::Cfg, V::Cfg),
53    ) -> Self {
54        Self {
55            client,
56            value_cfg,
57            update_row_cfg,
58            query_visible_sequence: None,
59            _marker: PhantomData,
60        }
61    }
62
63    pub fn with_query_visible_sequence(mut self, seq: Arc<AtomicU64>) -> Self {
64        self.query_visible_sequence = Some(seq);
65        self
66    }
67
68    pub fn inner(&self) -> &StoreClient {
69        &self.client
70    }
71
72    pub fn sequence_number(&self) -> u64 {
73        self.client.sequence_number()
74    }
75
76    async fn sync_after_ingest(&self) -> Result<(), QmdbError> {
77        let token = self.client.sequence_number();
78        wait_until_query_visible_sequence(self.query_visible_sequence.as_ref(), token).await
79    }
80
81    pub async fn writer_location_watermark(&self) -> Result<Option<Location>, QmdbError> {
82        retry_transient_post_ingest_query(|| {
83            let session = self.client.create_session();
84            async move {
85                read_latest_auth_watermark(&session, AuthenticatedBackendNamespace::Immutable).await
86            }
87        })
88        .await
89    }
90
91    pub async fn upload_operations(
92        &self,
93        latest_location: Location,
94        operations: &[ImmutableOperation<K, V>],
95    ) -> Result<UploadReceipt, QmdbError> {
96        if operations.is_empty() {
97            return Err(QmdbError::EmptyBatch);
98        }
99        let namespace = AuthenticatedBackendNamespace::Immutable;
100        if self
101            .client
102            .get(&encode_auth_presence_key(namespace, latest_location))
103            .await?
104            .is_some()
105        {
106            return Err(QmdbError::DuplicateBatchWatermark { latest_location });
107        }
108        let (keyed_operation_count, rows) =
109            build_auth_immutable_upload_rows(latest_location, operations)?;
110        let refs = rows
111            .iter()
112            .map(|(key, value)| (key, value.as_slice()))
113            .collect::<Vec<_>>();
114        self.client.put(&refs).await?;
115        self.sync_after_ingest().await?;
116        Ok(UploadReceipt {
117            latest_location,
118            operation_count: Location::new(operations.len() as u64),
119            keyed_operation_count,
120            writer_location_watermark: self.writer_location_watermark().await?,
121            sequence_number: self.client.sequence_number(),
122        })
123    }
124
125    pub async fn publish_writer_location_watermark(
126        &self,
127        location: Location,
128    ) -> Result<Location, QmdbError> {
129        let namespace = AuthenticatedBackendNamespace::Immutable;
130        let session = self.client.create_session();
131        let latest = read_latest_auth_watermark(&session, namespace).await?;
132        if let Some(watermark) = latest {
133            if watermark >= location {
134                return Ok(watermark);
135            }
136        }
137        require_auth_uploaded_boundary(&session, namespace, location).await?;
138        let previous_ops_size = match latest {
139            Some(previous) => mmr_size_for_watermark(previous)?,
140            None => Position::new(0),
141        };
142        let delta_start = latest.map_or(Location::new(0), |watermark| watermark + 1);
143        let end_exclusive = location
144            .checked_add(1)
145            .ok_or_else(|| QmdbError::CorruptData("watermark overflow".to_string()))?;
146        let encoded = crate::auth::load_auth_operation_bytes_range(
147            &session,
148            namespace,
149            delta_start,
150            end_exclusive,
151        )
152        .await?;
153        let mut rows = Vec::new();
154        append_auth_nodes_incrementally::<H>(
155            &session,
156            namespace,
157            previous_ops_size,
158            &encoded,
159            &mut rows,
160        )
161        .await?;
162        rows.push((encode_auth_watermark_key(namespace, location), Vec::new()));
163        let refs = rows
164            .iter()
165            .map(|(key, value)| (key, value.as_slice()))
166            .collect::<Vec<_>>();
167        self.client.put(&refs).await?;
168        self.sync_after_ingest().await?;
169        let visible = self.writer_location_watermark().await?;
170        if visible < Some(location) {
171            return Err(QmdbError::CorruptData(format!(
172                "immutable watermark publish did not become query-visible: requested={location}, visible={visible:?}"
173            )));
174        }
175        Ok(location)
176    }
177
178    pub async fn root_at(&self, watermark: Location) -> Result<H::Digest, QmdbError> {
179        let namespace = AuthenticatedBackendNamespace::Immutable;
180        let session = self.client.create_session();
181        require_published_auth_watermark(&session, namespace, watermark).await?;
182        compute_auth_root::<H>(&session, namespace, watermark).await
183    }
184
185    pub async fn get_at(
186        &self,
187        key: &K,
188        watermark: Location,
189    ) -> Result<Option<VersionedValue<K, V>>, QmdbError> {
190        let namespace = AuthenticatedBackendNamespace::Immutable;
191        let session = self.client.create_session();
192        require_published_auth_watermark(&session, namespace, watermark).await?;
193        let Some((row_key, row_value)) =
194            load_latest_auth_immutable_update_row(&session, watermark, key.as_ref()).await?
195        else {
196            return Ok(None);
197        };
198        let location = decode_auth_immutable_update_location(&row_key)?;
199        let decoded =
200            <UpdateRow<K, V> as CodecRead>::read_cfg(&mut row_value.as_ref(), &self.update_row_cfg)
201                .map_err(|e| QmdbError::CorruptData(format!("update row decode: {e}")))?;
202        if <K as AsRef<[u8]>>::as_ref(&decoded.key) != key.as_ref() {
203            return Err(QmdbError::CorruptData(format!(
204                "authenticated immutable update row key mismatch at location {location}"
205            )));
206        }
207        let operation = load_auth_operation_at::<ImmutableOperation<K, V>>(
208            &session,
209            namespace,
210            location,
211            &self.value_cfg,
212        )
213        .await?;
214        match operation {
215            ImmutableOperation::Set(operation_key, value) if operation_key == *key => {
216                Ok(Some(VersionedValue {
217                    key: operation_key,
218                    location,
219                    value: Some(value),
220                }))
221            }
222            ImmutableOperation::Set(_, _) => Err(QmdbError::CorruptData(format!(
223                "authenticated immutable update row does not match operation key at location {location}"
224            ))),
225            ImmutableOperation::Commit(_) => Err(QmdbError::CorruptData(format!(
226                "authenticated immutable update row points at commit location {location}"
227            ))),
228        }
229    }
230
231    pub async fn operation_range_proof(
232        &self,
233        watermark: Location,
234        start_location: Location,
235        max_locations: u32,
236    ) -> Result<AuthenticatedOperationRangeProof<H::Digest, ImmutableOperation<K, V>>, QmdbError>
237    {
238        if max_locations == 0 {
239            return Err(QmdbError::InvalidRangeLength);
240        }
241        let namespace = AuthenticatedBackendNamespace::Immutable;
242        let session = self.client.create_session();
243        require_published_auth_watermark(&session, namespace, watermark).await?;
244        let count = watermark
245            .checked_add(1)
246            .ok_or_else(|| QmdbError::CorruptData("watermark overflow".to_string()))?;
247        if start_location >= count {
248            return Err(QmdbError::RangeStartOutOfBounds {
249                start: start_location,
250                count,
251            });
252        }
253        let end = start_location
254            .saturating_add(max_locations as u64)
255            .min(count);
256        let storage = AuthKvMmrStorage {
257            session: &session,
258            namespace,
259            mmr_size: mmr_size_for_watermark(watermark)?,
260            _marker: PhantomData::<H::Digest>,
261        };
262        let proof = verification::range_proof(&storage, start_location..end)
263            .await
264            .map_err(|e| QmdbError::CommonwareMmr(e.to_string()))?;
265        Ok(AuthenticatedOperationRangeProof {
266            watermark,
267            root: compute_auth_root::<H>(&session, namespace, watermark).await?,
268            start_location,
269            proof,
270            operations: load_auth_operation_range::<ImmutableOperation<K, V>>(
271                &session,
272                namespace,
273                start_location,
274                end,
275                &self.value_cfg,
276            )
277            .await?,
278        })
279    }
280}