1use std::marker::PhantomData;
2use std::sync::{atomic::AtomicU64, Arc};
3
4use commonware_codec::{Codec, Decode, Encode};
5use commonware_cryptography::Hasher;
6use commonware_storage::{
7 mmr::{verification, Location, Position},
8 qmdb::keyless::Operation as KeylessOperation,
9};
10use exoware_sdk_rs::StoreClient;
11
12use crate::auth::AuthenticatedBackendNamespace;
13use crate::auth::{
14 append_auth_nodes_incrementally, build_auth_upload_rows, compute_auth_root,
15 encode_auth_presence_key, encode_auth_watermark_key, load_auth_operation_at,
16 load_auth_operation_bytes_range, load_auth_operation_range, read_latest_auth_watermark,
17 require_auth_uploaded_boundary, require_published_auth_watermark,
18};
19use crate::codec::{ensure_encoded_value_size, mmr_size_for_watermark};
20use crate::core::{retry_transient_post_ingest_query, wait_until_query_visible_sequence};
21use crate::error::QmdbError;
22use crate::proof::AuthenticatedOperationRangeProof;
23use crate::storage::AuthKvMmrStorage;
24use crate::UploadReceipt;
25
26#[derive(Clone, Debug)]
27pub struct KeylessClient<H: Hasher, V: Codec + Send + Sync> {
28 client: StoreClient,
29 value_cfg: V::Cfg,
30 query_visible_sequence: Option<Arc<AtomicU64>>,
31 _marker: PhantomData<H>,
32}
33
34impl<H, V> KeylessClient<H, V>
35where
36 H: Hasher,
37 V: Codec + Clone + Send + Sync,
38 V::Cfg: Clone,
39 KeylessOperation<V>: Encode + Decode<Cfg = V::Cfg> + Clone,
40{
41 pub fn new(url: &str, value_cfg: V::Cfg) -> Self {
42 Self::from_client(StoreClient::new(url), value_cfg)
43 }
44
45 pub fn from_client(client: StoreClient, value_cfg: V::Cfg) -> Self {
46 Self {
47 client,
48 value_cfg,
49 query_visible_sequence: None,
50 _marker: PhantomData,
51 }
52 }
53
54 pub fn with_query_visible_sequence(mut self, seq: Arc<AtomicU64>) -> Self {
55 self.query_visible_sequence = Some(seq);
56 self
57 }
58
59 pub fn inner(&self) -> &StoreClient {
60 &self.client
61 }
62
63 pub fn sequence_number(&self) -> u64 {
64 self.client.sequence_number()
65 }
66
67 async fn sync_after_ingest(&self) -> Result<(), QmdbError> {
68 let token = self.client.sequence_number();
69 wait_until_query_visible_sequence(self.query_visible_sequence.as_ref(), token).await
70 }
71
72 pub async fn writer_location_watermark(&self) -> Result<Option<Location>, QmdbError> {
73 retry_transient_post_ingest_query(|| {
74 let session = self.client.create_session();
75 async move {
76 read_latest_auth_watermark(&session, AuthenticatedBackendNamespace::Keyless).await
77 }
78 })
79 .await
80 }
81
82 pub async fn upload_operations(
83 &self,
84 latest_location: Location,
85 operations: &[KeylessOperation<V>],
86 ) -> Result<UploadReceipt, QmdbError> {
87 if operations.is_empty() {
88 return Err(QmdbError::EmptyBatch);
89 }
90 let namespace = AuthenticatedBackendNamespace::Keyless;
91 if self
92 .client
93 .get(&encode_auth_presence_key(namespace, latest_location))
94 .await?
95 .is_some()
96 {
97 return Err(QmdbError::DuplicateBatchWatermark { latest_location });
98 }
99 let encoded = operations
100 .iter()
101 .map(|operation| {
102 let bytes = operation.encode().to_vec();
103 ensure_encoded_value_size(bytes.len())?;
104 Ok(bytes)
105 })
106 .collect::<Result<Vec<_>, QmdbError>>()?;
107 let (_, rows) = build_auth_upload_rows(namespace, latest_location, &encoded)?;
108 let refs = rows
109 .iter()
110 .map(|(key, value)| (key, value.as_slice()))
111 .collect::<Vec<_>>();
112 self.client.put(&refs).await?;
113 self.sync_after_ingest().await?;
114
115 Ok(UploadReceipt {
116 latest_location,
117 operation_count: Location::new(operations.len() as u64),
118 keyed_operation_count: 0,
119 writer_location_watermark: self.writer_location_watermark().await?,
120 sequence_number: self.client.sequence_number(),
121 })
122 }
123
124 pub async fn publish_writer_location_watermark(
125 &self,
126 location: Location,
127 ) -> Result<Location, QmdbError> {
128 let namespace = AuthenticatedBackendNamespace::Keyless;
129 let session = self.client.create_session();
130 let latest = read_latest_auth_watermark(&session, namespace).await?;
131 if let Some(watermark) = latest {
132 if watermark >= location {
133 return Ok(watermark);
134 }
135 }
136 require_auth_uploaded_boundary(&session, namespace, location).await?;
137 let previous_ops_size = match latest {
138 Some(previous) => mmr_size_for_watermark(previous)?,
139 None => Position::new(0),
140 };
141 let delta_start = latest.map_or(Location::new(0), |watermark| watermark + 1);
142 let end_exclusive = location
143 .checked_add(1)
144 .ok_or_else(|| QmdbError::CorruptData("watermark overflow".to_string()))?;
145 let encoded =
146 load_auth_operation_bytes_range(&session, namespace, delta_start, end_exclusive)
147 .await?;
148 let mut rows = Vec::new();
149 append_auth_nodes_incrementally::<H>(
150 &session,
151 namespace,
152 previous_ops_size,
153 &encoded,
154 &mut rows,
155 )
156 .await?;
157 rows.push((encode_auth_watermark_key(namespace, location), Vec::new()));
158 let refs = rows
159 .iter()
160 .map(|(key, value)| (key, value.as_slice()))
161 .collect::<Vec<_>>();
162 self.client.put(&refs).await?;
163 self.sync_after_ingest().await?;
164 let visible = self.writer_location_watermark().await?;
165 if visible < Some(location) {
166 return Err(QmdbError::CorruptData(format!(
167 "keyless watermark publish did not become query-visible: requested={location}, visible={visible:?}"
168 )));
169 }
170 Ok(location)
171 }
172
173 pub async fn root_at(&self, watermark: Location) -> Result<H::Digest, QmdbError> {
174 let namespace = AuthenticatedBackendNamespace::Keyless;
175 let session = self.client.create_session();
176 require_published_auth_watermark(&session, namespace, watermark).await?;
177 compute_auth_root::<H>(&session, namespace, watermark).await
178 }
179
180 pub async fn get_at(
181 &self,
182 location: Location,
183 watermark: Location,
184 ) -> Result<Option<V>, QmdbError> {
185 let namespace = AuthenticatedBackendNamespace::Keyless;
186 let session = self.client.create_session();
187 require_published_auth_watermark(&session, namespace, watermark).await?;
188 let count = watermark
189 .checked_add(1)
190 .ok_or_else(|| QmdbError::CorruptData("watermark overflow".to_string()))?;
191 if location >= count {
192 return Err(QmdbError::RangeStartOutOfBounds {
193 start: location,
194 count,
195 });
196 }
197 let operation = load_auth_operation_at::<KeylessOperation<V>>(
198 &session,
199 namespace,
200 location,
201 &self.value_cfg,
202 )
203 .await?;
204 Ok(operation.into_value())
205 }
206
207 pub async fn operation_range_proof(
208 &self,
209 watermark: Location,
210 start_location: Location,
211 max_locations: u32,
212 ) -> Result<AuthenticatedOperationRangeProof<H::Digest, KeylessOperation<V>>, QmdbError> {
213 if max_locations == 0 {
214 return Err(QmdbError::InvalidRangeLength);
215 }
216 let namespace = AuthenticatedBackendNamespace::Keyless;
217 let session = self.client.create_session();
218 require_published_auth_watermark(&session, namespace, watermark).await?;
219 let count = watermark
220 .checked_add(1)
221 .ok_or_else(|| QmdbError::CorruptData("watermark overflow".to_string()))?;
222 if start_location >= count {
223 return Err(QmdbError::RangeStartOutOfBounds {
224 start: start_location,
225 count,
226 });
227 }
228 let end = start_location
229 .saturating_add(max_locations as u64)
230 .min(count);
231 let storage = AuthKvMmrStorage {
232 session: &session,
233 namespace,
234 mmr_size: mmr_size_for_watermark(watermark)?,
235 _marker: PhantomData::<H::Digest>,
236 };
237 let proof = verification::range_proof(&storage, start_location..end)
238 .await
239 .map_err(|e| QmdbError::CommonwareMmr(e.to_string()))?;
240 Ok(AuthenticatedOperationRangeProof {
241 watermark,
242 root: compute_auth_root::<H>(&session, namespace, watermark).await?,
243 start_location,
244 proof,
245 operations: load_auth_operation_range::<KeylessOperation<V>>(
246 &session,
247 namespace,
248 start_location,
249 end,
250 &self.value_cfg,
251 )
252 .await?,
253 })
254 }
255}