data_anchor_client/client/
ledger_client.rs

1use std::collections::{HashMap, HashSet};
2
3use anchor_lang::{
4    AnchorDeserialize, Discriminator, prelude::Pubkey, solana_program::message::VersionedMessage,
5};
6use data_anchor_api::{
7    BloberWithNamespace, LedgerDataBlobError, RelevantInstruction, RelevantInstructionWithAccounts,
8    extract_relevant_instructions, get_account_at_index, get_blob_data_from_instructions,
9};
10use data_anchor_blober::{
11    BLOB_ACCOUNT_INSTRUCTION_IDX, BLOB_BLOBER_INSTRUCTION_IDX, checkpoint::Checkpoint,
12    find_checkpoint_address, state::blober::Blober,
13};
14use data_anchor_utils::{
15    compression::DataAnchorCompressionAsync,
16    encoding::{DataAnchorEncoding, Decodable},
17};
18use futures::{StreamExt, TryStreamExt};
19use solana_account_decoder_client_types::UiAccountEncoding;
20use solana_client::rpc_config::{
21    RpcAccountInfoConfig, RpcBlockConfig, RpcProgramAccountsConfig, RpcTransactionConfig,
22};
23use solana_rpc_client_api::client_error::Error;
24use solana_signature::Signature;
25use solana_signer::Signer;
26use solana_transaction_status::{EncodedConfirmedBlock, UiTransactionEncoding};
27
28use super::BloberIdentifier;
29use crate::{
30    DataAnchorClient, DataAnchorClientResult, OutcomeError,
31    constants::{DEFAULT_CONCURRENCY, DEFAULT_LOOKBACK_SLOTS},
32    helpers::filter_relevant_instructions,
33};
34
35/// An error that can occur when uploading a blob to a blober account.
36#[derive(thiserror::Error, Debug)]
37pub enum ChainError {
38    /// Failed to query Solana RPC: {0}
39    #[error("Failed to query Solana RPC: {0}")]
40    SolanaRpc(#[from] Error),
41    /// Failed when sending transactions. Transaction errors:\n{}
42    #[error(transparent)]
43    TransactionFailure(#[from] OutcomeError),
44    /// Fee Strategy conversion failure: {0}
45    #[error("Fee Strategy conversion failure: {0}")]
46    ConversionError(&'static str),
47    /// Failed to declare blob: {0}
48    #[error("Failed to declare blob: {0}")]
49    DeclareBlob(OutcomeError),
50    /// Failed to insert chunks: {0}
51    #[error("Failed to insert chunks: {0}")]
52    InsertChunks(OutcomeError),
53    /// Failed to finalize blob: {0}
54    #[error("Failed to finalize blob: {0}")]
55    FinalizeBlob(OutcomeError),
56    /// Failed to discard blob: {0}
57    #[error("Failed to discard blob: {0}")]
58    DiscardBlob(OutcomeError),
59    /// Failed to compound upload: {0}
60    #[error("Failed to compound upload: {0}")]
61    CompoundUpload(OutcomeError),
62    /// Failed to initialize blober: {0}
63    #[error("Failed to initialize blober: {0}")]
64    InitializeBlober(OutcomeError),
65    /// Failed to close blober: {0}
66    #[error("Failed to close blober: {0}")]
67    CloseBlober(OutcomeError),
68    /// Missing blober namespace
69    #[error("Missing blober namespace. Namespace is required for creating a blober account.")]
70    MissingBloberNamespace,
71    /// Account already exists: {0}
72    #[error("Account already exists: {0}")]
73    AccountExists(String),
74    /// Account does not exist: {0}
75    #[error("Account does not exist: {0}")]
76    AccountDoesNotExist(String),
77    /// Payer has insufficient balance to pay for the transaction: required {0} lamports, available {1} lamports
78    #[error(
79        "Payer has insufficient balance to pay for the transaction: required {0} lamports, available {1} lamports"
80    )]
81    InsufficientBalance(u64, u64),
82    /// Could not calculate cost
83    #[error("Could not calculate cost")]
84    CouldNotCalculateCost,
85    /// Failed to configure checkpoint: {0}
86    #[error("Failed to configure checkpoint: {0}")]
87    ConfigureCheckpoint(OutcomeError),
88    /// Provided proof commitment does not match the blober's address
89    #[error("Provided proof commitment does not match the blober's address expected {0}, got {1}")]
90    ProofBloberMismatch(Pubkey, Pubkey),
91    #[error("Checkpoint account is not up to date with current blober state")]
92    CheckpointNotUpToDate,
93}
94
95impl<Encoding, Compression> DataAnchorClient<Encoding, Compression>
96where
97    Encoding: DataAnchorEncoding + Default,
98    Compression: DataAnchorCompressionAsync,
99{
100    /// Returns the raw blob data from the ledger for the given signatures.
101    pub async fn get_ledger_blobs_from_signatures<T>(
102        &self,
103        identifier: BloberIdentifier,
104        signatures: Vec<Signature>,
105    ) -> DataAnchorClientResult<T>
106    where
107        T: Decodable,
108    {
109        let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
110
111        let relevant_transactions = futures::stream::iter(signatures)
112            .map(|signature| async move {
113                self.rpc_client
114                    .get_transaction_with_config(
115                        &signature,
116                        RpcTransactionConfig {
117                            commitment: Some(self.rpc_client.commitment()),
118                            encoding: Some(UiTransactionEncoding::Base58),
119                            max_supported_transaction_version: Some(0),
120                        },
121                    )
122                    .await
123            })
124            .buffer_unordered(DEFAULT_CONCURRENCY)
125            .collect::<Vec<_>>()
126            .await
127            .into_iter()
128            .collect::<Result<Vec<_>, _>>()?;
129
130        let relevant_instructions = extract_relevant_instructions(
131            &self.program_id,
132            &relevant_transactions
133                .iter()
134                .filter_map(|encoded| match &encoded.transaction.meta {
135                    Some(meta) if meta.status.is_err() => None,
136                    _ => encoded.transaction.transaction.decode(),
137                })
138                .collect::<Vec<_>>(),
139        );
140
141        let declares = relevant_instructions
142            .iter()
143            .filter_map(|instruction| {
144                (instruction.blober == blober
145                    && matches!(instruction.instruction, RelevantInstruction::DeclareBlob(_)))
146                .then_some(instruction.blob)
147            })
148            .collect::<Vec<Pubkey>>();
149
150        let Some(blob) = declares.first() else {
151            return Err(LedgerDataBlobError::DeclareNotFound.into());
152        };
153
154        if declares.len() > 1 {
155            return Err(LedgerDataBlobError::MultipleDeclares.into());
156        }
157
158        if relevant_instructions
159            .iter()
160            .filter(|instruction| {
161                matches!(
162                    instruction.instruction,
163                    RelevantInstruction::FinalizeBlob(_)
164                )
165            })
166            .count()
167            > 1
168        {
169            return Err(LedgerDataBlobError::MultipleFinalizes.into());
170        }
171
172        let data = get_blob_data_from_instructions(&relevant_instructions, blober, *blob)?;
173
174        self.decompress_and_decode(&data).await
175    }
176
177    /// Fetches all blobs finalized in a given slot from the ledger.
178    pub async fn get_ledger_blobs<T>(
179        &self,
180        slot: u64,
181        identifier: BloberIdentifier,
182        lookback_slots: Option<u64>,
183    ) -> DataAnchorClientResult<Vec<T>>
184    where
185        T: Decodable,
186    {
187        let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
188
189        let block_config = RpcBlockConfig {
190            commitment: Some(self.rpc_client.commitment()),
191            encoding: Some(UiTransactionEncoding::Base58),
192            max_supported_transaction_version: Some(0),
193            ..Default::default()
194        };
195        let block = self
196            .rpc_client
197            .get_block_with_config(slot, block_config)
198            .await?;
199
200        let Some(transactions) = block.transactions else {
201            // If there are no transactions in the block, that means there are no blobs to fetch.
202            return Ok(Vec::new());
203        };
204
205        let relevant_instructions = extract_relevant_instructions(
206            &self.program_id,
207            &transactions
208                .iter()
209                .filter_map(|tx| match &tx.meta {
210                    Some(meta) if meta.status.is_err() => None,
211                    _ => tx.transaction.decode(),
212                })
213                .collect::<Vec<_>>(),
214        );
215        let finalized_blobs = relevant_instructions
216            .iter()
217            .filter_map(|instruction| {
218                (instruction.blober == blober
219                    && matches!(
220                        instruction.instruction,
221                        RelevantInstruction::FinalizeBlob(_)
222                    ))
223                .then_some(instruction.blob)
224            })
225            .collect::<HashSet<Pubkey>>();
226
227        let mut relevant_instructions_map = HashMap::new();
228        filter_relevant_instructions(
229            relevant_instructions,
230            &finalized_blobs,
231            &mut relevant_instructions_map,
232        );
233
234        let mut blobs = HashMap::with_capacity(finalized_blobs.len());
235        for blob in &finalized_blobs {
236            let instructions = relevant_instructions_map
237                .get(blob)
238                .expect("This should never happen since we at least have the finalize instruction");
239
240            if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob) {
241                blobs.insert(blob, blob_data);
242            }
243        }
244
245        // If all blobs are found, return them.
246        if blobs.len() == finalized_blobs.len() {
247            let blob_data = futures::stream::iter(blobs.values())
248                .map(|data| async move { self.decompress_and_decode(data).await })
249                .buffer_unordered(DEFAULT_CONCURRENCY)
250                .try_collect()
251                .await?;
252
253            return Ok(blob_data);
254        }
255
256        let lookback_slots = lookback_slots.unwrap_or(DEFAULT_LOOKBACK_SLOTS);
257
258        let block_slots = self
259            .rpc_client
260            .get_blocks_with_commitment(
261                slot - lookback_slots,
262                Some(slot - 1),
263                self.rpc_client.commitment(),
264            )
265            .await?;
266
267        for slot in block_slots.into_iter().rev() {
268            let block = self
269                .rpc_client
270                .get_block_with_config(slot, block_config)
271                .await?;
272            let Some(transactions) = block.transactions else {
273                // If there are no transactions in the block, go to the next block.
274                continue;
275            };
276            let new_relevant_instructions = extract_relevant_instructions(
277                &self.program_id,
278                &transactions
279                    .iter()
280                    .filter_map(|tx| match &tx.meta {
281                        Some(meta) if meta.status.is_err() => None,
282                        _ => tx.transaction.decode(),
283                    })
284                    .collect::<Vec<_>>(),
285            );
286            filter_relevant_instructions(
287                new_relevant_instructions,
288                &finalized_blobs,
289                &mut relevant_instructions_map,
290            );
291            for blob in &finalized_blobs {
292                if blobs.contains_key(blob) {
293                    continue;
294                }
295                let instructions = relevant_instructions_map.get(blob).expect(
296                    "This should never happen since we at least have the finalize instruction",
297                );
298
299                if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob)
300                {
301                    blobs.insert(blob, blob_data);
302                }
303            }
304            if blobs.len() == finalized_blobs.len() {
305                break;
306            }
307        }
308
309        let blob_data = futures::stream::iter(blobs.values())
310            .map(|data| async move { self.decompress_and_decode(data).await })
311            .buffer_unordered(DEFAULT_CONCURRENCY)
312            .try_collect()
313            .await?;
314
315        Ok(blob_data)
316    }
317
318    /// Fetches blob messages for a given slot
319    /// Returns a tuple of ([`Pubkey`], [`VersionedMessage`]) where the Pubkey is the address of
320    /// the [`data_anchor_blober::state::blob::Blob`] account and the VersionedMessage is the message
321    /// that included the [`data_anchor_blober::instruction::FinalizeBlob`] instruction.
322    pub async fn get_blob_messages(
323        &self,
324        slot: u64,
325        identifier: BloberIdentifier,
326    ) -> DataAnchorClientResult<Vec<(Pubkey, VersionedMessage)>> {
327        let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
328
329        let block: EncodedConfirmedBlock = self
330            .rpc_client
331            .get_block_with_config(
332                slot,
333                RpcBlockConfig {
334                    commitment: Some(self.rpc_client.commitment()),
335                    encoding: Some(UiTransactionEncoding::Base58),
336                    max_supported_transaction_version: Some(0),
337                    ..Default::default()
338                },
339            )
340            .await?
341            .into();
342
343        let finalized = block
344            .transactions
345            .iter()
346            .filter_map(|tx| match &tx.meta {
347                Some(meta) if meta.status.is_err() => None,
348                _ => tx.transaction.decode(),
349            })
350            .filter_map(|tx| {
351                let instructions = tx
352                    .message
353                    .instructions()
354                    .iter()
355                    .filter_map(|compiled_instruction| {
356                        Some(RelevantInstructionWithAccounts {
357                            blob: get_account_at_index(
358                                &tx,
359                                compiled_instruction,
360                                BLOB_ACCOUNT_INSTRUCTION_IDX,
361                            )?,
362                            blober: get_account_at_index(
363                                &tx,
364                                compiled_instruction,
365                                BLOB_BLOBER_INSTRUCTION_IDX,
366                            )?,
367                            instruction: RelevantInstruction::try_from_slice(compiled_instruction)?,
368                        })
369                    })
370                    .filter(|instruction| {
371                        instruction.blober == blober
372                            && matches!(
373                                instruction.instruction,
374                                RelevantInstruction::FinalizeBlob(_)
375                            )
376                    })
377                    .collect::<Vec<_>>();
378
379                instructions.is_empty().then_some(
380                    instructions
381                        .iter()
382                        .map(|instruction| (instruction.blob, tx.message.clone()))
383                        .collect::<Vec<_>>(),
384                )
385            })
386            .flatten()
387            .collect::<Vec<_>>();
388
389        Ok(finalized)
390    }
391
392    /// Lists all blober accounts owned by the payer.
393    pub async fn list_blobers(&self) -> DataAnchorClientResult<Vec<BloberWithNamespace>> {
394        let blobers = self
395            .rpc_client
396            .get_program_accounts_with_config(
397                &self.program_id,
398                RpcProgramAccountsConfig {
399                    account_config: RpcAccountInfoConfig {
400                        encoding: Some(UiAccountEncoding::Base64),
401                        ..Default::default()
402                    },
403                    ..Default::default()
404                },
405            )
406            .await?;
407
408        Ok(blobers
409            .into_iter()
410            .filter_map(|(pubkey, account)| {
411                if !account.data.starts_with(Blober::DISCRIMINATOR) {
412                    return None;
413                }
414
415                let mut state = account.data.get(Blober::DISCRIMINATOR.len()..)?;
416                let blober_state = Blober::deserialize(&mut state).ok()?;
417
418                (blober_state.caller == self.payer.pubkey()).then_some(BloberWithNamespace {
419                    address: pubkey.into(),
420                    namespace: blober_state.namespace,
421                })
422            })
423            .collect())
424    }
425
426    /// Retrieves a blober account by its identifier.
427    pub async fn get_blober(
428        &self,
429        identifier: BloberIdentifier,
430    ) -> DataAnchorClientResult<Option<Blober>> {
431        let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
432        let account = self
433            .rpc_client
434            .get_account_with_commitment(&blober, self.rpc_client.commitment())
435            .await?
436            .value;
437
438        let Some(account) = account else {
439            return Ok(None);
440        };
441
442        if !account.data.starts_with(Blober::DISCRIMINATOR) {
443            return Err(LedgerDataBlobError::InvalidBloberAccount(
444                "Invalid discriminator".to_owned(),
445            )
446            .into());
447        }
448
449        let mut state = account.data.get(Blober::DISCRIMINATOR.len()..).ok_or(
450            LedgerDataBlobError::InvalidBloberAccount("No state data".to_owned()),
451        )?;
452
453        if state.is_empty() {
454            return Err(
455                LedgerDataBlobError::InvalidBloberAccount("Empty state data".to_owned()).into(),
456            );
457        }
458
459        let blober = Blober::deserialize(&mut state).map_err(|e| {
460            LedgerDataBlobError::InvalidBloberAccount(format!("Failed to deserialize: {e:?}"))
461        })?;
462
463        Ok(Some(blober))
464    }
465
466    /// Retrieves the checkpoint containing the Groth16 proof for a given blober account.
467    pub async fn get_checkpoint(
468        &self,
469        blober: BloberIdentifier,
470    ) -> DataAnchorClientResult<Option<Checkpoint>> {
471        let blober = blober.to_blober_address(self.program_id, self.payer.pubkey());
472        let checkpoint_address = find_checkpoint_address(self.program_id, blober);
473        let account = self
474            .rpc_client
475            .get_account_with_commitment(&checkpoint_address, self.rpc_client.commitment())
476            .await?
477            .value;
478
479        let Some(account) = account else {
480            return Ok(None);
481        };
482
483        if account.owner != self.program_id {
484            return Err(LedgerDataBlobError::AccountNotOwnedByProgram.into());
485        }
486
487        if !account.data.starts_with(Checkpoint::DISCRIMINATOR) {
488            return Err(LedgerDataBlobError::InvalidCheckpointAccount(
489                "Invalid discriminator".to_owned(),
490            )
491            .into());
492        }
493
494        let mut state = account.data.get(Checkpoint::DISCRIMINATOR.len()..).ok_or(
495            LedgerDataBlobError::InvalidCheckpointAccount("No state data".to_owned()),
496        )?;
497
498        if state.is_empty() {
499            return Err(LedgerDataBlobError::InvalidCheckpointAccount(
500                "Empty state data".to_owned(),
501            )
502            .into());
503        }
504
505        let checkpoint = Checkpoint::deserialize(&mut state).map_err(|e| {
506            LedgerDataBlobError::InvalidCheckpointAccount(format!("Failed to deserialize: {e:?}"))
507        })?;
508
509        Ok(Some(checkpoint))
510    }
511}