Skip to main content

exoware_qmdb/
keyless.rs

1use std::marker::PhantomData;
2
3use commonware_codec::{Codec, Decode, Encode};
4use commonware_cryptography::Hasher;
5use commonware_storage::{mmr::Location, qmdb::keyless::Operation as KeylessOperation};
6use exoware_sdk::{SerializableReadSession, StoreClient};
7
8use crate::auth::AuthenticatedBackendNamespace;
9use crate::auth::{
10    compute_auth_root, load_auth_operation_at, load_auth_operation_bytes_range,
11    read_latest_auth_watermark, require_published_auth_watermark,
12};
13use crate::codec::mmr_size_for_watermark;
14use crate::connect::OperationKv;
15use crate::core::retry_transient_post_ingest_query;
16use crate::error::QmdbError;
17use crate::proof::{OperationRangeCheckpoint, RawBatchMultiProof, VerifiedOperationRange};
18use crate::storage::AuthKvMmrStorage;
19
20#[derive(Clone, Debug)]
21pub struct KeylessClient<H: Hasher, V: Codec + Send + Sync> {
22    client: StoreClient,
23    value_cfg: V::Cfg,
24    _marker: PhantomData<H>,
25}
26
27impl<H, V> KeylessClient<H, V>
28where
29    H: Hasher,
30    V: Codec + Clone + Send + Sync,
31    V::Cfg: Clone,
32    KeylessOperation<V>: Encode + Decode<Cfg = V::Cfg> + Clone,
33{
34    pub fn new(url: &str, value_cfg: V::Cfg) -> Self {
35        Self::from_client(StoreClient::new(url), value_cfg)
36    }
37
38    pub fn from_client(client: StoreClient, value_cfg: V::Cfg) -> Self {
39        Self {
40            client,
41            value_cfg,
42            _marker: PhantomData,
43        }
44    }
45
46    pub(crate) fn store_client(&self) -> &StoreClient {
47        &self.client
48    }
49
50    pub(crate) fn extract_operation_kv(
51        &self,
52        location: Location,
53        bytes: &[u8],
54    ) -> Result<OperationKv, QmdbError>
55    where
56        V: AsRef<[u8]>,
57    {
58        let op = KeylessOperation::<V>::decode_cfg(bytes, &self.value_cfg).map_err(|e| {
59            QmdbError::CorruptData(format!(
60                "failed to decode keyless operation at location {location}: {e}"
61            ))
62        })?;
63        let value = match &op {
64            KeylessOperation::Append(value) => Some(value.as_ref().to_vec()),
65            KeylessOperation::Commit(Some(value)) => Some(value.as_ref().to_vec()),
66            KeylessOperation::Commit(None) => None,
67        };
68        Ok((None, value))
69    }
70
71    pub async fn writer_location_watermark(&self) -> Result<Option<Location>, QmdbError> {
72        retry_transient_post_ingest_query(|| {
73            let session = self.client.create_session();
74            async move {
75                read_latest_auth_watermark(&session, AuthenticatedBackendNamespace::Keyless).await
76            }
77        })
78        .await
79    }
80
81    pub async fn root_at(&self, watermark: Location) -> Result<H::Digest, QmdbError> {
82        let namespace = AuthenticatedBackendNamespace::Keyless;
83        let session = self.client.create_session();
84        require_published_auth_watermark(&session, namespace, watermark).await?;
85        compute_auth_root::<H>(&session, namespace, watermark).await
86    }
87
88    pub async fn get_at(
89        &self,
90        location: Location,
91        watermark: Location,
92    ) -> Result<Option<V>, QmdbError> {
93        let namespace = AuthenticatedBackendNamespace::Keyless;
94        let session = self.client.create_session();
95        require_published_auth_watermark(&session, namespace, watermark).await?;
96        let count = watermark
97            .checked_add(1)
98            .ok_or_else(|| QmdbError::CorruptData("watermark overflow".to_string()))?;
99        if location >= count {
100            return Err(QmdbError::RangeStartOutOfBounds {
101                start: location,
102                count,
103            });
104        }
105        let operation = load_auth_operation_at::<KeylessOperation<V>>(
106            &session,
107            namespace,
108            location,
109            &self.value_cfg,
110        )
111        .await?;
112        Ok(operation.into_value())
113    }
114
115    pub async fn operation_range_checkpoint(
116        &self,
117        watermark: Location,
118        start_location: Location,
119        max_locations: u32,
120    ) -> Result<OperationRangeCheckpoint<H::Digest>, QmdbError> {
121        let session = self.client.create_session();
122        self.operation_range_checkpoint_in_session(
123            &session,
124            watermark,
125            start_location,
126            max_locations,
127        )
128        .await
129    }
130
131    pub(crate) async fn batch_multi_proof_with_read_floor(
132        &self,
133        read_floor_sequence: u64,
134        watermark: Location,
135        operations: Vec<(Location, Vec<u8>)>,
136    ) -> Result<RawBatchMultiProof<H::Digest>, QmdbError> {
137        let namespace = AuthenticatedBackendNamespace::Keyless;
138        let session = self
139            .client
140            .create_session_with_sequence(read_floor_sequence);
141        require_published_auth_watermark(&session, namespace, watermark).await?;
142        let storage = AuthKvMmrStorage {
143            session: &session,
144            namespace,
145            mmr_size: mmr_size_for_watermark(watermark)?,
146            _marker: PhantomData::<H::Digest>,
147        };
148        let root = compute_auth_root::<H>(&session, namespace, watermark).await?;
149        crate::proof::build_batch_multi_proof::<H, _>(&storage, watermark, root, operations).await
150    }
151
152    async fn operation_range_checkpoint_in_session(
153        &self,
154        session: &SerializableReadSession,
155        watermark: Location,
156        start_location: Location,
157        max_locations: u32,
158    ) -> Result<OperationRangeCheckpoint<H::Digest>, QmdbError> {
159        let namespace = AuthenticatedBackendNamespace::Keyless;
160        require_published_auth_watermark(session, namespace, watermark).await?;
161        let end = crate::proof::resolve_range_bounds(watermark, start_location, max_locations)?;
162        let storage = AuthKvMmrStorage {
163            session,
164            namespace,
165            mmr_size: mmr_size_for_watermark(watermark)?,
166            _marker: PhantomData::<H::Digest>,
167        };
168        let root = compute_auth_root::<H>(session, namespace, watermark).await?;
169        let encoded_operations =
170            load_auth_operation_bytes_range(session, namespace, start_location, end).await?;
171        crate::proof::build_operation_range_checkpoint::<H, _>(
172            &storage,
173            watermark,
174            start_location,
175            end,
176            root,
177            encoded_operations,
178        )
179        .await
180    }
181
182    /// Verified contiguous range of operations.
183    pub async fn operation_range_proof(
184        &self,
185        watermark: Location,
186        start_location: Location,
187        max_locations: u32,
188    ) -> Result<VerifiedOperationRange<H::Digest, KeylessOperation<V>>, QmdbError> {
189        let checkpoint = self
190            .operation_range_checkpoint(watermark, start_location, max_locations)
191            .await?;
192        let operations = checkpoint
193            .encoded_operations
194            .iter()
195            .enumerate()
196            .map(|(offset, bytes)| {
197                let location = checkpoint.start_location + offset as u64;
198                KeylessOperation::<V>::decode_cfg(bytes.as_slice(), &self.value_cfg).map_err(|e| {
199                    QmdbError::CorruptData(format!(
200                        "failed to decode authenticated operation at location {location}: {e}"
201                    ))
202                })
203            })
204            .collect::<Result<Vec<_>, _>>()?;
205        Ok(VerifiedOperationRange {
206            root: checkpoint.root,
207            start_location: checkpoint.start_location,
208            operations,
209        })
210    }
211}