1use std::marker::PhantomData;
2
3use commonware_codec::{Codec, Decode, Encode, Read as CodecRead};
4use commonware_cryptography::Hasher;
5use commonware_storage::{mmr::Location, qmdb::immutable::Operation as ImmutableOperation};
6use commonware_utils::Array;
7use exoware_sdk::{SerializableReadSession, StoreClient};
8
9use crate::auth::AuthenticatedBackendNamespace;
10use crate::auth::{
11 compute_auth_root, decode_auth_immutable_update_location, load_auth_operation_at,
12 load_auth_operation_bytes_range, load_latest_auth_immutable_update_row,
13 read_latest_auth_watermark, require_published_auth_watermark,
14};
15use crate::codec::{mmr_size_for_watermark, UpdateRow};
16use crate::connect::OperationKv;
17use crate::core::retry_transient_post_ingest_query;
18use crate::error::QmdbError;
19use crate::proof::{OperationRangeCheckpoint, RawBatchMultiProof, VerifiedOperationRange};
20use crate::storage::AuthKvMmrStorage;
21use crate::VersionedValue;
22
23#[derive(Clone, Debug)]
24pub struct ImmutableClient<H: Hasher, K: AsRef<[u8]> + Codec, V: Codec + Send + Sync> {
25 client: StoreClient,
26 value_cfg: V::Cfg,
27 update_row_cfg: (K::Cfg, V::Cfg),
28 _marker: PhantomData<(H, K)>,
29}
30
31impl<H, K, V> ImmutableClient<H, K, V>
32where
33 H: Hasher,
34 K: Array + Codec + Clone + AsRef<[u8]>,
35 V: Codec + Clone + Send + Sync,
36 V::Cfg: Clone,
37 K::Cfg: Clone,
38 ImmutableOperation<K, V>: Encode + Decode<Cfg = V::Cfg> + Clone,
39{
40 pub fn new(url: &str, value_cfg: V::Cfg, update_row_cfg: (K::Cfg, V::Cfg)) -> Self {
41 Self::from_client(StoreClient::new(url), value_cfg, update_row_cfg)
42 }
43
44 pub fn from_client(
45 client: StoreClient,
46 value_cfg: V::Cfg,
47 update_row_cfg: (K::Cfg, V::Cfg),
48 ) -> Self {
49 Self {
50 client,
51 value_cfg,
52 update_row_cfg,
53 _marker: PhantomData,
54 }
55 }
56
57 pub(crate) fn store_client(&self) -> &StoreClient {
58 &self.client
59 }
60
61 pub(crate) fn extract_operation_kv(
62 &self,
63 location: Location,
64 bytes: &[u8],
65 ) -> Result<OperationKv, QmdbError>
66 where
67 V: AsRef<[u8]>,
68 {
69 let op = ImmutableOperation::<K, V>::decode_cfg(bytes, &self.value_cfg).map_err(|e| {
70 QmdbError::CorruptData(format!(
71 "failed to decode immutable operation at location {location}: {e}"
72 ))
73 })?;
74 let key = op.key().map(|k| <K as AsRef<[u8]>>::as_ref(k).to_vec());
75 let value = match &op {
76 ImmutableOperation::Set(_, value) => Some(value.as_ref().to_vec()),
77 ImmutableOperation::Commit(Some(value)) => Some(value.as_ref().to_vec()),
78 ImmutableOperation::Commit(None) => None,
79 };
80 Ok((key, value))
81 }
82
83 pub async fn writer_location_watermark(&self) -> Result<Option<Location>, QmdbError> {
84 retry_transient_post_ingest_query(|| {
85 let session = self.client.create_session();
86 async move {
87 read_latest_auth_watermark(&session, AuthenticatedBackendNamespace::Immutable).await
88 }
89 })
90 .await
91 }
92
93 pub async fn root_at(&self, watermark: Location) -> Result<H::Digest, QmdbError> {
94 let namespace = AuthenticatedBackendNamespace::Immutable;
95 let session = self.client.create_session();
96 require_published_auth_watermark(&session, namespace, watermark).await?;
97 compute_auth_root::<H>(&session, namespace, watermark).await
98 }
99
100 pub async fn get_at(
101 &self,
102 key: &K,
103 watermark: Location,
104 ) -> Result<Option<VersionedValue<K, V>>, QmdbError> {
105 let namespace = AuthenticatedBackendNamespace::Immutable;
106 let session = self.client.create_session();
107 require_published_auth_watermark(&session, namespace, watermark).await?;
108 let Some((row_key, row_value)) =
109 load_latest_auth_immutable_update_row(&session, watermark, key.as_ref()).await?
110 else {
111 return Ok(None);
112 };
113 let location = decode_auth_immutable_update_location(&row_key)?;
114 let decoded =
115 <UpdateRow<K, V> as CodecRead>::read_cfg(&mut row_value.as_ref(), &self.update_row_cfg)
116 .map_err(|e| QmdbError::CorruptData(format!("update row decode: {e}")))?;
117 if <K as AsRef<[u8]>>::as_ref(&decoded.key) != key.as_ref() {
118 return Err(QmdbError::CorruptData(format!(
119 "authenticated immutable update row key mismatch at location {location}"
120 )));
121 }
122 let operation = load_auth_operation_at::<ImmutableOperation<K, V>>(
123 &session,
124 namespace,
125 location,
126 &self.value_cfg,
127 )
128 .await?;
129 match operation {
130 ImmutableOperation::Set(operation_key, value) if operation_key == *key => {
131 Ok(Some(VersionedValue {
132 key: operation_key,
133 location,
134 value: Some(value),
135 }))
136 }
137 ImmutableOperation::Set(_, _) => Err(QmdbError::CorruptData(format!(
138 "authenticated immutable update row does not match operation key at location {location}"
139 ))),
140 ImmutableOperation::Commit(_) => Err(QmdbError::CorruptData(format!(
141 "authenticated immutable update row points at commit location {location}"
142 ))),
143 }
144 }
145
146 pub async fn operation_range_checkpoint(
147 &self,
148 watermark: Location,
149 start_location: Location,
150 max_locations: u32,
151 ) -> Result<OperationRangeCheckpoint<H::Digest>, QmdbError> {
152 let session = self.client.create_session();
153 self.operation_range_checkpoint_in_session(
154 &session,
155 watermark,
156 start_location,
157 max_locations,
158 )
159 .await
160 }
161
162 pub(crate) async fn batch_multi_proof_with_read_floor(
163 &self,
164 read_floor_sequence: u64,
165 watermark: Location,
166 operations: Vec<(Location, Vec<u8>)>,
167 ) -> Result<RawBatchMultiProof<H::Digest>, QmdbError> {
168 let namespace = AuthenticatedBackendNamespace::Immutable;
169 let session = self
170 .client
171 .create_session_with_sequence(read_floor_sequence);
172 require_published_auth_watermark(&session, namespace, watermark).await?;
173 let storage = AuthKvMmrStorage {
174 session: &session,
175 namespace,
176 mmr_size: mmr_size_for_watermark(watermark)?,
177 _marker: PhantomData::<H::Digest>,
178 };
179 let root = compute_auth_root::<H>(&session, namespace, watermark).await?;
180 crate::proof::build_batch_multi_proof::<H, _>(&storage, watermark, root, operations).await
181 }
182
183 async fn operation_range_checkpoint_in_session(
184 &self,
185 session: &SerializableReadSession,
186 watermark: Location,
187 start_location: Location,
188 max_locations: u32,
189 ) -> Result<OperationRangeCheckpoint<H::Digest>, QmdbError> {
190 let namespace = AuthenticatedBackendNamespace::Immutable;
191 require_published_auth_watermark(session, namespace, watermark).await?;
192 let end = crate::proof::resolve_range_bounds(watermark, start_location, max_locations)?;
193 let storage = AuthKvMmrStorage {
194 session,
195 namespace,
196 mmr_size: mmr_size_for_watermark(watermark)?,
197 _marker: PhantomData::<H::Digest>,
198 };
199 let root = compute_auth_root::<H>(session, namespace, watermark).await?;
200 let encoded_operations =
201 load_auth_operation_bytes_range(session, namespace, start_location, end).await?;
202 crate::proof::build_operation_range_checkpoint::<H, _>(
203 &storage,
204 watermark,
205 start_location,
206 end,
207 root,
208 encoded_operations,
209 )
210 .await
211 }
212
213 pub async fn operation_range_proof(
215 &self,
216 watermark: Location,
217 start_location: Location,
218 max_locations: u32,
219 ) -> Result<VerifiedOperationRange<H::Digest, ImmutableOperation<K, V>>, QmdbError> {
220 let checkpoint = self
221 .operation_range_checkpoint(watermark, start_location, max_locations)
222 .await?;
223 let operations = checkpoint
224 .encoded_operations
225 .iter()
226 .enumerate()
227 .map(|(offset, bytes)| {
228 let location = checkpoint.start_location + offset as u64;
229 ImmutableOperation::<K, V>::decode_cfg(bytes.as_slice(), &self.value_cfg).map_err(
230 |e| {
231 QmdbError::CorruptData(format!(
232 "failed to decode authenticated operation at location {location}: {e}"
233 ))
234 },
235 )
236 })
237 .collect::<Result<Vec<_>, _>>()?;
238 Ok(VerifiedOperationRange {
239 root: checkpoint.root,
240 start_location: checkpoint.start_location,
241 operations,
242 })
243 }
244}