data_anchor_client/client/
ledger_client.rs1use std::collections::{HashMap, HashSet};
2
3use anchor_lang::{
4 AnchorDeserialize, Discriminator, prelude::Pubkey, solana_program::message::VersionedMessage,
5};
6use data_anchor_api::{
7 BloberWithNamespace, LedgerDataBlobError, RelevantInstruction, RelevantInstructionWithAccounts,
8 extract_relevant_instructions, get_account_at_index, get_blob_data_from_instructions,
9};
10use data_anchor_blober::{
11 BLOB_ACCOUNT_INSTRUCTION_IDX, BLOB_BLOBER_INSTRUCTION_IDX, checkpoint::Checkpoint,
12 find_checkpoint_address, state::blober::Blober,
13};
14use data_anchor_utils::{
15 compression::DataAnchorCompressionAsync,
16 encoding::{DataAnchorEncoding, Decodable},
17};
18use futures::{StreamExt, TryStreamExt};
19use solana_account_decoder_client_types::UiAccountEncoding;
20use solana_client::rpc_config::{
21 RpcAccountInfoConfig, RpcBlockConfig, RpcProgramAccountsConfig, RpcTransactionConfig,
22};
23use solana_rpc_client_api::client_error::Error;
24use solana_signature::Signature;
25use solana_signer::Signer;
26use solana_transaction_status::{EncodedConfirmedBlock, UiTransactionEncoding};
27
28use super::BloberIdentifier;
29use crate::{
30 DataAnchorClient, DataAnchorClientResult, OutcomeError,
31 constants::{DEFAULT_CONCURRENCY, DEFAULT_LOOKBACK_SLOTS},
32 helpers::filter_relevant_instructions,
33};
34
35#[derive(thiserror::Error, Debug)]
37pub enum ChainError {
38 #[error("Failed to query Solana RPC: {0}")]
40 SolanaRpc(#[from] Error),
41 #[error(transparent)]
43 TransactionFailure(#[from] OutcomeError),
44 #[error("Fee Strategy conversion failure: {0}")]
46 ConversionError(&'static str),
47 #[error("Failed to declare blob: {0}")]
49 DeclareBlob(OutcomeError),
50 #[error("Failed to insert chunks: {0}")]
52 InsertChunks(OutcomeError),
53 #[error("Failed to finalize blob: {0}")]
55 FinalizeBlob(OutcomeError),
56 #[error("Failed to discard blob: {0}")]
58 DiscardBlob(OutcomeError),
59 #[error("Failed to compound upload: {0}")]
61 CompoundUpload(OutcomeError),
62 #[error("Failed to initialize blober: {0}")]
64 InitializeBlober(OutcomeError),
65 #[error("Failed to close blober: {0}")]
67 CloseBlober(OutcomeError),
68 #[error("Missing blober namespace. Namespace is required for creating a blober account.")]
70 MissingBloberNamespace,
71 #[error("Account already exists: {0}")]
73 AccountExists(String),
74 #[error("Account does not exist: {0}")]
76 AccountDoesNotExist(String),
77 #[error(
79 "Payer has insufficient balance to pay for the transaction: required {0} lamports, available {1} lamports"
80 )]
81 InsufficientBalance(u64, u64),
82 #[error("Could not calculate cost")]
84 CouldNotCalculateCost,
85 #[error("Failed to configure checkpoint: {0}")]
87 ConfigureCheckpoint(OutcomeError),
88 #[error("Provided proof commitment does not match the blober's address expected {0}, got {1}")]
90 ProofBloberMismatch(Pubkey, Pubkey),
91 #[error("Checkpoint account is not up to date with current blober state")]
92 CheckpointNotUpToDate,
93}
94
95impl<Encoding, Compression> DataAnchorClient<Encoding, Compression>
96where
97 Encoding: DataAnchorEncoding + Default,
98 Compression: DataAnchorCompressionAsync,
99{
100 pub async fn get_ledger_blobs_from_signatures<T>(
102 &self,
103 identifier: BloberIdentifier,
104 signatures: Vec<Signature>,
105 ) -> DataAnchorClientResult<T>
106 where
107 T: Decodable,
108 {
109 let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
110
111 let relevant_transactions = futures::stream::iter(signatures)
112 .map(|signature| async move {
113 self.rpc_client
114 .get_transaction_with_config(
115 &signature,
116 RpcTransactionConfig {
117 commitment: Some(self.rpc_client.commitment()),
118 encoding: Some(UiTransactionEncoding::Base58),
119 max_supported_transaction_version: Some(0),
120 },
121 )
122 .await
123 })
124 .buffer_unordered(DEFAULT_CONCURRENCY)
125 .collect::<Vec<_>>()
126 .await
127 .into_iter()
128 .collect::<Result<Vec<_>, _>>()?;
129
130 let relevant_instructions = extract_relevant_instructions(
131 &self.program_id,
132 &relevant_transactions
133 .iter()
134 .filter_map(|encoded| match &encoded.transaction.meta {
135 Some(meta) if meta.status.is_err() => None,
136 _ => encoded.transaction.transaction.decode(),
137 })
138 .collect::<Vec<_>>(),
139 );
140
141 let declares = relevant_instructions
142 .iter()
143 .filter_map(|instruction| {
144 (instruction.blober == blober
145 && matches!(instruction.instruction, RelevantInstruction::DeclareBlob(_)))
146 .then_some(instruction.blob)
147 })
148 .collect::<Vec<Pubkey>>();
149
150 let Some(blob) = declares.first() else {
151 return Err(LedgerDataBlobError::DeclareNotFound.into());
152 };
153
154 if declares.len() > 1 {
155 return Err(LedgerDataBlobError::MultipleDeclares.into());
156 }
157
158 if relevant_instructions
159 .iter()
160 .filter(|instruction| {
161 matches!(
162 instruction.instruction,
163 RelevantInstruction::FinalizeBlob(_)
164 )
165 })
166 .count()
167 > 1
168 {
169 return Err(LedgerDataBlobError::MultipleFinalizes.into());
170 }
171
172 let data = get_blob_data_from_instructions(&relevant_instructions, blober, *blob)?;
173
174 self.decompress_and_decode(&data).await
175 }
176
177 pub async fn get_ledger_blobs<T>(
179 &self,
180 slot: u64,
181 identifier: BloberIdentifier,
182 lookback_slots: Option<u64>,
183 ) -> DataAnchorClientResult<Vec<T>>
184 where
185 T: Decodable,
186 {
187 let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
188
189 let block_config = RpcBlockConfig {
190 commitment: Some(self.rpc_client.commitment()),
191 encoding: Some(UiTransactionEncoding::Base58),
192 max_supported_transaction_version: Some(0),
193 ..Default::default()
194 };
195 let block = self
196 .rpc_client
197 .get_block_with_config(slot, block_config)
198 .await?;
199
200 let Some(transactions) = block.transactions else {
201 return Ok(Vec::new());
203 };
204
205 let relevant_instructions = extract_relevant_instructions(
206 &self.program_id,
207 &transactions
208 .iter()
209 .filter_map(|tx| match &tx.meta {
210 Some(meta) if meta.status.is_err() => None,
211 _ => tx.transaction.decode(),
212 })
213 .collect::<Vec<_>>(),
214 );
215 let finalized_blobs = relevant_instructions
216 .iter()
217 .filter_map(|instruction| {
218 (instruction.blober == blober
219 && matches!(
220 instruction.instruction,
221 RelevantInstruction::FinalizeBlob(_)
222 ))
223 .then_some(instruction.blob)
224 })
225 .collect::<HashSet<Pubkey>>();
226
227 let mut relevant_instructions_map = HashMap::new();
228 filter_relevant_instructions(
229 relevant_instructions,
230 &finalized_blobs,
231 &mut relevant_instructions_map,
232 );
233
234 let mut blobs = HashMap::with_capacity(finalized_blobs.len());
235 for blob in &finalized_blobs {
236 let instructions = relevant_instructions_map
237 .get(blob)
238 .expect("This should never happen since we at least have the finalize instruction");
239
240 if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob) {
241 blobs.insert(blob, blob_data);
242 }
243 }
244
245 if blobs.len() == finalized_blobs.len() {
247 let blob_data = futures::stream::iter(blobs.values())
248 .map(|data| async move { self.decompress_and_decode(data).await })
249 .buffer_unordered(DEFAULT_CONCURRENCY)
250 .try_collect()
251 .await?;
252
253 return Ok(blob_data);
254 }
255
256 let lookback_slots = lookback_slots.unwrap_or(DEFAULT_LOOKBACK_SLOTS);
257
258 let block_slots = self
259 .rpc_client
260 .get_blocks_with_commitment(
261 slot - lookback_slots,
262 Some(slot - 1),
263 self.rpc_client.commitment(),
264 )
265 .await?;
266
267 for slot in block_slots.into_iter().rev() {
268 let block = self
269 .rpc_client
270 .get_block_with_config(slot, block_config)
271 .await?;
272 let Some(transactions) = block.transactions else {
273 continue;
275 };
276 let new_relevant_instructions = extract_relevant_instructions(
277 &self.program_id,
278 &transactions
279 .iter()
280 .filter_map(|tx| match &tx.meta {
281 Some(meta) if meta.status.is_err() => None,
282 _ => tx.transaction.decode(),
283 })
284 .collect::<Vec<_>>(),
285 );
286 filter_relevant_instructions(
287 new_relevant_instructions,
288 &finalized_blobs,
289 &mut relevant_instructions_map,
290 );
291 for blob in &finalized_blobs {
292 if blobs.contains_key(blob) {
293 continue;
294 }
295 let instructions = relevant_instructions_map.get(blob).expect(
296 "This should never happen since we at least have the finalize instruction",
297 );
298
299 if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob)
300 {
301 blobs.insert(blob, blob_data);
302 }
303 }
304 if blobs.len() == finalized_blobs.len() {
305 break;
306 }
307 }
308
309 let blob_data = futures::stream::iter(blobs.values())
310 .map(|data| async move { self.decompress_and_decode(data).await })
311 .buffer_unordered(DEFAULT_CONCURRENCY)
312 .try_collect()
313 .await?;
314
315 Ok(blob_data)
316 }
317
318 pub async fn get_blob_messages(
323 &self,
324 slot: u64,
325 identifier: BloberIdentifier,
326 ) -> DataAnchorClientResult<Vec<(Pubkey, VersionedMessage)>> {
327 let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
328
329 let block: EncodedConfirmedBlock = self
330 .rpc_client
331 .get_block_with_config(
332 slot,
333 RpcBlockConfig {
334 commitment: Some(self.rpc_client.commitment()),
335 encoding: Some(UiTransactionEncoding::Base58),
336 max_supported_transaction_version: Some(0),
337 ..Default::default()
338 },
339 )
340 .await?
341 .into();
342
343 let finalized = block
344 .transactions
345 .iter()
346 .filter_map(|tx| match &tx.meta {
347 Some(meta) if meta.status.is_err() => None,
348 _ => tx.transaction.decode(),
349 })
350 .filter_map(|tx| {
351 let instructions = tx
352 .message
353 .instructions()
354 .iter()
355 .filter_map(|compiled_instruction| {
356 Some(RelevantInstructionWithAccounts {
357 blob: get_account_at_index(
358 &tx,
359 compiled_instruction,
360 BLOB_ACCOUNT_INSTRUCTION_IDX,
361 )?,
362 blober: get_account_at_index(
363 &tx,
364 compiled_instruction,
365 BLOB_BLOBER_INSTRUCTION_IDX,
366 )?,
367 instruction: RelevantInstruction::try_from_slice(compiled_instruction)?,
368 })
369 })
370 .filter(|instruction| {
371 instruction.blober == blober
372 && matches!(
373 instruction.instruction,
374 RelevantInstruction::FinalizeBlob(_)
375 )
376 })
377 .collect::<Vec<_>>();
378
379 instructions.is_empty().then_some(
380 instructions
381 .iter()
382 .map(|instruction| (instruction.blob, tx.message.clone()))
383 .collect::<Vec<_>>(),
384 )
385 })
386 .flatten()
387 .collect::<Vec<_>>();
388
389 Ok(finalized)
390 }
391
392 pub async fn list_blobers(&self) -> DataAnchorClientResult<Vec<BloberWithNamespace>> {
394 let blobers = self
395 .rpc_client
396 .get_program_accounts_with_config(
397 &self.program_id,
398 RpcProgramAccountsConfig {
399 account_config: RpcAccountInfoConfig {
400 encoding: Some(UiAccountEncoding::Base64),
401 ..Default::default()
402 },
403 ..Default::default()
404 },
405 )
406 .await?;
407
408 Ok(blobers
409 .into_iter()
410 .filter_map(|(pubkey, account)| {
411 if !account.data.starts_with(Blober::DISCRIMINATOR) {
412 return None;
413 }
414
415 let mut state = account.data.get(Blober::DISCRIMINATOR.len()..)?;
416 let blober_state = Blober::deserialize(&mut state).ok()?;
417
418 (blober_state.caller == self.payer.pubkey()).then_some(BloberWithNamespace {
419 address: pubkey.into(),
420 namespace: blober_state.namespace,
421 })
422 })
423 .collect())
424 }
425
426 pub async fn get_blober(
428 &self,
429 identifier: BloberIdentifier,
430 ) -> DataAnchorClientResult<Option<Blober>> {
431 let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
432 let account = self
433 .rpc_client
434 .get_account_with_commitment(&blober, self.rpc_client.commitment())
435 .await?
436 .value;
437
438 let Some(account) = account else {
439 return Ok(None);
440 };
441
442 if !account.data.starts_with(Blober::DISCRIMINATOR) {
443 return Err(LedgerDataBlobError::InvalidBloberAccount(
444 "Invalid discriminator".to_owned(),
445 )
446 .into());
447 }
448
449 let mut state = account.data.get(Blober::DISCRIMINATOR.len()..).ok_or(
450 LedgerDataBlobError::InvalidBloberAccount("No state data".to_owned()),
451 )?;
452
453 if state.is_empty() {
454 return Err(
455 LedgerDataBlobError::InvalidBloberAccount("Empty state data".to_owned()).into(),
456 );
457 }
458
459 let blober = Blober::deserialize(&mut state).map_err(|e| {
460 LedgerDataBlobError::InvalidBloberAccount(format!("Failed to deserialize: {e:?}"))
461 })?;
462
463 Ok(Some(blober))
464 }
465
466 pub async fn get_checkpoint(
468 &self,
469 blober: BloberIdentifier,
470 ) -> DataAnchorClientResult<Option<Checkpoint>> {
471 let blober = blober.to_blober_address(self.program_id, self.payer.pubkey());
472 let checkpoint_address = find_checkpoint_address(self.program_id, blober);
473 let account = self
474 .rpc_client
475 .get_account_with_commitment(&checkpoint_address, self.rpc_client.commitment())
476 .await?
477 .value;
478
479 let Some(account) = account else {
480 return Ok(None);
481 };
482
483 if account.owner != self.program_id {
484 return Err(LedgerDataBlobError::AccountNotOwnedByProgram.into());
485 }
486
487 if !account.data.starts_with(Checkpoint::DISCRIMINATOR) {
488 return Err(LedgerDataBlobError::InvalidCheckpointAccount(
489 "Invalid discriminator".to_owned(),
490 )
491 .into());
492 }
493
494 let mut state = account.data.get(Checkpoint::DISCRIMINATOR.len()..).ok_or(
495 LedgerDataBlobError::InvalidCheckpointAccount("No state data".to_owned()),
496 )?;
497
498 if state.is_empty() {
499 return Err(LedgerDataBlobError::InvalidCheckpointAccount(
500 "Empty state data".to_owned(),
501 )
502 .into());
503 }
504
505 let checkpoint = Checkpoint::deserialize(&mut state).map_err(|e| {
506 LedgerDataBlobError::InvalidCheckpointAccount(format!("Failed to deserialize: {e:?}"))
507 })?;
508
509 Ok(Some(checkpoint))
510 }
511}