Skip to main content

store_qmdb/
keyless.rs

1use std::marker::PhantomData;
2use std::sync::{atomic::AtomicU64, Arc};
3
4use commonware_codec::{Codec, Decode, Encode};
5use commonware_cryptography::Hasher;
6use commonware_storage::{
7    mmr::{verification, Location, Position},
8    qmdb::keyless::Operation as KeylessOperation,
9};
10use exoware_sdk_rs::StoreClient;
11
12use crate::auth::AuthenticatedBackendNamespace;
13use crate::auth::{
14    append_auth_nodes_incrementally, build_auth_upload_rows, compute_auth_root,
15    encode_auth_presence_key, encode_auth_watermark_key, load_auth_operation_at,
16    load_auth_operation_bytes_range, load_auth_operation_range, read_latest_auth_watermark,
17    require_auth_uploaded_boundary, require_published_auth_watermark,
18};
19use crate::codec::{ensure_encoded_value_size, mmr_size_for_watermark};
20use crate::core::{retry_transient_post_ingest_query, wait_until_query_visible_sequence};
21use crate::error::QmdbError;
22use crate::proof::AuthenticatedOperationRangeProof;
23use crate::storage::AuthKvMmrStorage;
24use crate::UploadReceipt;
25
26#[derive(Clone, Debug)]
27pub struct KeylessClient<H: Hasher, V: Codec + Send + Sync> {
28    client: StoreClient,
29    value_cfg: V::Cfg,
30    query_visible_sequence: Option<Arc<AtomicU64>>,
31    _marker: PhantomData<H>,
32}
33
34impl<H, V> KeylessClient<H, V>
35where
36    H: Hasher,
37    V: Codec + Clone + Send + Sync,
38    V::Cfg: Clone,
39    KeylessOperation<V>: Encode + Decode<Cfg = V::Cfg> + Clone,
40{
41    pub fn new(url: &str, value_cfg: V::Cfg) -> Self {
42        Self::from_client(StoreClient::new(url), value_cfg)
43    }
44
45    pub fn from_client(client: StoreClient, value_cfg: V::Cfg) -> Self {
46        Self {
47            client,
48            value_cfg,
49            query_visible_sequence: None,
50            _marker: PhantomData,
51        }
52    }
53
54    pub fn with_query_visible_sequence(mut self, seq: Arc<AtomicU64>) -> Self {
55        self.query_visible_sequence = Some(seq);
56        self
57    }
58
59    pub fn inner(&self) -> &StoreClient {
60        &self.client
61    }
62
63    pub fn sequence_number(&self) -> u64 {
64        self.client.sequence_number()
65    }
66
67    async fn sync_after_ingest(&self) -> Result<(), QmdbError> {
68        let token = self.client.sequence_number();
69        wait_until_query_visible_sequence(self.query_visible_sequence.as_ref(), token).await
70    }
71
72    pub async fn writer_location_watermark(&self) -> Result<Option<Location>, QmdbError> {
73        retry_transient_post_ingest_query(|| {
74            let session = self.client.create_session();
75            async move {
76                read_latest_auth_watermark(&session, AuthenticatedBackendNamespace::Keyless).await
77            }
78        })
79        .await
80    }
81
82    pub async fn upload_operations(
83        &self,
84        latest_location: Location,
85        operations: &[KeylessOperation<V>],
86    ) -> Result<UploadReceipt, QmdbError> {
87        if operations.is_empty() {
88            return Err(QmdbError::EmptyBatch);
89        }
90        let namespace = AuthenticatedBackendNamespace::Keyless;
91        if self
92            .client
93            .get(&encode_auth_presence_key(namespace, latest_location))
94            .await?
95            .is_some()
96        {
97            return Err(QmdbError::DuplicateBatchWatermark { latest_location });
98        }
99        let encoded = operations
100            .iter()
101            .map(|operation| {
102                let bytes = operation.encode().to_vec();
103                ensure_encoded_value_size(bytes.len())?;
104                Ok(bytes)
105            })
106            .collect::<Result<Vec<_>, QmdbError>>()?;
107        let (_, rows) = build_auth_upload_rows(namespace, latest_location, &encoded)?;
108        let refs = rows
109            .iter()
110            .map(|(key, value)| (key, value.as_slice()))
111            .collect::<Vec<_>>();
112        self.client.put(&refs).await?;
113        self.sync_after_ingest().await?;
114
115        Ok(UploadReceipt {
116            latest_location,
117            operation_count: Location::new(operations.len() as u64),
118            keyed_operation_count: 0,
119            writer_location_watermark: self.writer_location_watermark().await?,
120            sequence_number: self.client.sequence_number(),
121        })
122    }
123
124    pub async fn publish_writer_location_watermark(
125        &self,
126        location: Location,
127    ) -> Result<Location, QmdbError> {
128        let namespace = AuthenticatedBackendNamespace::Keyless;
129        let session = self.client.create_session();
130        let latest = read_latest_auth_watermark(&session, namespace).await?;
131        if let Some(watermark) = latest {
132            if watermark >= location {
133                return Ok(watermark);
134            }
135        }
136        require_auth_uploaded_boundary(&session, namespace, location).await?;
137        let previous_ops_size = match latest {
138            Some(previous) => mmr_size_for_watermark(previous)?,
139            None => Position::new(0),
140        };
141        let delta_start = latest.map_or(Location::new(0), |watermark| watermark + 1);
142        let end_exclusive = location
143            .checked_add(1)
144            .ok_or_else(|| QmdbError::CorruptData("watermark overflow".to_string()))?;
145        let encoded =
146            load_auth_operation_bytes_range(&session, namespace, delta_start, end_exclusive)
147                .await?;
148        let mut rows = Vec::new();
149        append_auth_nodes_incrementally::<H>(
150            &session,
151            namespace,
152            previous_ops_size,
153            &encoded,
154            &mut rows,
155        )
156        .await?;
157        rows.push((encode_auth_watermark_key(namespace, location), Vec::new()));
158        let refs = rows
159            .iter()
160            .map(|(key, value)| (key, value.as_slice()))
161            .collect::<Vec<_>>();
162        self.client.put(&refs).await?;
163        self.sync_after_ingest().await?;
164        let visible = self.writer_location_watermark().await?;
165        if visible < Some(location) {
166            return Err(QmdbError::CorruptData(format!(
167                "keyless watermark publish did not become query-visible: requested={location}, visible={visible:?}"
168            )));
169        }
170        Ok(location)
171    }
172
173    pub async fn root_at(&self, watermark: Location) -> Result<H::Digest, QmdbError> {
174        let namespace = AuthenticatedBackendNamespace::Keyless;
175        let session = self.client.create_session();
176        require_published_auth_watermark(&session, namespace, watermark).await?;
177        compute_auth_root::<H>(&session, namespace, watermark).await
178    }
179
180    pub async fn get_at(
181        &self,
182        location: Location,
183        watermark: Location,
184    ) -> Result<Option<V>, QmdbError> {
185        let namespace = AuthenticatedBackendNamespace::Keyless;
186        let session = self.client.create_session();
187        require_published_auth_watermark(&session, namespace, watermark).await?;
188        let count = watermark
189            .checked_add(1)
190            .ok_or_else(|| QmdbError::CorruptData("watermark overflow".to_string()))?;
191        if location >= count {
192            return Err(QmdbError::RangeStartOutOfBounds {
193                start: location,
194                count,
195            });
196        }
197        let operation = load_auth_operation_at::<KeylessOperation<V>>(
198            &session,
199            namespace,
200            location,
201            &self.value_cfg,
202        )
203        .await?;
204        Ok(operation.into_value())
205    }
206
207    pub async fn operation_range_proof(
208        &self,
209        watermark: Location,
210        start_location: Location,
211        max_locations: u32,
212    ) -> Result<AuthenticatedOperationRangeProof<H::Digest, KeylessOperation<V>>, QmdbError> {
213        if max_locations == 0 {
214            return Err(QmdbError::InvalidRangeLength);
215        }
216        let namespace = AuthenticatedBackendNamespace::Keyless;
217        let session = self.client.create_session();
218        require_published_auth_watermark(&session, namespace, watermark).await?;
219        let count = watermark
220            .checked_add(1)
221            .ok_or_else(|| QmdbError::CorruptData("watermark overflow".to_string()))?;
222        if start_location >= count {
223            return Err(QmdbError::RangeStartOutOfBounds {
224                start: start_location,
225                count,
226            });
227        }
228        let end = start_location
229            .saturating_add(max_locations as u64)
230            .min(count);
231        let storage = AuthKvMmrStorage {
232            session: &session,
233            namespace,
234            mmr_size: mmr_size_for_watermark(watermark)?,
235            _marker: PhantomData::<H::Digest>,
236        };
237        let proof = verification::range_proof(&storage, start_location..end)
238            .await
239            .map_err(|e| QmdbError::CommonwareMmr(e.to_string()))?;
240        Ok(AuthenticatedOperationRangeProof {
241            watermark,
242            root: compute_auth_root::<H>(&session, namespace, watermark).await?,
243            start_location,
244            proof,
245            operations: load_auth_operation_range::<KeylessOperation<V>>(
246                &session,
247                namespace,
248                start_location,
249                end,
250                &self.value_cfg,
251            )
252            .await?,
253        })
254    }
255}