data_anchor_client/client/
ledger_client.rs1use std::collections::{HashMap, HashSet};
2
3use anchor_lang::{AccountDeserialize, prelude::Pubkey, solana_program::message::VersionedMessage};
4use data_anchor_api::{
5 BloberWithNamespace, LedgerDataBlobError, RelevantInstruction, RelevantInstructionWithAccounts,
6 extract_relevant_instructions, get_account_at_index, get_blob_data_from_instructions,
7};
8use data_anchor_blober::{
9 BLOB_ACCOUNT_INSTRUCTION_IDX, BLOB_BLOBER_INSTRUCTION_IDX, checkpoint::Checkpoint,
10 find_checkpoint_address, state::blober::Blober,
11};
12use data_anchor_utils::encoding::Decodable;
13use futures::{StreamExt, TryStreamExt};
14use solana_account_decoder_client_types::UiAccountEncoding;
15use solana_client::rpc_config::{
16 RpcAccountInfoConfig, RpcBlockConfig, RpcProgramAccountsConfig, RpcTransactionConfig,
17};
18use solana_rpc_client_api::client_error::Error;
19use solana_signature::Signature;
20use solana_signer::Signer;
21use solana_transaction_status::{EncodedConfirmedBlock, UiTransactionEncoding};
22
23use super::BloberIdentifier;
24use crate::{
25 DataAnchorClient, DataAnchorClientResult, OutcomeError,
26 constants::{DEFAULT_CONCURRENCY, DEFAULT_LOOKBACK_SLOTS},
27 helpers::filter_relevant_instructions,
28};
29
/// Errors reported by the client for RPC queries, transaction outcomes and
/// blob / blober / checkpoint operations.
#[derive(thiserror::Error, Debug)]
pub enum ChainError {
    /// A query against the Solana RPC failed; wraps the RPC client error.
    #[error("Failed to query Solana RPC: {0}")]
    SolanaRpc(#[from] Error),
    /// A transaction outcome error, displayed transparently (the inner error's
    /// own message is used).
    #[error(transparent)]
    TransactionFailure(#[from] OutcomeError),
    /// A fee strategy conversion failed; the payload is a static description.
    #[error("Fee Strategy conversion failure: {0}")]
    ConversionError(&'static str),
    /// Declaring a blob failed.
    #[error("Failed to declare blob: {0}")]
    DeclareBlob(OutcomeError),
    /// Inserting blob chunks failed.
    #[error("Failed to insert chunks: {0}")]
    InsertChunks(OutcomeError),
    /// Finalizing a blob failed.
    #[error("Failed to finalize blob: {0}")]
    FinalizeBlob(OutcomeError),
    /// Discarding a blob failed.
    #[error("Failed to discard blob: {0}")]
    DiscardBlob(OutcomeError),
    /// A compound upload failed.
    #[error("Failed to compound upload: {0}")]
    CompoundUpload(OutcomeError),
    /// Initializing a blober failed.
    #[error("Failed to initialize blober: {0}")]
    InitializeBlober(OutcomeError),
    /// Closing a blober failed.
    #[error("Failed to close blober: {0}")]
    CloseBlober(OutcomeError),
    /// No namespace was supplied where one is required to create a blober account.
    #[error("Missing blober namespace. Namespace is required for creating a blober account.")]
    MissingBloberNamespace,
    /// The account named in the payload already exists.
    #[error("Account already exists: {0}")]
    AccountExists(String),
    /// The account named in the payload does not exist.
    #[error("Account does not exist: {0}")]
    AccountDoesNotExist(String),
    /// The payer cannot cover the transaction cost.
    ///
    /// Fields are, in order, the required and the available balance in lamports.
    #[error(
        "Payer has insufficient balance to pay for the transaction: required {0} lamports, available {1} lamports"
    )]
    InsufficientBalance(u64, u64),
    /// The cost could not be calculated.
    #[error("Could not calculate cost")]
    CouldNotCalculateCost,
    /// Configuring a checkpoint failed.
    #[error("Failed to configure checkpoint: {0}")]
    ConfigureCheckpoint(OutcomeError),
    /// The provided proof commitment does not match the blober's address.
    ///
    /// Fields are, in order, the expected and the received address.
    #[error("Provided proof commitment does not match the blober's address expected {0}, got {1}")]
    ProofBloberMismatch(Pubkey, Pubkey),
    /// The checkpoint account lags behind the current blober state.
    #[error("Checkpoint account is not up to date with current blober state")]
    CheckpointNotUpToDate,
}
89
90impl DataAnchorClient {
91 pub async fn get_ledger_blobs_from_signatures<T>(
93 &self,
94 identifier: BloberIdentifier,
95 signatures: Vec<Signature>,
96 ) -> DataAnchorClientResult<T>
97 where
98 T: Decodable,
99 {
100 let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
101
102 let relevant_transactions = futures::stream::iter(signatures)
103 .map(|signature| async move {
104 self.rpc_client
105 .get_transaction_with_config(
106 &signature,
107 RpcTransactionConfig {
108 commitment: Some(self.rpc_client.commitment()),
109 encoding: Some(UiTransactionEncoding::Base58),
110 max_supported_transaction_version: Some(0),
111 },
112 )
113 .await
114 })
115 .buffer_unordered(DEFAULT_CONCURRENCY)
116 .collect::<Vec<_>>()
117 .await
118 .into_iter()
119 .collect::<Result<Vec<_>, _>>()?;
120
121 let relevant_instructions = extract_relevant_instructions(
122 &self.program_id,
123 &relevant_transactions
124 .iter()
125 .filter_map(|encoded| match &encoded.transaction.meta {
126 Some(meta) if meta.status.is_err() => None,
127 _ => encoded.transaction.transaction.decode(),
128 })
129 .collect::<Vec<_>>(),
130 );
131
132 let declares = relevant_instructions
133 .iter()
134 .filter_map(|instruction| {
135 (instruction.blober == blober
136 && matches!(instruction.instruction, RelevantInstruction::DeclareBlob(_)))
137 .then_some(instruction.blob)
138 })
139 .collect::<Vec<Pubkey>>();
140
141 let Some(blob) = declares.first() else {
142 return Err(LedgerDataBlobError::DeclareNotFound.into());
143 };
144
145 if declares.len() > 1 {
146 return Err(LedgerDataBlobError::MultipleDeclares.into());
147 }
148
149 if relevant_instructions
150 .iter()
151 .filter(|instruction| {
152 matches!(
153 instruction.instruction,
154 RelevantInstruction::FinalizeBlob(_)
155 )
156 })
157 .count()
158 > 1
159 {
160 return Err(LedgerDataBlobError::MultipleFinalizes.into());
161 }
162
163 let data = get_blob_data_from_instructions(&relevant_instructions, blober, *blob)?;
164
165 self.decompress_and_decode(&data).await
166 }
167
168 pub async fn get_ledger_blobs<T>(
170 &self,
171 slot: u64,
172 identifier: BloberIdentifier,
173 lookback_slots: Option<u64>,
174 ) -> DataAnchorClientResult<Vec<T>>
175 where
176 T: Decodable,
177 {
178 let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
179
180 let block_config = RpcBlockConfig {
181 commitment: Some(self.rpc_client.commitment()),
182 encoding: Some(UiTransactionEncoding::Base58),
183 max_supported_transaction_version: Some(0),
184 ..Default::default()
185 };
186 let block = self
187 .rpc_client
188 .get_block_with_config(slot, block_config)
189 .await?;
190
191 let Some(transactions) = block.transactions else {
192 return Ok(Vec::new());
194 };
195
196 let relevant_instructions = extract_relevant_instructions(
197 &self.program_id,
198 &transactions
199 .iter()
200 .filter_map(|tx| match &tx.meta {
201 Some(meta) if meta.status.is_err() => None,
202 _ => tx.transaction.decode(),
203 })
204 .collect::<Vec<_>>(),
205 );
206 let finalized_blobs = relevant_instructions
207 .iter()
208 .filter_map(|instruction| {
209 (instruction.blober == blober
210 && matches!(
211 instruction.instruction,
212 RelevantInstruction::FinalizeBlob(_)
213 ))
214 .then_some(instruction.blob)
215 })
216 .collect::<HashSet<Pubkey>>();
217
218 let mut relevant_instructions_map = HashMap::new();
219 filter_relevant_instructions(
220 relevant_instructions,
221 &finalized_blobs,
222 &mut relevant_instructions_map,
223 );
224
225 let mut blobs = HashMap::with_capacity(finalized_blobs.len());
226 for blob in &finalized_blobs {
227 let instructions = relevant_instructions_map
228 .get(blob)
229 .expect("This should never happen since we at least have the finalize instruction");
230
231 if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob) {
232 blobs.insert(blob, blob_data);
233 }
234 }
235
236 if blobs.len() == finalized_blobs.len() {
238 let blob_data = futures::stream::iter(blobs.values())
239 .map(|data| async move { self.decompress_and_decode(data).await })
240 .buffer_unordered(DEFAULT_CONCURRENCY)
241 .try_collect()
242 .await?;
243
244 return Ok(blob_data);
245 }
246
247 let lookback_slots = lookback_slots.unwrap_or(DEFAULT_LOOKBACK_SLOTS);
248
249 let block_slots = self
250 .rpc_client
251 .get_blocks_with_commitment(
252 slot - lookback_slots,
253 Some(slot - 1),
254 self.rpc_client.commitment(),
255 )
256 .await?;
257
258 for slot in block_slots.into_iter().rev() {
259 let block = self
260 .rpc_client
261 .get_block_with_config(slot, block_config)
262 .await?;
263 let Some(transactions) = block.transactions else {
264 continue;
266 };
267 let new_relevant_instructions = extract_relevant_instructions(
268 &self.program_id,
269 &transactions
270 .iter()
271 .filter_map(|tx| match &tx.meta {
272 Some(meta) if meta.status.is_err() => None,
273 _ => tx.transaction.decode(),
274 })
275 .collect::<Vec<_>>(),
276 );
277 filter_relevant_instructions(
278 new_relevant_instructions,
279 &finalized_blobs,
280 &mut relevant_instructions_map,
281 );
282 for blob in &finalized_blobs {
283 if blobs.contains_key(blob) {
284 continue;
285 }
286 let instructions = relevant_instructions_map.get(blob).expect(
287 "This should never happen since we at least have the finalize instruction",
288 );
289
290 if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob)
291 {
292 blobs.insert(blob, blob_data);
293 }
294 }
295 if blobs.len() == finalized_blobs.len() {
296 break;
297 }
298 }
299
300 let blob_data = futures::stream::iter(blobs.values())
301 .map(|data| async move { self.decompress_and_decode(data).await })
302 .buffer_unordered(DEFAULT_CONCURRENCY)
303 .try_collect()
304 .await?;
305
306 Ok(blob_data)
307 }
308
309 pub async fn get_blob_messages(
314 &self,
315 slot: u64,
316 identifier: BloberIdentifier,
317 ) -> DataAnchorClientResult<Vec<(Pubkey, VersionedMessage)>> {
318 let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
319
320 let block: EncodedConfirmedBlock = self
321 .rpc_client
322 .get_block_with_config(
323 slot,
324 RpcBlockConfig {
325 commitment: Some(self.rpc_client.commitment()),
326 encoding: Some(UiTransactionEncoding::Base58),
327 max_supported_transaction_version: Some(0),
328 ..Default::default()
329 },
330 )
331 .await?
332 .into();
333
334 let finalized = block
335 .transactions
336 .iter()
337 .filter_map(|tx| match &tx.meta {
338 Some(meta) if meta.status.is_err() => None,
339 _ => tx.transaction.decode(),
340 })
341 .filter_map(|tx| {
342 let instructions = tx
343 .message
344 .instructions()
345 .iter()
346 .filter_map(|compiled_instruction| {
347 Some(RelevantInstructionWithAccounts {
348 blob: get_account_at_index(
349 tx.message.static_account_keys(),
350 compiled_instruction,
351 BLOB_ACCOUNT_INSTRUCTION_IDX,
352 )?,
353 blober: get_account_at_index(
354 tx.message.static_account_keys(),
355 compiled_instruction,
356 BLOB_BLOBER_INSTRUCTION_IDX,
357 )?,
358 instruction: RelevantInstruction::try_from_slice(compiled_instruction)?,
359 })
360 })
361 .filter(|instruction| {
362 instruction.blober == blober
363 && matches!(
364 instruction.instruction,
365 RelevantInstruction::FinalizeBlob(_)
366 )
367 })
368 .collect::<Vec<_>>();
369
370 instructions.is_empty().then_some(
371 instructions
372 .iter()
373 .map(|instruction| (instruction.blob, tx.message.clone()))
374 .collect::<Vec<_>>(),
375 )
376 })
377 .flatten()
378 .collect::<Vec<_>>();
379
380 Ok(finalized)
381 }
382
383 pub async fn list_blobers(&self) -> DataAnchorClientResult<Vec<BloberWithNamespace>> {
385 let blobers = self
386 .rpc_client
387 .get_program_accounts_with_config(
388 &self.program_id,
389 RpcProgramAccountsConfig {
390 account_config: RpcAccountInfoConfig {
391 encoding: Some(UiAccountEncoding::Base64),
392 ..Default::default()
393 },
394 ..Default::default()
395 },
396 )
397 .await?;
398
399 Ok(blobers
400 .into_iter()
401 .filter_map(|(pubkey, account)| {
402 let blober_state = Blober::try_deserialize(&mut account.data.as_slice()).ok()?;
403
404 (blober_state.caller == self.payer.pubkey()).then_some(BloberWithNamespace {
405 address: pubkey.into(),
406 namespace: blober_state.namespace,
407 })
408 })
409 .collect())
410 }
411
412 pub async fn get_blober(
414 &self,
415 identifier: BloberIdentifier,
416 ) -> DataAnchorClientResult<Option<Blober>> {
417 let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
418 let account = self
419 .rpc_client
420 .get_account_with_commitment(&blober, self.rpc_client.commitment())
421 .await?
422 .value;
423
424 let Some(account) = account else {
425 return Ok(None);
426 };
427
428 let mut data = account.data.as_slice();
429
430 let blober = Blober::try_deserialize(&mut data).map_err(LedgerDataBlobError::from)?;
431
432 Ok(Some(blober))
433 }
434
435 pub async fn get_checkpoint(
437 &self,
438 blober: BloberIdentifier,
439 ) -> DataAnchorClientResult<Option<Checkpoint>> {
440 let blober = blober.to_blober_address(self.program_id, self.payer.pubkey());
441 let checkpoint_address = find_checkpoint_address(self.program_id, blober);
442 let account = self
443 .rpc_client
444 .get_account_with_commitment(&checkpoint_address, self.rpc_client.commitment())
445 .await?
446 .value;
447
448 let Some(account) = account else {
449 return Ok(None);
450 };
451
452 if account.owner != self.program_id {
453 return Err(LedgerDataBlobError::AccountNotOwnedByProgram.into());
454 }
455
456 let mut data = account.data.as_slice();
457
458 let checkpoint =
459 Checkpoint::try_deserialize(&mut data).map_err(LedgerDataBlobError::from)?;
460
461 Ok(Some(checkpoint))
462 }
463}