data_anchor_client/client/
ledger_client.rs

1use std::collections::{HashMap, HashSet};
2
3use anchor_lang::{AccountDeserialize, prelude::Pubkey, solana_program::message::VersionedMessage};
4use data_anchor_api::{
5    BloberWithNamespace, LedgerDataBlobError, RelevantInstruction, RelevantInstructionWithAccounts,
6    extract_relevant_instructions, get_account_at_index, get_blob_data_from_instructions,
7};
8use data_anchor_blober::{
9    BLOB_ACCOUNT_INSTRUCTION_IDX, BLOB_BLOBER_INSTRUCTION_IDX, checkpoint::Checkpoint,
10    find_checkpoint_address, state::blober::Blober,
11};
12use data_anchor_utils::encoding::Decodable;
13use futures::{StreamExt, TryStreamExt};
14use solana_account_decoder_client_types::UiAccountEncoding;
15use solana_client::rpc_config::{
16    RpcAccountInfoConfig, RpcBlockConfig, RpcProgramAccountsConfig, RpcTransactionConfig,
17};
18use solana_rpc_client_api::client_error::Error;
19use solana_signature::Signature;
20use solana_signer::Signer;
21use solana_transaction_status::{EncodedConfirmedBlock, UiTransactionEncoding};
22
23use super::BloberIdentifier;
24use crate::{
25    DataAnchorClient, DataAnchorClientResult, OutcomeError,
26    constants::{DEFAULT_CONCURRENCY, DEFAULT_LOOKBACK_SLOTS},
27    helpers::filter_relevant_instructions,
28};
29
/// Errors that can occur while interacting with blober accounts on chain.
///
/// The variants cover RPC queries, the individual steps of a blob upload
/// (declare, insert, finalize, discard, compound), blober lifecycle operations
/// (initialize, close), balance checks and checkpoint configuration.
#[derive(thiserror::Error, Debug)]
pub enum ChainError {
    /// A Solana RPC request failed; wraps the underlying client [`Error`].
    #[error("Failed to query Solana RPC: {0}")]
    SolanaRpc(#[from] Error),
    /// Sending transactions failed. The displayed message is forwarded unchanged from the
    /// wrapped [`OutcomeError`].
    #[error(transparent)]
    TransactionFailure(#[from] OutcomeError),
    /// A fee strategy conversion failed; the payload is a static description of the failure.
    #[error("Fee Strategy conversion failure: {0}")]
    ConversionError(&'static str),
    /// Declaring a blob failed.
    #[error("Failed to declare blob: {0}")]
    DeclareBlob(OutcomeError),
    /// Inserting blob chunks failed.
    #[error("Failed to insert chunks: {0}")]
    InsertChunks(OutcomeError),
    /// Finalizing a blob failed.
    #[error("Failed to finalize blob: {0}")]
    FinalizeBlob(OutcomeError),
    /// Discarding a blob failed.
    #[error("Failed to discard blob: {0}")]
    DiscardBlob(OutcomeError),
    /// A compound upload failed.
    #[error("Failed to compound upload: {0}")]
    CompoundUpload(OutcomeError),
    /// Initializing a blober account failed.
    #[error("Failed to initialize blober: {0}")]
    InitializeBlober(OutcomeError),
    /// Closing a blober account failed.
    #[error("Failed to close blober: {0}")]
    CloseBlober(OutcomeError),
    /// No namespace was provided although one is required to create a blober account.
    #[error("Missing blober namespace. Namespace is required for creating a blober account.")]
    MissingBloberNamespace,
    /// The account already exists; the payload identifies the account.
    #[error("Account already exists: {0}")]
    AccountExists(String),
    /// The account does not exist; the payload identifies the account.
    #[error("Account does not exist: {0}")]
    AccountDoesNotExist(String),
    /// The payer cannot afford the transaction. The first field is the required amount and
    /// the second the available amount, both in lamports.
    #[error(
        "Payer has insufficient balance to pay for the transaction: required {0} lamports, available {1} lamports"
    )]
    InsufficientBalance(u64, u64),
    /// The cost could not be calculated.
    #[error("Could not calculate cost")]
    CouldNotCalculateCost,
    /// Configuring the checkpoint failed.
    #[error("Failed to configure checkpoint: {0}")]
    ConfigureCheckpoint(OutcomeError),
    /// The provided proof commitment does not match the blober's address. The first field is
    /// the expected address and the second the one that was received.
    #[error("Provided proof commitment does not match the blober's address expected {0}, got {1}")]
    ProofBloberMismatch(Pubkey, Pubkey),
    /// The checkpoint account is not up to date with the current blober state.
    #[error("Checkpoint account is not up to date with current blober state")]
    CheckpointNotUpToDate,
}
89
90impl DataAnchorClient {
91    /// Returns the raw blob data from the ledger for the given signatures.
92    pub async fn get_ledger_blobs_from_signatures<T>(
93        &self,
94        identifier: BloberIdentifier,
95        signatures: Vec<Signature>,
96    ) -> DataAnchorClientResult<T>
97    where
98        T: Decodable,
99    {
100        let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
101
102        let relevant_transactions = futures::stream::iter(signatures)
103            .map(|signature| async move {
104                self.rpc_client
105                    .get_transaction_with_config(
106                        &signature,
107                        RpcTransactionConfig {
108                            commitment: Some(self.rpc_client.commitment()),
109                            encoding: Some(UiTransactionEncoding::Base58),
110                            max_supported_transaction_version: Some(0),
111                        },
112                    )
113                    .await
114            })
115            .buffer_unordered(DEFAULT_CONCURRENCY)
116            .collect::<Vec<_>>()
117            .await
118            .into_iter()
119            .collect::<Result<Vec<_>, _>>()?;
120
121        let relevant_instructions = extract_relevant_instructions(
122            &self.program_id,
123            &relevant_transactions
124                .iter()
125                .filter_map(|encoded| match &encoded.transaction.meta {
126                    Some(meta) if meta.status.is_err() => None,
127                    _ => encoded.transaction.transaction.decode(),
128                })
129                .collect::<Vec<_>>(),
130        );
131
132        let declares = relevant_instructions
133            .iter()
134            .filter_map(|instruction| {
135                (instruction.blober == blober
136                    && matches!(instruction.instruction, RelevantInstruction::DeclareBlob(_)))
137                .then_some(instruction.blob)
138            })
139            .collect::<Vec<Pubkey>>();
140
141        let Some(blob) = declares.first() else {
142            return Err(LedgerDataBlobError::DeclareNotFound.into());
143        };
144
145        if declares.len() > 1 {
146            return Err(LedgerDataBlobError::MultipleDeclares.into());
147        }
148
149        if relevant_instructions
150            .iter()
151            .filter(|instruction| {
152                matches!(
153                    instruction.instruction,
154                    RelevantInstruction::FinalizeBlob(_)
155                )
156            })
157            .count()
158            > 1
159        {
160            return Err(LedgerDataBlobError::MultipleFinalizes.into());
161        }
162
163        let data = get_blob_data_from_instructions(&relevant_instructions, blober, *blob)?;
164
165        self.decompress_and_decode(&data).await
166    }
167
168    /// Fetches all blobs finalized in a given slot from the ledger.
169    pub async fn get_ledger_blobs<T>(
170        &self,
171        slot: u64,
172        identifier: BloberIdentifier,
173        lookback_slots: Option<u64>,
174    ) -> DataAnchorClientResult<Vec<T>>
175    where
176        T: Decodable,
177    {
178        let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
179
180        let block_config = RpcBlockConfig {
181            commitment: Some(self.rpc_client.commitment()),
182            encoding: Some(UiTransactionEncoding::Base58),
183            max_supported_transaction_version: Some(0),
184            ..Default::default()
185        };
186        let block = self
187            .rpc_client
188            .get_block_with_config(slot, block_config)
189            .await?;
190
191        let Some(transactions) = block.transactions else {
192            // If there are no transactions in the block, that means there are no blobs to fetch.
193            return Ok(Vec::new());
194        };
195
196        let relevant_instructions = extract_relevant_instructions(
197            &self.program_id,
198            &transactions
199                .iter()
200                .filter_map(|tx| match &tx.meta {
201                    Some(meta) if meta.status.is_err() => None,
202                    _ => tx.transaction.decode(),
203                })
204                .collect::<Vec<_>>(),
205        );
206        let finalized_blobs = relevant_instructions
207            .iter()
208            .filter_map(|instruction| {
209                (instruction.blober == blober
210                    && matches!(
211                        instruction.instruction,
212                        RelevantInstruction::FinalizeBlob(_)
213                    ))
214                .then_some(instruction.blob)
215            })
216            .collect::<HashSet<Pubkey>>();
217
218        let mut relevant_instructions_map = HashMap::new();
219        filter_relevant_instructions(
220            relevant_instructions,
221            &finalized_blobs,
222            &mut relevant_instructions_map,
223        );
224
225        let mut blobs = HashMap::with_capacity(finalized_blobs.len());
226        for blob in &finalized_blobs {
227            let instructions = relevant_instructions_map
228                .get(blob)
229                .expect("This should never happen since we at least have the finalize instruction");
230
231            if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob) {
232                blobs.insert(blob, blob_data);
233            }
234        }
235
236        // If all blobs are found, return them.
237        if blobs.len() == finalized_blobs.len() {
238            let blob_data = futures::stream::iter(blobs.values())
239                .map(|data| async move { self.decompress_and_decode(data).await })
240                .buffer_unordered(DEFAULT_CONCURRENCY)
241                .try_collect()
242                .await?;
243
244            return Ok(blob_data);
245        }
246
247        let lookback_slots = lookback_slots.unwrap_or(DEFAULT_LOOKBACK_SLOTS);
248
249        let block_slots = self
250            .rpc_client
251            .get_blocks_with_commitment(
252                slot - lookback_slots,
253                Some(slot - 1),
254                self.rpc_client.commitment(),
255            )
256            .await?;
257
258        for slot in block_slots.into_iter().rev() {
259            let block = self
260                .rpc_client
261                .get_block_with_config(slot, block_config)
262                .await?;
263            let Some(transactions) = block.transactions else {
264                // If there are no transactions in the block, go to the next block.
265                continue;
266            };
267            let new_relevant_instructions = extract_relevant_instructions(
268                &self.program_id,
269                &transactions
270                    .iter()
271                    .filter_map(|tx| match &tx.meta {
272                        Some(meta) if meta.status.is_err() => None,
273                        _ => tx.transaction.decode(),
274                    })
275                    .collect::<Vec<_>>(),
276            );
277            filter_relevant_instructions(
278                new_relevant_instructions,
279                &finalized_blobs,
280                &mut relevant_instructions_map,
281            );
282            for blob in &finalized_blobs {
283                if blobs.contains_key(blob) {
284                    continue;
285                }
286                let instructions = relevant_instructions_map.get(blob).expect(
287                    "This should never happen since we at least have the finalize instruction",
288                );
289
290                if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob)
291                {
292                    blobs.insert(blob, blob_data);
293                }
294            }
295            if blobs.len() == finalized_blobs.len() {
296                break;
297            }
298        }
299
300        let blob_data = futures::stream::iter(blobs.values())
301            .map(|data| async move { self.decompress_and_decode(data).await })
302            .buffer_unordered(DEFAULT_CONCURRENCY)
303            .try_collect()
304            .await?;
305
306        Ok(blob_data)
307    }
308
309    /// Fetches blob messages for a given slot
310    /// Returns a tuple of ([`Pubkey`], [`VersionedMessage`]) where the Pubkey is the address of
311    /// the [`data_anchor_blober::state::blob::Blob`] account and the VersionedMessage is the message
312    /// that included the [`data_anchor_blober::instruction::FinalizeBlob`] instruction.
313    pub async fn get_blob_messages(
314        &self,
315        slot: u64,
316        identifier: BloberIdentifier,
317    ) -> DataAnchorClientResult<Vec<(Pubkey, VersionedMessage)>> {
318        let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
319
320        let block: EncodedConfirmedBlock = self
321            .rpc_client
322            .get_block_with_config(
323                slot,
324                RpcBlockConfig {
325                    commitment: Some(self.rpc_client.commitment()),
326                    encoding: Some(UiTransactionEncoding::Base58),
327                    max_supported_transaction_version: Some(0),
328                    ..Default::default()
329                },
330            )
331            .await?
332            .into();
333
334        let finalized = block
335            .transactions
336            .iter()
337            .filter_map(|tx| match &tx.meta {
338                Some(meta) if meta.status.is_err() => None,
339                _ => tx.transaction.decode(),
340            })
341            .filter_map(|tx| {
342                let instructions = tx
343                    .message
344                    .instructions()
345                    .iter()
346                    .filter_map(|compiled_instruction| {
347                        Some(RelevantInstructionWithAccounts {
348                            blob: get_account_at_index(
349                                tx.message.static_account_keys(),
350                                compiled_instruction,
351                                BLOB_ACCOUNT_INSTRUCTION_IDX,
352                            )?,
353                            blober: get_account_at_index(
354                                tx.message.static_account_keys(),
355                                compiled_instruction,
356                                BLOB_BLOBER_INSTRUCTION_IDX,
357                            )?,
358                            instruction: RelevantInstruction::try_from_slice(compiled_instruction)?,
359                        })
360                    })
361                    .filter(|instruction| {
362                        instruction.blober == blober
363                            && matches!(
364                                instruction.instruction,
365                                RelevantInstruction::FinalizeBlob(_)
366                            )
367                    })
368                    .collect::<Vec<_>>();
369
370                instructions.is_empty().then_some(
371                    instructions
372                        .iter()
373                        .map(|instruction| (instruction.blob, tx.message.clone()))
374                        .collect::<Vec<_>>(),
375                )
376            })
377            .flatten()
378            .collect::<Vec<_>>();
379
380        Ok(finalized)
381    }
382
383    /// Lists all blober accounts owned by the payer.
384    pub async fn list_blobers(&self) -> DataAnchorClientResult<Vec<BloberWithNamespace>> {
385        let blobers = self
386            .rpc_client
387            .get_program_accounts_with_config(
388                &self.program_id,
389                RpcProgramAccountsConfig {
390                    account_config: RpcAccountInfoConfig {
391                        encoding: Some(UiAccountEncoding::Base64),
392                        ..Default::default()
393                    },
394                    ..Default::default()
395                },
396            )
397            .await?;
398
399        Ok(blobers
400            .into_iter()
401            .filter_map(|(pubkey, account)| {
402                let blober_state = Blober::try_deserialize(&mut account.data.as_slice()).ok()?;
403
404                (blober_state.caller == self.payer.pubkey()).then_some(BloberWithNamespace {
405                    address: pubkey.into(),
406                    namespace: blober_state.namespace,
407                })
408            })
409            .collect())
410    }
411
412    /// Retrieves a blober account by its identifier.
413    pub async fn get_blober(
414        &self,
415        identifier: BloberIdentifier,
416    ) -> DataAnchorClientResult<Option<Blober>> {
417        let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
418        let account = self
419            .rpc_client
420            .get_account_with_commitment(&blober, self.rpc_client.commitment())
421            .await?
422            .value;
423
424        let Some(account) = account else {
425            return Ok(None);
426        };
427
428        let mut data = account.data.as_slice();
429
430        let blober = Blober::try_deserialize(&mut data).map_err(LedgerDataBlobError::from)?;
431
432        Ok(Some(blober))
433    }
434
435    /// Retrieves the checkpoint containing the Groth16 proof for a given blober account.
436    pub async fn get_checkpoint(
437        &self,
438        blober: BloberIdentifier,
439    ) -> DataAnchorClientResult<Option<Checkpoint>> {
440        let blober = blober.to_blober_address(self.program_id, self.payer.pubkey());
441        let checkpoint_address = find_checkpoint_address(self.program_id, blober);
442        let account = self
443            .rpc_client
444            .get_account_with_commitment(&checkpoint_address, self.rpc_client.commitment())
445            .await?
446            .value;
447
448        let Some(account) = account else {
449            return Ok(None);
450        };
451
452        if account.owner != self.program_id {
453            return Err(LedgerDataBlobError::AccountNotOwnedByProgram.into());
454        }
455
456        let mut data = account.data.as_slice();
457
458        let checkpoint =
459            Checkpoint::try_deserialize(&mut data).map_err(LedgerDataBlobError::from)?;
460
461        Ok(Some(checkpoint))
462    }
463}