1use std::marker::PhantomData;
2
3use commonware_codec::{Codec, Decode, Encode};
4use commonware_cryptography::Hasher;
5use commonware_storage::{mmr::Location, qmdb::keyless::Operation as KeylessOperation};
6use exoware_sdk::{SerializableReadSession, StoreClient};
7
8use crate::auth::AuthenticatedBackendNamespace;
9use crate::auth::{
10 compute_auth_root, load_auth_operation_at, load_auth_operation_bytes_range,
11 read_latest_auth_watermark, require_published_auth_watermark,
12};
13use crate::codec::mmr_size_for_watermark;
14use crate::connect::OperationKv;
15use crate::core::retry_transient_post_ingest_query;
16use crate::error::QmdbError;
17use crate::proof::{OperationRangeCheckpoint, RawBatchMultiProof, VerifiedOperationRange};
18use crate::storage::AuthKvMmrStorage;
19
20#[derive(Clone, Debug)]
21pub struct KeylessClient<H: Hasher, V: Codec + Send + Sync> {
22 client: StoreClient,
23 value_cfg: V::Cfg,
24 _marker: PhantomData<H>,
25}
26
27impl<H, V> KeylessClient<H, V>
28where
29 H: Hasher,
30 V: Codec + Clone + Send + Sync,
31 V::Cfg: Clone,
32 KeylessOperation<V>: Encode + Decode<Cfg = V::Cfg> + Clone,
33{
34 pub fn new(url: &str, value_cfg: V::Cfg) -> Self {
35 Self::from_client(StoreClient::new(url), value_cfg)
36 }
37
38 pub fn from_client(client: StoreClient, value_cfg: V::Cfg) -> Self {
39 Self {
40 client,
41 value_cfg,
42 _marker: PhantomData,
43 }
44 }
45
46 pub(crate) fn store_client(&self) -> &StoreClient {
47 &self.client
48 }
49
50 pub(crate) fn extract_operation_kv(
51 &self,
52 location: Location,
53 bytes: &[u8],
54 ) -> Result<OperationKv, QmdbError>
55 where
56 V: AsRef<[u8]>,
57 {
58 let op = KeylessOperation::<V>::decode_cfg(bytes, &self.value_cfg).map_err(|e| {
59 QmdbError::CorruptData(format!(
60 "failed to decode keyless operation at location {location}: {e}"
61 ))
62 })?;
63 let value = match &op {
64 KeylessOperation::Append(value) => Some(value.as_ref().to_vec()),
65 KeylessOperation::Commit(Some(value)) => Some(value.as_ref().to_vec()),
66 KeylessOperation::Commit(None) => None,
67 };
68 Ok((None, value))
69 }
70
71 pub async fn writer_location_watermark(&self) -> Result<Option<Location>, QmdbError> {
72 retry_transient_post_ingest_query(|| {
73 let session = self.client.create_session();
74 async move {
75 read_latest_auth_watermark(&session, AuthenticatedBackendNamespace::Keyless).await
76 }
77 })
78 .await
79 }
80
81 pub async fn root_at(&self, watermark: Location) -> Result<H::Digest, QmdbError> {
82 let namespace = AuthenticatedBackendNamespace::Keyless;
83 let session = self.client.create_session();
84 require_published_auth_watermark(&session, namespace, watermark).await?;
85 compute_auth_root::<H>(&session, namespace, watermark).await
86 }
87
88 pub async fn get_at(
89 &self,
90 location: Location,
91 watermark: Location,
92 ) -> Result<Option<V>, QmdbError> {
93 let namespace = AuthenticatedBackendNamespace::Keyless;
94 let session = self.client.create_session();
95 require_published_auth_watermark(&session, namespace, watermark).await?;
96 let count = watermark
97 .checked_add(1)
98 .ok_or_else(|| QmdbError::CorruptData("watermark overflow".to_string()))?;
99 if location >= count {
100 return Err(QmdbError::RangeStartOutOfBounds {
101 start: location,
102 count,
103 });
104 }
105 let operation = load_auth_operation_at::<KeylessOperation<V>>(
106 &session,
107 namespace,
108 location,
109 &self.value_cfg,
110 )
111 .await?;
112 Ok(operation.into_value())
113 }
114
115 pub async fn operation_range_checkpoint(
116 &self,
117 watermark: Location,
118 start_location: Location,
119 max_locations: u32,
120 ) -> Result<OperationRangeCheckpoint<H::Digest>, QmdbError> {
121 let session = self.client.create_session();
122 self.operation_range_checkpoint_in_session(
123 &session,
124 watermark,
125 start_location,
126 max_locations,
127 )
128 .await
129 }
130
131 pub(crate) async fn batch_multi_proof_with_read_floor(
132 &self,
133 read_floor_sequence: u64,
134 watermark: Location,
135 operations: Vec<(Location, Vec<u8>)>,
136 ) -> Result<RawBatchMultiProof<H::Digest>, QmdbError> {
137 let namespace = AuthenticatedBackendNamespace::Keyless;
138 let session = self
139 .client
140 .create_session_with_sequence(read_floor_sequence);
141 require_published_auth_watermark(&session, namespace, watermark).await?;
142 let storage = AuthKvMmrStorage {
143 session: &session,
144 namespace,
145 mmr_size: mmr_size_for_watermark(watermark)?,
146 _marker: PhantomData::<H::Digest>,
147 };
148 let root = compute_auth_root::<H>(&session, namespace, watermark).await?;
149 crate::proof::build_batch_multi_proof::<H, _>(&storage, watermark, root, operations).await
150 }
151
152 async fn operation_range_checkpoint_in_session(
153 &self,
154 session: &SerializableReadSession,
155 watermark: Location,
156 start_location: Location,
157 max_locations: u32,
158 ) -> Result<OperationRangeCheckpoint<H::Digest>, QmdbError> {
159 let namespace = AuthenticatedBackendNamespace::Keyless;
160 require_published_auth_watermark(session, namespace, watermark).await?;
161 let end = crate::proof::resolve_range_bounds(watermark, start_location, max_locations)?;
162 let storage = AuthKvMmrStorage {
163 session,
164 namespace,
165 mmr_size: mmr_size_for_watermark(watermark)?,
166 _marker: PhantomData::<H::Digest>,
167 };
168 let root = compute_auth_root::<H>(session, namespace, watermark).await?;
169 let encoded_operations =
170 load_auth_operation_bytes_range(session, namespace, start_location, end).await?;
171 crate::proof::build_operation_range_checkpoint::<H, _>(
172 &storage,
173 watermark,
174 start_location,
175 end,
176 root,
177 encoded_operations,
178 )
179 .await
180 }
181
182 pub async fn operation_range_proof(
184 &self,
185 watermark: Location,
186 start_location: Location,
187 max_locations: u32,
188 ) -> Result<VerifiedOperationRange<H::Digest, KeylessOperation<V>>, QmdbError> {
189 let checkpoint = self
190 .operation_range_checkpoint(watermark, start_location, max_locations)
191 .await?;
192 let operations = checkpoint
193 .encoded_operations
194 .iter()
195 .enumerate()
196 .map(|(offset, bytes)| {
197 let location = checkpoint.start_location + offset as u64;
198 KeylessOperation::<V>::decode_cfg(bytes.as_slice(), &self.value_cfg).map_err(|e| {
199 QmdbError::CorruptData(format!(
200 "failed to decode authenticated operation at location {location}: {e}"
201 ))
202 })
203 })
204 .collect::<Result<Vec<_>, _>>()?;
205 Ok(VerifiedOperationRange {
206 root: checkpoint.root,
207 start_location: checkpoint.start_location,
208 operations,
209 })
210 }
211}