1use std::marker::PhantomData;
2use std::sync::{atomic::AtomicU64, Arc};
3
4use commonware_codec::{Codec, Decode, Encode, Read as CodecRead};
5use commonware_cryptography::Hasher;
6use commonware_storage::{
7 mmr::{verification, Location, Position},
8 qmdb::immutable::Operation as ImmutableOperation,
9};
10use commonware_utils::Array;
11use exoware_sdk_rs::StoreClient;
12
13use crate::auth::AuthenticatedBackendNamespace;
14use crate::auth::{
15 append_auth_nodes_incrementally, build_auth_immutable_upload_rows, compute_auth_root,
16 decode_auth_immutable_update_location, encode_auth_presence_key, encode_auth_watermark_key,
17 load_auth_operation_at, load_auth_operation_range, load_latest_auth_immutable_update_row,
18 read_latest_auth_watermark, require_auth_uploaded_boundary, require_published_auth_watermark,
19};
20use crate::codec::{mmr_size_for_watermark, UpdateRow};
21use crate::core::{retry_transient_post_ingest_query, wait_until_query_visible_sequence};
22use crate::error::QmdbError;
23use crate::proof::AuthenticatedOperationRangeProof;
24use crate::storage::AuthKvMmrStorage;
25use crate::{UploadReceipt, VersionedValue};
26
27#[derive(Clone, Debug)]
28pub struct ImmutableClient<H: Hasher, K: AsRef<[u8]> + Codec, V: Codec + Send + Sync> {
29 client: StoreClient,
30 value_cfg: V::Cfg,
31 update_row_cfg: (K::Cfg, V::Cfg),
32 query_visible_sequence: Option<Arc<AtomicU64>>,
33 _marker: PhantomData<(H, K)>,
34}
35
36impl<H, K, V> ImmutableClient<H, K, V>
37where
38 H: Hasher,
39 K: Array + Codec + Clone + AsRef<[u8]>,
40 V: Codec + Clone + Send + Sync,
41 V::Cfg: Clone,
42 K::Cfg: Clone,
43 ImmutableOperation<K, V>: Encode + Decode<Cfg = V::Cfg> + Clone,
44{
45 pub fn new(url: &str, value_cfg: V::Cfg, update_row_cfg: (K::Cfg, V::Cfg)) -> Self {
46 Self::from_client(StoreClient::new(url), value_cfg, update_row_cfg)
47 }
48
49 pub fn from_client(
50 client: StoreClient,
51 value_cfg: V::Cfg,
52 update_row_cfg: (K::Cfg, V::Cfg),
53 ) -> Self {
54 Self {
55 client,
56 value_cfg,
57 update_row_cfg,
58 query_visible_sequence: None,
59 _marker: PhantomData,
60 }
61 }
62
63 pub fn with_query_visible_sequence(mut self, seq: Arc<AtomicU64>) -> Self {
64 self.query_visible_sequence = Some(seq);
65 self
66 }
67
68 pub fn inner(&self) -> &StoreClient {
69 &self.client
70 }
71
72 pub fn sequence_number(&self) -> u64 {
73 self.client.sequence_number()
74 }
75
76 async fn sync_after_ingest(&self) -> Result<(), QmdbError> {
77 let token = self.client.sequence_number();
78 wait_until_query_visible_sequence(self.query_visible_sequence.as_ref(), token).await
79 }
80
81 pub async fn writer_location_watermark(&self) -> Result<Option<Location>, QmdbError> {
82 retry_transient_post_ingest_query(|| {
83 let session = self.client.create_session();
84 async move {
85 read_latest_auth_watermark(&session, AuthenticatedBackendNamespace::Immutable).await
86 }
87 })
88 .await
89 }
90
91 pub async fn upload_operations(
92 &self,
93 latest_location: Location,
94 operations: &[ImmutableOperation<K, V>],
95 ) -> Result<UploadReceipt, QmdbError> {
96 if operations.is_empty() {
97 return Err(QmdbError::EmptyBatch);
98 }
99 let namespace = AuthenticatedBackendNamespace::Immutable;
100 if self
101 .client
102 .get(&encode_auth_presence_key(namespace, latest_location))
103 .await?
104 .is_some()
105 {
106 return Err(QmdbError::DuplicateBatchWatermark { latest_location });
107 }
108 let (keyed_operation_count, rows) =
109 build_auth_immutable_upload_rows(latest_location, operations)?;
110 let refs = rows
111 .iter()
112 .map(|(key, value)| (key, value.as_slice()))
113 .collect::<Vec<_>>();
114 self.client.put(&refs).await?;
115 self.sync_after_ingest().await?;
116 Ok(UploadReceipt {
117 latest_location,
118 operation_count: Location::new(operations.len() as u64),
119 keyed_operation_count,
120 writer_location_watermark: self.writer_location_watermark().await?,
121 sequence_number: self.client.sequence_number(),
122 })
123 }
124
125 pub async fn publish_writer_location_watermark(
126 &self,
127 location: Location,
128 ) -> Result<Location, QmdbError> {
129 let namespace = AuthenticatedBackendNamespace::Immutable;
130 let session = self.client.create_session();
131 let latest = read_latest_auth_watermark(&session, namespace).await?;
132 if let Some(watermark) = latest {
133 if watermark >= location {
134 return Ok(watermark);
135 }
136 }
137 require_auth_uploaded_boundary(&session, namespace, location).await?;
138 let previous_ops_size = match latest {
139 Some(previous) => mmr_size_for_watermark(previous)?,
140 None => Position::new(0),
141 };
142 let delta_start = latest.map_or(Location::new(0), |watermark| watermark + 1);
143 let end_exclusive = location
144 .checked_add(1)
145 .ok_or_else(|| QmdbError::CorruptData("watermark overflow".to_string()))?;
146 let encoded = crate::auth::load_auth_operation_bytes_range(
147 &session,
148 namespace,
149 delta_start,
150 end_exclusive,
151 )
152 .await?;
153 let mut rows = Vec::new();
154 append_auth_nodes_incrementally::<H>(
155 &session,
156 namespace,
157 previous_ops_size,
158 &encoded,
159 &mut rows,
160 )
161 .await?;
162 rows.push((encode_auth_watermark_key(namespace, location), Vec::new()));
163 let refs = rows
164 .iter()
165 .map(|(key, value)| (key, value.as_slice()))
166 .collect::<Vec<_>>();
167 self.client.put(&refs).await?;
168 self.sync_after_ingest().await?;
169 let visible = self.writer_location_watermark().await?;
170 if visible < Some(location) {
171 return Err(QmdbError::CorruptData(format!(
172 "immutable watermark publish did not become query-visible: requested={location}, visible={visible:?}"
173 )));
174 }
175 Ok(location)
176 }
177
178 pub async fn root_at(&self, watermark: Location) -> Result<H::Digest, QmdbError> {
179 let namespace = AuthenticatedBackendNamespace::Immutable;
180 let session = self.client.create_session();
181 require_published_auth_watermark(&session, namespace, watermark).await?;
182 compute_auth_root::<H>(&session, namespace, watermark).await
183 }
184
185 pub async fn get_at(
186 &self,
187 key: &K,
188 watermark: Location,
189 ) -> Result<Option<VersionedValue<K, V>>, QmdbError> {
190 let namespace = AuthenticatedBackendNamespace::Immutable;
191 let session = self.client.create_session();
192 require_published_auth_watermark(&session, namespace, watermark).await?;
193 let Some((row_key, row_value)) =
194 load_latest_auth_immutable_update_row(&session, watermark, key.as_ref()).await?
195 else {
196 return Ok(None);
197 };
198 let location = decode_auth_immutable_update_location(&row_key)?;
199 let decoded =
200 <UpdateRow<K, V> as CodecRead>::read_cfg(&mut row_value.as_ref(), &self.update_row_cfg)
201 .map_err(|e| QmdbError::CorruptData(format!("update row decode: {e}")))?;
202 if <K as AsRef<[u8]>>::as_ref(&decoded.key) != key.as_ref() {
203 return Err(QmdbError::CorruptData(format!(
204 "authenticated immutable update row key mismatch at location {location}"
205 )));
206 }
207 let operation = load_auth_operation_at::<ImmutableOperation<K, V>>(
208 &session,
209 namespace,
210 location,
211 &self.value_cfg,
212 )
213 .await?;
214 match operation {
215 ImmutableOperation::Set(operation_key, value) if operation_key == *key => {
216 Ok(Some(VersionedValue {
217 key: operation_key,
218 location,
219 value: Some(value),
220 }))
221 }
222 ImmutableOperation::Set(_, _) => Err(QmdbError::CorruptData(format!(
223 "authenticated immutable update row does not match operation key at location {location}"
224 ))),
225 ImmutableOperation::Commit(_) => Err(QmdbError::CorruptData(format!(
226 "authenticated immutable update row points at commit location {location}"
227 ))),
228 }
229 }
230
231 pub async fn operation_range_proof(
232 &self,
233 watermark: Location,
234 start_location: Location,
235 max_locations: u32,
236 ) -> Result<AuthenticatedOperationRangeProof<H::Digest, ImmutableOperation<K, V>>, QmdbError>
237 {
238 if max_locations == 0 {
239 return Err(QmdbError::InvalidRangeLength);
240 }
241 let namespace = AuthenticatedBackendNamespace::Immutable;
242 let session = self.client.create_session();
243 require_published_auth_watermark(&session, namespace, watermark).await?;
244 let count = watermark
245 .checked_add(1)
246 .ok_or_else(|| QmdbError::CorruptData("watermark overflow".to_string()))?;
247 if start_location >= count {
248 return Err(QmdbError::RangeStartOutOfBounds {
249 start: start_location,
250 count,
251 });
252 }
253 let end = start_location
254 .saturating_add(max_locations as u64)
255 .min(count);
256 let storage = AuthKvMmrStorage {
257 session: &session,
258 namespace,
259 mmr_size: mmr_size_for_watermark(watermark)?,
260 _marker: PhantomData::<H::Digest>,
261 };
262 let proof = verification::range_proof(&storage, start_location..end)
263 .await
264 .map_err(|e| QmdbError::CommonwareMmr(e.to_string()))?;
265 Ok(AuthenticatedOperationRangeProof {
266 watermark,
267 root: compute_auth_root::<H>(&session, namespace, watermark).await?,
268 start_location,
269 proof,
270 operations: load_auth_operation_range::<ImmutableOperation<K, V>>(
271 &session,
272 namespace,
273 start_location,
274 end,
275 &self.value_cfg,
276 )
277 .await?,
278 })
279 }
280}