1use std::{
2 collections::{HashMap, HashSet},
3 str::FromStr,
4 sync::Arc,
5 time::Duration,
6};
7
8use anchor_lang::{Discriminator, Space};
9use blober_client_builder::{IsSet, IsUnset, SetHeliusFeeEstimate, SetIndexerClient};
10use bon::Builder;
11use data_anchor_api::{
12 extract_relevant_instructions, get_account_at_index, BlobsByBlober, BlobsByPayer,
13 CompoundProof, IndexerRpcClient, RelevantInstruction, RelevantInstructionWithAccounts,
14};
15use data_anchor_blober::{
16 find_blob_address, find_blober_address,
17 instruction::{Close, DeclareBlob, DiscardBlob, FinalizeBlob, Initialize, InsertChunk},
18 state::blober::Blober,
19 BLOB_ACCOUNT_INSTRUCTION_IDX, BLOB_BLOBER_INSTRUCTION_IDX, CHUNK_SIZE,
20 COMPOUND_DECLARE_TX_SIZE, COMPOUND_TX_SIZE,
21};
22use futures::StreamExt;
23use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
24use solana_cli_config::Config;
25use solana_client::rpc_config::RpcTransactionConfig;
26use solana_rpc_client::nonblocking::rpc_client::RpcClient;
27use solana_rpc_client_api::config::RpcBlockConfig;
28use solana_sdk::{
29 commitment_config::CommitmentConfig,
30 message::VersionedMessage,
31 pubkey::Pubkey,
32 signature::{Keypair, Signature},
33 signer::Signer,
34};
35use solana_transaction_status::{EncodedConfirmedBlock, UiTransactionEncoding};
36use tracing::{info_span, Instrument, Span};
37
38use crate::{
39 batch_client::{BatchClient, SuccessfulTransaction},
40 constants::{DEFAULT_CONCURRENCY, DEFAULT_LOOKBACK_SLOTS},
41 fees::{Fee, FeeStrategy, Lamports, Priority},
42 helpers::{
43 check_outcomes, filter_relevant_instructions, get_blob_data_from_instructions,
44 get_unique_timestamp,
45 },
46 tx::{Compound, CompoundDeclare, CompoundFinalize, MessageArguments, MessageBuilder},
47 types::{IndexerError, TransactionType, UploadBlobError},
48 BloberClientError, BloberClientResult, LedgerDataBlobError,
49};
50
51#[derive(Builder, Clone)]
52pub struct BloberClient {
53 #[builder(getter(name = get_payer, vis = ""))]
54 pub(crate) payer: Arc<Keypair>,
55 pub(crate) program_id: Pubkey,
56 pub(crate) rpc_client: Arc<RpcClient>,
57 pub(crate) batch_client: BatchClient,
58 pub(crate) indexer_client: Option<Arc<WsClient>>,
60 #[builder(default = false)]
61 pub(crate) helius_fee_estimate: bool,
62}
63
64impl<State: blober_client_builder::State> BloberClientBuilder<State> {
65 pub async fn indexer_from_url(
77 self,
78 indexer_url: &str,
79 ) -> BloberClientResult<BloberClientBuilder<SetIndexerClient<State>>>
80 where
81 State::IndexerClient: IsUnset,
82 {
83 let indexer_client = WsClientBuilder::new().build(indexer_url).await?;
84 Ok(self.indexer_client(Arc::new(indexer_client)))
85 }
86
87 pub async fn build_with_config(self, solana_config: Config) -> BloberClientResult<BloberClient>
109 where
110 State::Payer: IsSet,
111 State::ProgramId: IsSet,
112 State::RpcClient: IsUnset,
113 State::BatchClient: IsUnset,
114 {
115 let rpc_client = Arc::new(RpcClient::new_with_commitment(
116 solana_config.json_rpc_url.clone(),
117 CommitmentConfig::from_str(&solana_config.commitment)?,
118 ));
119 let payer = self.get_payer().clone();
120 Ok(self
121 .rpc_client(rpc_client.clone())
122 .batch_client(BatchClient::new(rpc_client.clone(), vec![payer.clone()]).await?)
123 .build())
124 }
125
126 pub fn with_helius_fee_estimate(self) -> BloberClientBuilder<SetHeliusFeeEstimate<State>>
127 where
128 State::HeliusFeeEstimate: IsUnset,
129 {
130 self.helius_fee_estimate(true)
131 }
132}
133
134impl BloberClient {
135 pub fn rpc_client(&self) -> Arc<RpcClient> {
137 self.rpc_client.clone()
138 }
139
140 pub fn payer(&self) -> Arc<Keypair> {
142 self.payer.clone()
143 }
144
145 pub async fn initialize_blober(
147 &self,
148 fee_strategy: FeeStrategy,
149 namespace: &str,
150 timeout: Option<Duration>,
151 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
152 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
153
154 let fee_strategy = self
155 .convert_fee_strategy_to_fixed(
156 fee_strategy,
157 &[blober],
158 TransactionType::InitializeBlober,
159 )
160 .in_current_span()
161 .await?;
162
163 let msg = Initialize::build_message(MessageArguments::new(
164 self.program_id,
165 blober,
166 &self.payer,
167 self.rpc_client.clone(),
168 fee_strategy,
169 self.helius_fee_estimate,
170 (namespace.to_owned(), blober),
171 ))
172 .await
173 .expect("infallible with a fixed fee strategy");
174
175 let span = info_span!(parent: Span::current(), "initialize_blober");
176 Ok(check_outcomes(
177 self.batch_client
178 .send(vec![(TransactionType::InitializeBlober, msg)], timeout)
179 .instrument(span)
180 .await,
181 )
182 .map_err(UploadBlobError::InitializeBlober)?)
183 }
184
185 pub async fn close_blober(
187 &self,
188 fee_strategy: FeeStrategy,
189 namespace: &str,
190 timeout: Option<Duration>,
191 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
192 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
193
194 let fee_strategy = self
195 .convert_fee_strategy_to_fixed(fee_strategy, &[blober], TransactionType::CloseBlober)
196 .in_current_span()
197 .await?;
198
199 let msg = Close::build_message(MessageArguments::new(
200 self.program_id,
201 blober,
202 &self.payer,
203 self.rpc_client.clone(),
204 fee_strategy,
205 self.helius_fee_estimate,
206 (),
207 ))
208 .await
209 .expect("infallible with a fixed fee strategy");
210
211 let span = info_span!(parent: Span::current(), "close_blober");
212 Ok(check_outcomes(
213 self.batch_client
214 .send(vec![(TransactionType::CloseBlober, msg)], timeout)
215 .instrument(span)
216 .await,
217 )
218 .map_err(UploadBlobError::CloseBlober)?)
219 }
220
221 pub async fn upload_blob(
228 &self,
229 blob_data: &[u8],
230 fee_strategy: FeeStrategy,
231 namespace: &str,
232 timeout: Option<Duration>,
233 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
234 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
235 let timestamp = get_unique_timestamp();
236
237 let blob = find_blob_address(
238 self.program_id,
239 self.payer.pubkey(),
240 blober,
241 timestamp,
242 blob_data.len(),
243 );
244
245 let upload_messages = self
246 .generate_messages(blob, timestamp, blob_data, fee_strategy, blober)
247 .await?;
248
249 let res = self
250 .do_upload(upload_messages, timeout)
251 .in_current_span()
252 .await;
253
254 if let Err(BloberClientError::UploadBlob(UploadBlobError::DeclareBlob(_))) = res {
255 self.discard_blob(fee_strategy, blob, namespace, timeout)
256 .await
257 } else {
258 res
259 }
260 }
261
262 pub async fn discard_blob(
265 &self,
266 fee_strategy: FeeStrategy,
267 blob: Pubkey,
268 namespace: &str,
269 timeout: Option<Duration>,
270 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
271 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
272
273 let fee_strategy = self
274 .convert_fee_strategy_to_fixed(fee_strategy, &[blob], TransactionType::DiscardBlob)
275 .in_current_span()
276 .await?;
277
278 let msg = DiscardBlob::build_message(MessageArguments::new(
279 self.program_id,
280 blober,
281 &self.payer,
282 self.rpc_client.clone(),
283 fee_strategy,
284 self.helius_fee_estimate,
285 blob,
286 ))
287 .in_current_span()
288 .await
289 .expect("infallible with a fixed fee strategy");
290
291 let span = info_span!(parent: Span::current(), "discard_blob");
292
293 Ok(check_outcomes(
294 self.batch_client
295 .send(vec![(TransactionType::DiscardBlob, msg)], timeout)
296 .instrument(span)
297 .await,
298 )
299 .map_err(UploadBlobError::DiscardBlob)?)
300 }
301
302 pub async fn estimate_fees(
309 &self,
310 blob_size: usize,
311 blober: Pubkey,
312 priority: Priority,
313 ) -> BloberClientResult<Fee> {
314 let prioritization_fee_rate = priority
315 .get_priority_fee_estimate(
316 &self.rpc_client,
317 &[Pubkey::new_unique(), blober, self.payer.pubkey()],
318 self.helius_fee_estimate,
319 )
320 .await?;
321
322 let num_chunks = blob_size.div_ceil(CHUNK_SIZE as usize) as u16;
323
324 let (compute_unit_limit, num_signatures) = if blob_size < COMPOUND_TX_SIZE as usize {
325 (Compound::COMPUTE_UNIT_LIMIT, Compound::NUM_SIGNATURES)
326 } else if blob_size < COMPOUND_DECLARE_TX_SIZE as usize {
327 (
328 CompoundDeclare::COMPUTE_UNIT_LIMIT + FinalizeBlob::COMPUTE_UNIT_LIMIT,
329 CompoundDeclare::NUM_SIGNATURES + FinalizeBlob::NUM_SIGNATURES,
330 )
331 } else {
332 (
333 DeclareBlob::COMPUTE_UNIT_LIMIT
334 + (num_chunks - 1) as u32 * InsertChunk::COMPUTE_UNIT_LIMIT
335 + CompoundFinalize::COMPUTE_UNIT_LIMIT,
336 DeclareBlob::NUM_SIGNATURES
337 + (num_chunks - 1) * InsertChunk::NUM_SIGNATURES
338 + CompoundFinalize::NUM_SIGNATURES,
339 )
340 };
341
342 let price_per_signature = Lamports::new(5000);
345
346 let blob_account_size = Blober::DISCRIMINATOR.len() + Blober::INIT_SPACE;
347
348 Ok(Fee {
349 num_signatures,
350 price_per_signature,
351 compute_unit_limit,
352 prioritization_fee_rate,
353 blob_account_size,
354 })
355 }
356
357 pub async fn get_ledger_blobs_from_signatures(
359 &self,
360 namespace: &str,
361 payer_pubkey: Option<Pubkey>,
362 signatures: Vec<Signature>,
363 ) -> BloberClientResult<Vec<u8>> {
364 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
365 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
366
367 let relevant_transactions = futures::stream::iter(signatures)
368 .map(|signature| async move {
369 self.rpc_client
370 .get_transaction_with_config(
371 &signature,
372 RpcTransactionConfig {
373 commitment: Some(self.rpc_client.commitment()),
374 encoding: Some(UiTransactionEncoding::Base58),
375 ..Default::default()
376 },
377 )
378 .await
379 })
380 .buffer_unordered(DEFAULT_CONCURRENCY)
381 .collect::<Vec<_>>()
382 .await
383 .into_iter()
384 .collect::<Result<Vec<_>, _>>()?;
385
386 let relevant_instructions = extract_relevant_instructions(
387 &self.program_id,
388 &relevant_transactions
389 .iter()
390 .filter_map(|encoded| match &encoded.transaction.meta {
391 Some(meta) if meta.status.is_err() => None,
392 _ => encoded.transaction.transaction.decode(),
393 })
394 .collect::<Vec<_>>(),
395 );
396
397 let declares = relevant_instructions
398 .iter()
399 .filter_map(|instruction| {
400 (instruction.blober == blober
401 && matches!(instruction.instruction, RelevantInstruction::DeclareBlob(_)))
402 .then_some(instruction.blob)
403 })
404 .collect::<Vec<Pubkey>>();
405
406 let Some(blob) = declares.first() else {
407 return Err(LedgerDataBlobError::DeclareNotFound.into());
408 };
409
410 if declares.len() > 1 {
411 return Err(LedgerDataBlobError::MultipleDeclares.into());
412 }
413
414 if relevant_instructions
415 .iter()
416 .filter(|instruction| {
417 matches!(
418 instruction.instruction,
419 RelevantInstruction::FinalizeBlob(_)
420 )
421 })
422 .count()
423 > 1
424 {
425 return Err(LedgerDataBlobError::MultipleFinalizes.into());
426 }
427
428 Ok(get_blob_data_from_instructions(
429 &relevant_instructions,
430 blober,
431 *blob,
432 )?)
433 }
434
435 pub async fn get_ledger_blobs(
437 &self,
438 slot: u64,
439 namespace: &str,
440 payer_pubkey: Option<Pubkey>,
441 lookback_slots: Option<u64>,
442 ) -> BloberClientResult<Vec<Vec<u8>>> {
443 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
444 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
445
446 let block_config = RpcBlockConfig {
447 commitment: Some(self.rpc_client.commitment()),
448 encoding: Some(UiTransactionEncoding::Base58),
449 ..Default::default()
450 };
451 let block = self
452 .rpc_client
453 .get_block_with_config(slot, block_config)
454 .await?;
455
456 let Some(transactions) = block.transactions else {
457 return Ok(Vec::new());
459 };
460
461 let relevant_instructions = extract_relevant_instructions(
462 &self.program_id,
463 &transactions
464 .iter()
465 .filter_map(|tx| match &tx.meta {
466 Some(meta) if meta.status.is_err() => None,
467 _ => tx.transaction.decode(),
468 })
469 .collect::<Vec<_>>(),
470 );
471 let finalized_blobs = relevant_instructions
472 .iter()
473 .filter_map(|instruction| {
474 (instruction.blober == blober
475 && matches!(
476 instruction.instruction,
477 RelevantInstruction::FinalizeBlob(_)
478 ))
479 .then_some(instruction.blob)
480 })
481 .collect::<HashSet<Pubkey>>();
482
483 let mut relevant_instructions_map = HashMap::new();
484 filter_relevant_instructions(
485 relevant_instructions,
486 &finalized_blobs,
487 &mut relevant_instructions_map,
488 );
489
490 let mut blobs = HashMap::with_capacity(finalized_blobs.len());
491 for blob in &finalized_blobs {
492 let instructions = relevant_instructions_map
493 .get(blob)
494 .expect("This should never happen since we at least have the finalize instruction");
495
496 if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob) {
497 blobs.insert(blob, blob_data);
498 }
499 }
500
501 if blobs.len() == finalized_blobs.len() {
503 return Ok(blobs.values().cloned().collect());
504 }
505
506 let lookback_slots = lookback_slots.unwrap_or(DEFAULT_LOOKBACK_SLOTS);
507
508 let block_slots = self
509 .rpc_client
510 .get_blocks_with_commitment(
511 slot - lookback_slots,
512 Some(slot - 1),
513 self.rpc_client.commitment(),
514 )
515 .await?;
516
517 for slot in block_slots.into_iter().rev() {
518 let block = self
519 .rpc_client
520 .get_block_with_config(slot, block_config)
521 .await?;
522 let Some(transactions) = block.transactions else {
523 continue;
525 };
526 let new_relevant_instructions = extract_relevant_instructions(
527 &self.program_id,
528 &transactions
529 .iter()
530 .filter_map(|tx| match &tx.meta {
531 Some(meta) if meta.status.is_err() => None,
532 _ => tx.transaction.decode(),
533 })
534 .collect::<Vec<_>>(),
535 );
536 filter_relevant_instructions(
537 new_relevant_instructions,
538 &finalized_blobs,
539 &mut relevant_instructions_map,
540 );
541 for blob in &finalized_blobs {
542 if blobs.contains_key(blob) {
543 continue;
544 }
545 let instructions = relevant_instructions_map.get(blob).expect(
546 "This should never happen since we at least have the finalize instruction",
547 );
548 println!("total {}", instructions.len());
549
550 if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob)
551 {
552 blobs.insert(blob, blob_data);
553 }
554 }
555 if blobs.len() == finalized_blobs.len() {
556 break;
557 }
558 }
559
560 Ok(blobs.values().cloned().collect())
561 }
562
563 pub async fn get_blobs(
565 &self,
566 slot: u64,
567 namespace: &str,
568 payer_pubkey: Option<Pubkey>,
569 ) -> BloberClientResult<Vec<Vec<u8>>> {
570 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
571 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
572
573 loop {
574 let blobs = self
575 .indexer()
576 .get_blobs(blober, slot)
577 .await
578 .map_err(|e| IndexerError::Blobs(slot, e.to_string()))?;
579 if let Some(blobs) = blobs {
580 return Ok(blobs);
581 }
582 tokio::time::sleep(Duration::from_millis(100)).await;
583 }
584 }
585
586 pub async fn get_blobs_by_blober(
588 &self,
589 blober_blobs: BlobsByBlober,
590 ) -> BloberClientResult<Vec<Vec<u8>>> {
591 let blober = blober_blobs.blober;
592
593 self.indexer()
594 .get_blobs_by_blober(blober_blobs)
595 .await
596 .map_err(|e| IndexerError::BlobsForBlober(blober.to_string(), e.to_string()).into())
597 }
598
599 pub async fn get_blobs_by_payer(
601 &self,
602 payer_blobs: BlobsByPayer,
603 ) -> BloberClientResult<Vec<Vec<u8>>> {
604 let payer = payer_blobs.payer;
605
606 self.indexer()
607 .get_blobs_by_payer(payer_blobs)
608 .await
609 .map_err(|e| IndexerError::BlobsForPayer(payer.to_string(), e.to_string()).into())
610 }
611
612 pub async fn get_slot_proof(
614 &self,
615 slot: u64,
616 namespace: &str,
617 payer_pubkey: Option<Pubkey>,
618 ) -> BloberClientResult<CompoundProof> {
619 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
620 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
621
622 loop {
623 let proof = self
624 .indexer()
625 .get_proof(blober, slot)
626 .await
627 .map_err(|e| IndexerError::Proof(slot, e.to_string()))?;
628 if let Some(proofs) = proof {
629 return Ok(proofs);
630 }
631 tokio::time::sleep(Duration::from_millis(100)).await;
632 }
633 }
634
635 pub async fn get_blob_proof(&self, blob: Pubkey) -> BloberClientResult<Option<CompoundProof>> {
637 self.indexer()
638 .get_proof_for_blob(blob)
639 .await
640 .map_err(|e| IndexerError::ProofForBlob(blob.to_string(), e.to_string()).into())
641 }
642
643 pub async fn get_blob_messages(
648 &self,
649 slot: u64,
650 namespace: &str,
651 payer_pubkey: Option<Pubkey>,
652 ) -> BloberClientResult<Vec<(Pubkey, VersionedMessage)>> {
653 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
654 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
655
656 let block: EncodedConfirmedBlock = self
657 .rpc_client
658 .get_block_with_config(
659 slot,
660 RpcBlockConfig {
661 commitment: Some(self.rpc_client.commitment()),
662 encoding: Some(UiTransactionEncoding::Base58),
663 ..Default::default()
664 },
665 )
666 .await?
667 .into();
668
669 let finalized = block
670 .transactions
671 .iter()
672 .filter_map(|tx| match &tx.meta {
673 Some(meta) if meta.status.is_err() => None,
674 _ => tx.transaction.decode(),
675 })
676 .filter_map(|tx| {
677 let instructions = tx
678 .message
679 .instructions()
680 .iter()
681 .filter_map(|compiled_instruction| {
682 Some(RelevantInstructionWithAccounts {
683 blob: get_account_at_index(
684 &tx,
685 compiled_instruction,
686 BLOB_ACCOUNT_INSTRUCTION_IDX,
687 )?,
688 blober: get_account_at_index(
689 &tx,
690 compiled_instruction,
691 BLOB_BLOBER_INSTRUCTION_IDX,
692 )?,
693 instruction: RelevantInstruction::try_from_slice(compiled_instruction)?,
694 })
695 })
696 .filter(|instruction| {
697 instruction.blober == blober
698 && matches!(
699 instruction.instruction,
700 RelevantInstruction::FinalizeBlob(_)
701 )
702 })
703 .collect::<Vec<_>>();
704
705 instructions.is_empty().then_some(
706 instructions
707 .iter()
708 .map(|instruction| (instruction.blob, tx.message.clone()))
709 .collect::<Vec<_>>(),
710 )
711 })
712 .flatten()
713 .collect::<Vec<_>>();
714
715 Ok(finalized)
716 }
717}