// exoware_simplex/client.rs

1use bytes::Bytes;
2use commonware_codec::{Decode, Encode};
3use commonware_consensus::{Block, Viewable};
4use commonware_cryptography::{certificate, Digest};
5use exoware_sdk::keys::Key;
6use exoware_sdk::{ClientError, RangeMode, StoreBatchUpload, StoreClient, StoreWriteBatch};
7use futures::future::BoxFuture;
8
9use crate::error::SimplexError;
10use crate::keys::{self, RecordKind};
11use crate::types::{
12    encode_block_data, BlockData, Finalized, Notarized, UploadReceipt, UploadSummary,
13};
14
15#[derive(Clone, Debug)]
16pub struct PreparedEntry {
17    pub key: Key,
18    pub value: Bytes,
19}
20
21#[derive(Clone, Debug, Default)]
22#[must_use]
23pub struct PreparedUpload {
24    entries: Vec<PreparedEntry>,
25    summary: UploadSummary,
26}
27
28impl PreparedUpload {
29    pub fn new() -> Self {
30        Self::default()
31    }
32
33    pub fn len(&self) -> usize {
34        self.entries.len()
35    }
36
37    pub fn is_empty(&self) -> bool {
38        self.entries.is_empty()
39    }
40
41    pub fn summary(&self) -> UploadSummary {
42        self.summary
43    }
44
45    pub fn entries(&self) -> &[PreparedEntry] {
46        &self.entries
47    }
48
49    pub fn extend(&mut self, other: PreparedUpload) {
50        self.summary.headers += other.summary.headers;
51        self.summary.blocks += other.summary.blocks;
52        self.summary.notarizations += other.summary.notarizations;
53        self.summary.finalizations += other.summary.finalizations;
54        self.summary.finalized_height_indexes += other.summary.finalized_height_indexes;
55        self.entries.extend(other.entries);
56    }
57
58    fn push(&mut self, key: Key, value: Bytes) {
59        self.entries.push(PreparedEntry { key, value });
60    }
61}
62
63/// Store-backed writer for Commonware Simplex blocks and certificates.
64///
65/// The writer stores five logical indexes:
66///
67/// - header bytes by header digest
68/// - full `{ header, body }` bytes by header digest
69/// - notarized `{ proof, header }` bytes by Simplex view
70/// - finalized `{ proof, header }` bytes by Simplex view
71/// - finalized `{ proof, header }` bytes by header height
72#[derive(Clone, Debug)]
73pub struct SimplexClient {
74    client: StoreClient,
75}
76
77impl SimplexClient {
78    pub fn new(store_url: &str) -> Self {
79        Self::from_client(StoreClient::new(store_url))
80    }
81
82    pub fn from_client(client: StoreClient) -> Self {
83        Self { client }
84    }
85
86    pub fn store_client(&self) -> &StoreClient {
87        &self.client
88    }
89
90    pub fn into_store_client(self) -> StoreClient {
91        self.client
92    }
93
94    pub fn prepare_header<B>(&self, header: &B) -> PreparedUpload
95    where
96        B: Block,
97    {
98        let mut prepared = PreparedUpload::new();
99        prepared.summary.headers = 1;
100        prepared.push(keys::header_by_digest(&header.digest()), header.encode());
101        prepared
102    }
103
104    pub fn prepare_block<B>(&self, header: &B, body: impl Into<Bytes>) -> PreparedUpload
105    where
106        B: Block,
107    {
108        let body = body.into();
109        let mut prepared = self.prepare_header(header);
110        prepared.summary.blocks = 1;
111        prepared.push(
112            keys::block_by_digest(&header.digest()),
113            encode_block_data(header, &body),
114        );
115        prepared
116    }
117
118    pub fn prepare_block_data<B>(&self, data: &BlockData<B>) -> PreparedUpload
119    where
120        B: Block,
121    {
122        self.prepare_block(&data.header, data.body.clone())
123    }
124
125    pub fn prepare_notarized<B, S, D>(
126        &self,
127        notarized: &Notarized<B, S, D>,
128    ) -> Result<PreparedUpload, SimplexError>
129    where
130        B: Block<Digest = D>,
131        S: certificate::Scheme,
132        D: Digest,
133    {
134        if notarized.proof.proposal.payload != notarized.header.digest() {
135            return Err(SimplexError::ProofBlockMismatch);
136        }
137
138        let mut prepared = self.prepare_header(&notarized.header);
139        prepared.summary.notarizations = 1;
140        prepared.push(
141            keys::notarization_by_view(notarized.proof.view()),
142            notarized.encode(),
143        );
144        Ok(prepared)
145    }
146
147    pub fn prepare_finalized<B, S, D>(
148        &self,
149        finalized: &Finalized<B, S, D>,
150    ) -> Result<PreparedUpload, SimplexError>
151    where
152        B: Block<Digest = D>,
153        S: certificate::Scheme,
154        D: Digest,
155    {
156        if finalized.proof.proposal.payload != finalized.header.digest() {
157            return Err(SimplexError::ProofBlockMismatch);
158        }
159
160        let mut prepared = self.prepare_header(&finalized.header);
161        let encoded = finalized.encode();
162        prepared.summary.finalizations = 1;
163        prepared.summary.finalized_height_indexes = 1;
164        prepared.push(
165            keys::finalization_by_view(finalized.proof.view()),
166            encoded.clone(),
167        );
168        prepared.push(
169            keys::finalized_by_height(finalized.header.height()),
170            encoded,
171        );
172        Ok(prepared)
173    }
174
175    pub fn stage_upload(
176        &self,
177        prepared: &PreparedUpload,
178        batch: &mut StoreWriteBatch,
179    ) -> Result<(), SimplexError> {
180        if prepared.is_empty() {
181            return Err(SimplexError::EmptyUpload);
182        }
183        for entry in prepared.entries() {
184            batch.push(&self.client, &entry.key, entry.value.clone())?;
185        }
186        Ok(())
187    }
188
189    pub async fn mark_upload_persisted(
190        &self,
191        prepared: PreparedUpload,
192        sequence_number: u64,
193    ) -> UploadReceipt {
194        UploadReceipt {
195            store_sequence_number: sequence_number,
196            summary: prepared.summary,
197        }
198    }
199
200    pub async fn mark_upload_failed(&self, _prepared: PreparedUpload, _err: impl ToString) {}
201
202    pub async fn upload_header<B>(&self, header: &B) -> Result<UploadReceipt, SimplexError>
203    where
204        B: Block,
205    {
206        let prepared = self.prepare_header(header);
207        self.commit_upload(&self.client, prepared).await
208    }
209
210    pub async fn upload_block<B>(
211        &self,
212        header: &B,
213        body: impl Into<Bytes>,
214    ) -> Result<UploadReceipt, SimplexError>
215    where
216        B: Block,
217    {
218        let prepared = self.prepare_block(header, body);
219        self.commit_upload(&self.client, prepared).await
220    }
221
222    pub async fn upload_notarized<B, S, D>(
223        &self,
224        notarized: &Notarized<B, S, D>,
225    ) -> Result<UploadReceipt, SimplexError>
226    where
227        B: Block<Digest = D>,
228        S: certificate::Scheme,
229        D: Digest,
230    {
231        let prepared = self.prepare_notarized(notarized)?;
232        self.commit_upload(&self.client, prepared).await
233    }
234
235    pub async fn upload_finalized<B, S, D>(
236        &self,
237        finalized: &Finalized<B, S, D>,
238    ) -> Result<UploadReceipt, SimplexError>
239    where
240        B: Block<Digest = D>,
241        S: certificate::Scheme,
242        D: Digest,
243    {
244        let prepared = self.prepare_finalized(finalized)?;
245        self.commit_upload(&self.client, prepared).await
246    }
247
248    pub async fn get_header_raw<D: Digest>(
249        &self,
250        digest: &D,
251    ) -> Result<Option<Bytes>, SimplexError> {
252        self.get_raw(keys::header_by_digest(digest)).await
253    }
254
255    pub async fn get_block_raw<D: Digest>(
256        &self,
257        digest: &D,
258    ) -> Result<Option<Bytes>, SimplexError> {
259        self.get_raw(keys::block_by_digest(digest)).await
260    }
261
262    pub async fn get_notarized_raw(
263        &self,
264        view: commonware_consensus::types::View,
265    ) -> Result<Option<Bytes>, SimplexError> {
266        self.get_raw(keys::notarization_by_view(view)).await
267    }
268
269    pub async fn get_finalized_by_view_raw(
270        &self,
271        view: commonware_consensus::types::View,
272    ) -> Result<Option<Bytes>, SimplexError> {
273        self.get_raw(keys::finalization_by_view(view)).await
274    }
275
276    pub async fn get_finalized_by_height_raw(
277        &self,
278        height: commonware_consensus::types::Height,
279    ) -> Result<Option<Bytes>, SimplexError> {
280        self.get_raw(keys::finalized_by_height(height)).await
281    }
282
283    pub async fn latest_finalized_raw(&self) -> Result<Option<Bytes>, SimplexError> {
284        self.latest_raw(RecordKind::FinalizedByHeight).await
285    }
286
287    pub async fn get_header<B, D>(
288        &self,
289        digest: &D,
290        cfg: &<B as commonware_codec::Read>::Cfg,
291    ) -> Result<Option<B>, SimplexError>
292    where
293        B: Block<Digest = D>,
294        D: Digest,
295    {
296        self.decode_optional(self.get_header_raw(digest).await?, cfg)
297    }
298
299    pub async fn get_block<B, D>(
300        &self,
301        digest: &D,
302        cfg: &<BlockData<B> as commonware_codec::Read>::Cfg,
303    ) -> Result<Option<BlockData<B>>, SimplexError>
304    where
305        B: Block<Digest = D>,
306        D: Digest,
307    {
308        self.decode_optional(self.get_block_raw(digest).await?, cfg)
309    }
310
311    pub async fn get_notarized<B, S, D>(
312        &self,
313        view: commonware_consensus::types::View,
314        cfg: &<Notarized<B, S, D> as commonware_codec::Read>::Cfg,
315    ) -> Result<Option<Notarized<B, S, D>>, SimplexError>
316    where
317        B: Block<Digest = D>,
318        S: certificate::Scheme,
319        D: Digest,
320        <S::Certificate as commonware_codec::Read>::Cfg: Clone,
321    {
322        self.decode_optional(self.get_notarized_raw(view).await?, cfg)
323    }
324
325    pub async fn get_finalized_by_height<B, S, D>(
326        &self,
327        height: commonware_consensus::types::Height,
328        cfg: &<Finalized<B, S, D> as commonware_codec::Read>::Cfg,
329    ) -> Result<Option<Finalized<B, S, D>>, SimplexError>
330    where
331        B: Block<Digest = D>,
332        S: certificate::Scheme,
333        D: Digest,
334        <S::Certificate as commonware_codec::Read>::Cfg: Clone,
335    {
336        self.decode_optional(self.get_finalized_by_height_raw(height).await?, cfg)
337    }
338
339    pub async fn get_finalized_by_view<B, S, D>(
340        &self,
341        view: commonware_consensus::types::View,
342        cfg: &<Finalized<B, S, D> as commonware_codec::Read>::Cfg,
343    ) -> Result<Option<Finalized<B, S, D>>, SimplexError>
344    where
345        B: Block<Digest = D>,
346        S: certificate::Scheme,
347        D: Digest,
348        <S::Certificate as commonware_codec::Read>::Cfg: Clone,
349    {
350        self.decode_optional(self.get_finalized_by_view_raw(view).await?, cfg)
351    }
352
353    pub async fn latest_finalized<B, S, D>(
354        &self,
355        cfg: &<Finalized<B, S, D> as commonware_codec::Read>::Cfg,
356    ) -> Result<Option<Finalized<B, S, D>>, SimplexError>
357    where
358        B: Block<Digest = D>,
359        S: certificate::Scheme,
360        D: Digest,
361        <S::Certificate as commonware_codec::Read>::Cfg: Clone,
362    {
363        self.decode_optional(self.latest_finalized_raw().await?, cfg)
364    }
365
366    async fn get_raw(&self, key: Key) -> Result<Option<Bytes>, SimplexError> {
367        Ok(self.client.query().get(&key).await?)
368    }
369
370    async fn latest_raw(&self, kind: RecordKind) -> Result<Option<Bytes>, SimplexError> {
371        let (start, end) = keys::range_for_kind(kind);
372        let rows = self
373            .client
374            .query()
375            .range_with_mode(&start, &end, 1, RangeMode::Reverse)
376            .await?;
377        Ok(rows.into_iter().next().map(|(_, value)| value))
378    }
379
380    fn decode_optional<T: Decode>(
381        &self,
382        value: Option<Bytes>,
383        cfg: &T::Cfg,
384    ) -> Result<Option<T>, SimplexError> {
385        value
386            .map(|bytes| T::decode_cfg(bytes, cfg).map_err(SimplexError::from))
387            .transpose()
388    }
389}
390
391impl StoreBatchUpload for SimplexClient {
392    type Prepared = PreparedUpload;
393    type Receipt = UploadReceipt;
394    type Error = SimplexError;
395
396    fn stage_upload(
397        &self,
398        prepared: &Self::Prepared,
399        batch: &mut StoreWriteBatch,
400    ) -> Result<(), Self::Error> {
401        SimplexClient::stage_upload(self, prepared, batch)
402    }
403
404    fn commit_error(&self, error: ClientError) -> Self::Error {
405        SimplexError::Client(error)
406    }
407
408    fn mark_upload_persisted<'a>(
409        &'a self,
410        prepared: Self::Prepared,
411        sequence_number: u64,
412    ) -> BoxFuture<'a, Self::Receipt>
413    where
414        Self: Sync + 'a,
415        Self::Prepared: 'a,
416    {
417        Box::pin(async move {
418            SimplexClient::mark_upload_persisted(self, prepared, sequence_number).await
419        })
420    }
421
422    fn mark_upload_failed<'a>(
423        &'a self,
424        prepared: Self::Prepared,
425        error: String,
426    ) -> BoxFuture<'a, ()>
427    where
428        Self: Sync + 'a,
429        Self::Prepared: 'a,
430    {
431        Box::pin(async move {
432            SimplexClient::mark_upload_failed(self, prepared, error).await;
433        })
434    }
435}