1use bytes::Bytes;
2use commonware_codec::{Decode, Encode};
3use commonware_consensus::{Block, Viewable};
4use commonware_cryptography::{certificate, Digest};
5use exoware_sdk::keys::Key;
6use exoware_sdk::{ClientError, RangeMode, StoreBatchUpload, StoreClient, StoreWriteBatch};
7use futures::future::BoxFuture;
8
9use crate::error::SimplexError;
10use crate::keys::{self, RecordKind};
11use crate::types::{
12 encode_block_data, BlockData, Finalized, Notarized, UploadReceipt, UploadSummary,
13};
14
15#[derive(Clone, Debug)]
16pub struct PreparedEntry {
17 pub key: Key,
18 pub value: Bytes,
19}
20
21#[derive(Clone, Debug, Default)]
22#[must_use]
23pub struct PreparedUpload {
24 entries: Vec<PreparedEntry>,
25 summary: UploadSummary,
26}
27
28impl PreparedUpload {
29 pub fn new() -> Self {
30 Self::default()
31 }
32
33 pub fn len(&self) -> usize {
34 self.entries.len()
35 }
36
37 pub fn is_empty(&self) -> bool {
38 self.entries.is_empty()
39 }
40
41 pub fn summary(&self) -> UploadSummary {
42 self.summary
43 }
44
45 pub fn entries(&self) -> &[PreparedEntry] {
46 &self.entries
47 }
48
49 pub fn extend(&mut self, other: PreparedUpload) {
50 self.summary.headers += other.summary.headers;
51 self.summary.blocks += other.summary.blocks;
52 self.summary.notarizations += other.summary.notarizations;
53 self.summary.finalizations += other.summary.finalizations;
54 self.summary.finalized_height_indexes += other.summary.finalized_height_indexes;
55 self.entries.extend(other.entries);
56 }
57
58 fn push(&mut self, key: Key, value: Bytes) {
59 self.entries.push(PreparedEntry { key, value });
60 }
61}
62
63#[derive(Clone, Debug)]
73pub struct SimplexClient {
74 client: StoreClient,
75}
76
77impl SimplexClient {
78 pub fn new(store_url: &str) -> Self {
79 Self::from_client(StoreClient::new(store_url))
80 }
81
82 pub fn from_client(client: StoreClient) -> Self {
83 Self { client }
84 }
85
86 pub fn store_client(&self) -> &StoreClient {
87 &self.client
88 }
89
90 pub fn into_store_client(self) -> StoreClient {
91 self.client
92 }
93
94 pub fn prepare_header<B>(&self, header: &B) -> PreparedUpload
95 where
96 B: Block,
97 {
98 let mut prepared = PreparedUpload::new();
99 prepared.summary.headers = 1;
100 prepared.push(keys::header_by_digest(&header.digest()), header.encode());
101 prepared
102 }
103
104 pub fn prepare_block<B>(&self, header: &B, body: impl Into<Bytes>) -> PreparedUpload
105 where
106 B: Block,
107 {
108 let body = body.into();
109 let mut prepared = self.prepare_header(header);
110 prepared.summary.blocks = 1;
111 prepared.push(
112 keys::block_by_digest(&header.digest()),
113 encode_block_data(header, &body),
114 );
115 prepared
116 }
117
118 pub fn prepare_block_data<B>(&self, data: &BlockData<B>) -> PreparedUpload
119 where
120 B: Block,
121 {
122 self.prepare_block(&data.header, data.body.clone())
123 }
124
125 pub fn prepare_notarized<B, S, D>(
126 &self,
127 notarized: &Notarized<B, S, D>,
128 ) -> Result<PreparedUpload, SimplexError>
129 where
130 B: Block<Digest = D>,
131 S: certificate::Scheme,
132 D: Digest,
133 {
134 if notarized.proof.proposal.payload != notarized.header.digest() {
135 return Err(SimplexError::ProofBlockMismatch);
136 }
137
138 let mut prepared = self.prepare_header(¬arized.header);
139 prepared.summary.notarizations = 1;
140 prepared.push(
141 keys::notarization_by_view(notarized.proof.view()),
142 notarized.encode(),
143 );
144 Ok(prepared)
145 }
146
147 pub fn prepare_finalized<B, S, D>(
148 &self,
149 finalized: &Finalized<B, S, D>,
150 ) -> Result<PreparedUpload, SimplexError>
151 where
152 B: Block<Digest = D>,
153 S: certificate::Scheme,
154 D: Digest,
155 {
156 if finalized.proof.proposal.payload != finalized.header.digest() {
157 return Err(SimplexError::ProofBlockMismatch);
158 }
159
160 let mut prepared = self.prepare_header(&finalized.header);
161 let encoded = finalized.encode();
162 prepared.summary.finalizations = 1;
163 prepared.summary.finalized_height_indexes = 1;
164 prepared.push(
165 keys::finalization_by_view(finalized.proof.view()),
166 encoded.clone(),
167 );
168 prepared.push(
169 keys::finalized_by_height(finalized.header.height()),
170 encoded,
171 );
172 Ok(prepared)
173 }
174
175 pub fn stage_upload(
176 &self,
177 prepared: &PreparedUpload,
178 batch: &mut StoreWriteBatch,
179 ) -> Result<(), SimplexError> {
180 if prepared.is_empty() {
181 return Err(SimplexError::EmptyUpload);
182 }
183 for entry in prepared.entries() {
184 batch.push(&self.client, &entry.key, entry.value.clone())?;
185 }
186 Ok(())
187 }
188
189 pub async fn mark_upload_persisted(
190 &self,
191 prepared: PreparedUpload,
192 sequence_number: u64,
193 ) -> UploadReceipt {
194 UploadReceipt {
195 store_sequence_number: sequence_number,
196 summary: prepared.summary,
197 }
198 }
199
200 pub async fn mark_upload_failed(&self, _prepared: PreparedUpload, _err: impl ToString) {}
201
202 pub async fn upload_header<B>(&self, header: &B) -> Result<UploadReceipt, SimplexError>
203 where
204 B: Block,
205 {
206 let prepared = self.prepare_header(header);
207 self.commit_upload(&self.client, prepared).await
208 }
209
210 pub async fn upload_block<B>(
211 &self,
212 header: &B,
213 body: impl Into<Bytes>,
214 ) -> Result<UploadReceipt, SimplexError>
215 where
216 B: Block,
217 {
218 let prepared = self.prepare_block(header, body);
219 self.commit_upload(&self.client, prepared).await
220 }
221
222 pub async fn upload_notarized<B, S, D>(
223 &self,
224 notarized: &Notarized<B, S, D>,
225 ) -> Result<UploadReceipt, SimplexError>
226 where
227 B: Block<Digest = D>,
228 S: certificate::Scheme,
229 D: Digest,
230 {
231 let prepared = self.prepare_notarized(notarized)?;
232 self.commit_upload(&self.client, prepared).await
233 }
234
235 pub async fn upload_finalized<B, S, D>(
236 &self,
237 finalized: &Finalized<B, S, D>,
238 ) -> Result<UploadReceipt, SimplexError>
239 where
240 B: Block<Digest = D>,
241 S: certificate::Scheme,
242 D: Digest,
243 {
244 let prepared = self.prepare_finalized(finalized)?;
245 self.commit_upload(&self.client, prepared).await
246 }
247
248 pub async fn get_header_raw<D: Digest>(
249 &self,
250 digest: &D,
251 ) -> Result<Option<Bytes>, SimplexError> {
252 self.get_raw(keys::header_by_digest(digest)).await
253 }
254
255 pub async fn get_block_raw<D: Digest>(
256 &self,
257 digest: &D,
258 ) -> Result<Option<Bytes>, SimplexError> {
259 self.get_raw(keys::block_by_digest(digest)).await
260 }
261
262 pub async fn get_notarized_raw(
263 &self,
264 view: commonware_consensus::types::View,
265 ) -> Result<Option<Bytes>, SimplexError> {
266 self.get_raw(keys::notarization_by_view(view)).await
267 }
268
269 pub async fn get_finalized_by_view_raw(
270 &self,
271 view: commonware_consensus::types::View,
272 ) -> Result<Option<Bytes>, SimplexError> {
273 self.get_raw(keys::finalization_by_view(view)).await
274 }
275
276 pub async fn get_finalized_by_height_raw(
277 &self,
278 height: commonware_consensus::types::Height,
279 ) -> Result<Option<Bytes>, SimplexError> {
280 self.get_raw(keys::finalized_by_height(height)).await
281 }
282
283 pub async fn latest_finalized_raw(&self) -> Result<Option<Bytes>, SimplexError> {
284 self.latest_raw(RecordKind::FinalizedByHeight).await
285 }
286
287 pub async fn get_header<B, D>(
288 &self,
289 digest: &D,
290 cfg: &<B as commonware_codec::Read>::Cfg,
291 ) -> Result<Option<B>, SimplexError>
292 where
293 B: Block<Digest = D>,
294 D: Digest,
295 {
296 self.decode_optional(self.get_header_raw(digest).await?, cfg)
297 }
298
299 pub async fn get_block<B, D>(
300 &self,
301 digest: &D,
302 cfg: &<BlockData<B> as commonware_codec::Read>::Cfg,
303 ) -> Result<Option<BlockData<B>>, SimplexError>
304 where
305 B: Block<Digest = D>,
306 D: Digest,
307 {
308 self.decode_optional(self.get_block_raw(digest).await?, cfg)
309 }
310
311 pub async fn get_notarized<B, S, D>(
312 &self,
313 view: commonware_consensus::types::View,
314 cfg: &<Notarized<B, S, D> as commonware_codec::Read>::Cfg,
315 ) -> Result<Option<Notarized<B, S, D>>, SimplexError>
316 where
317 B: Block<Digest = D>,
318 S: certificate::Scheme,
319 D: Digest,
320 <S::Certificate as commonware_codec::Read>::Cfg: Clone,
321 {
322 self.decode_optional(self.get_notarized_raw(view).await?, cfg)
323 }
324
325 pub async fn get_finalized_by_height<B, S, D>(
326 &self,
327 height: commonware_consensus::types::Height,
328 cfg: &<Finalized<B, S, D> as commonware_codec::Read>::Cfg,
329 ) -> Result<Option<Finalized<B, S, D>>, SimplexError>
330 where
331 B: Block<Digest = D>,
332 S: certificate::Scheme,
333 D: Digest,
334 <S::Certificate as commonware_codec::Read>::Cfg: Clone,
335 {
336 self.decode_optional(self.get_finalized_by_height_raw(height).await?, cfg)
337 }
338
339 pub async fn get_finalized_by_view<B, S, D>(
340 &self,
341 view: commonware_consensus::types::View,
342 cfg: &<Finalized<B, S, D> as commonware_codec::Read>::Cfg,
343 ) -> Result<Option<Finalized<B, S, D>>, SimplexError>
344 where
345 B: Block<Digest = D>,
346 S: certificate::Scheme,
347 D: Digest,
348 <S::Certificate as commonware_codec::Read>::Cfg: Clone,
349 {
350 self.decode_optional(self.get_finalized_by_view_raw(view).await?, cfg)
351 }
352
353 pub async fn latest_finalized<B, S, D>(
354 &self,
355 cfg: &<Finalized<B, S, D> as commonware_codec::Read>::Cfg,
356 ) -> Result<Option<Finalized<B, S, D>>, SimplexError>
357 where
358 B: Block<Digest = D>,
359 S: certificate::Scheme,
360 D: Digest,
361 <S::Certificate as commonware_codec::Read>::Cfg: Clone,
362 {
363 self.decode_optional(self.latest_finalized_raw().await?, cfg)
364 }
365
366 async fn get_raw(&self, key: Key) -> Result<Option<Bytes>, SimplexError> {
367 Ok(self.client.query().get(&key).await?)
368 }
369
370 async fn latest_raw(&self, kind: RecordKind) -> Result<Option<Bytes>, SimplexError> {
371 let (start, end) = keys::range_for_kind(kind);
372 let rows = self
373 .client
374 .query()
375 .range_with_mode(&start, &end, 1, RangeMode::Reverse)
376 .await?;
377 Ok(rows.into_iter().next().map(|(_, value)| value))
378 }
379
380 fn decode_optional<T: Decode>(
381 &self,
382 value: Option<Bytes>,
383 cfg: &T::Cfg,
384 ) -> Result<Option<T>, SimplexError> {
385 value
386 .map(|bytes| T::decode_cfg(bytes, cfg).map_err(SimplexError::from))
387 .transpose()
388 }
389}
390
391impl StoreBatchUpload for SimplexClient {
392 type Prepared = PreparedUpload;
393 type Receipt = UploadReceipt;
394 type Error = SimplexError;
395
396 fn stage_upload(
397 &self,
398 prepared: &Self::Prepared,
399 batch: &mut StoreWriteBatch,
400 ) -> Result<(), Self::Error> {
401 SimplexClient::stage_upload(self, prepared, batch)
402 }
403
404 fn commit_error(&self, error: ClientError) -> Self::Error {
405 SimplexError::Client(error)
406 }
407
408 fn mark_upload_persisted<'a>(
409 &'a self,
410 prepared: Self::Prepared,
411 sequence_number: u64,
412 ) -> BoxFuture<'a, Self::Receipt>
413 where
414 Self: Sync + 'a,
415 Self::Prepared: 'a,
416 {
417 Box::pin(async move {
418 SimplexClient::mark_upload_persisted(self, prepared, sequence_number).await
419 })
420 }
421
422 fn mark_upload_failed<'a>(
423 &'a self,
424 prepared: Self::Prepared,
425 error: String,
426 ) -> BoxFuture<'a, ()>
427 where
428 Self: Sync + 'a,
429 Self::Prepared: 'a,
430 {
431 Box::pin(async move {
432 SimplexClient::mark_upload_failed(self, prepared, error).await;
433 })
434 }
435}