1use std::{
2 collections::{HashMap, HashSet},
3 str::FromStr,
4 sync::Arc,
5 time::Duration,
6};
7
8use anchor_lang::{Discriminator, Space};
9use blober_client_builder::{IsSet, IsUnset, SetHeliusFeeEstimate, SetIndexerClient};
10use bon::Builder;
11use futures::StreamExt;
12use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
13use nitro_da_blober::{
14 find_blob_address, find_blober_address,
15 instruction::{Close, DeclareBlob, DiscardBlob, FinalizeBlob, Initialize, InsertChunk},
16 state::blober::Blober,
17 CHUNK_SIZE, COMPOUND_DECLARE_TX_SIZE, COMPOUND_TX_SIZE,
18};
19use nitro_da_indexer_api::{
20 extract_relevant_instructions, get_account_at_index, BlobsByBlober, BlobsByPayer,
21 CompoundProof, IndexerRpcClient, RelevantInstruction, RelevantInstructionWithAccounts,
22};
23use solana_cli_config::Config;
24use solana_client::rpc_config::RpcTransactionConfig;
25use solana_rpc_client::nonblocking::rpc_client::RpcClient;
26use solana_rpc_client_api::config::RpcBlockConfig;
27use solana_sdk::{
28 commitment_config::CommitmentConfig,
29 message::VersionedMessage,
30 pubkey::Pubkey,
31 signature::{Keypair, Signature},
32 signer::Signer,
33};
34use solana_transaction_status::{EncodedConfirmedBlock, UiTransactionEncoding};
35use tracing::{info_span, Instrument, Span};
36
37use crate::{
38 batch_client::{BatchClient, SuccessfulTransaction},
39 constants::{DEFAULT_CONCURRENCY, DEFAULT_LOOKBACK_SLOTS},
40 fees::{Fee, FeeStrategy, Lamports, Priority},
41 helpers::{
42 check_outcomes, filter_relevant_instructions, get_blob_data_from_instructions,
43 get_unique_timestamp,
44 },
45 tx::{Compound, CompoundDeclare, CompoundFinalize, MessageArguments, MessageBuilder},
46 types::{IndexerError, TransactionType, UploadBlobError},
47 BloberClientError, BloberClientResult, LedgerDataBlobError,
48};
49
50#[derive(Builder, Clone)]
51pub struct BloberClient {
52 #[builder(getter(name = get_payer, vis = ""))]
53 pub(crate) payer: Arc<Keypair>,
54 pub(crate) program_id: Pubkey,
55 pub(crate) rpc_client: Arc<RpcClient>,
56 pub(crate) batch_client: BatchClient,
57 pub(crate) indexer_client: Option<Arc<WsClient>>,
59 #[builder(default = false)]
60 pub(crate) helius_fee_estimate: bool,
61}
62
63impl<State: blober_client_builder::State> BloberClientBuilder<State> {
64 pub async fn indexer_from_url(
76 self,
77 indexer_url: &str,
78 ) -> BloberClientResult<BloberClientBuilder<SetIndexerClient<State>>>
79 where
80 State::IndexerClient: IsUnset,
81 {
82 let indexer_client = WsClientBuilder::new().build(indexer_url).await?;
83 Ok(self.indexer_client(Arc::new(indexer_client)))
84 }
85
86 pub async fn build_with_config(self, solana_config: Config) -> BloberClientResult<BloberClient>
108 where
109 State::Payer: IsSet,
110 State::ProgramId: IsSet,
111 State::RpcClient: IsUnset,
112 State::BatchClient: IsUnset,
113 {
114 let rpc_client = Arc::new(RpcClient::new_with_commitment(
115 solana_config.json_rpc_url.clone(),
116 CommitmentConfig::from_str(&solana_config.commitment)?,
117 ));
118 let payer = self.get_payer().clone();
119 Ok(self
120 .rpc_client(rpc_client.clone())
121 .batch_client(BatchClient::new(rpc_client.clone(), vec![payer.clone()]).await?)
122 .build())
123 }
124
125 pub fn with_helius_fee_estimate(self) -> BloberClientBuilder<SetHeliusFeeEstimate<State>>
126 where
127 State::HeliusFeeEstimate: IsUnset,
128 {
129 self.helius_fee_estimate(true)
130 }
131}
132
133impl BloberClient {
134 pub fn rpc_client(&self) -> Arc<RpcClient> {
136 self.rpc_client.clone()
137 }
138
139 pub fn payer(&self) -> Arc<Keypair> {
141 self.payer.clone()
142 }
143
144 pub async fn initialize_blober(
146 &self,
147 fee_strategy: FeeStrategy,
148 namespace: &str,
149 timeout: Option<Duration>,
150 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
151 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
152
153 let fee_strategy = self
154 .convert_fee_strategy_to_fixed(
155 fee_strategy,
156 &[blober],
157 TransactionType::InitializeBlober,
158 )
159 .in_current_span()
160 .await?;
161
162 let msg = Initialize::build_message(MessageArguments::new(
163 self.program_id,
164 blober,
165 &self.payer,
166 self.rpc_client.clone(),
167 fee_strategy,
168 self.helius_fee_estimate,
169 (namespace.to_owned(), blober),
170 ))
171 .await
172 .expect("infallible with a fixed fee strategy");
173
174 let span = info_span!(parent: Span::current(), "initialize_blober");
175 Ok(check_outcomes(
176 self.batch_client
177 .send(vec![(TransactionType::InitializeBlober, msg)], timeout)
178 .instrument(span)
179 .await,
180 )
181 .map_err(UploadBlobError::InitializeBlober)?)
182 }
183
184 pub async fn close_blober(
186 &self,
187 fee_strategy: FeeStrategy,
188 namespace: &str,
189 timeout: Option<Duration>,
190 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
191 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
192
193 let fee_strategy = self
194 .convert_fee_strategy_to_fixed(fee_strategy, &[blober], TransactionType::CloseBlober)
195 .in_current_span()
196 .await?;
197
198 let msg = Close::build_message(MessageArguments::new(
199 self.program_id,
200 blober,
201 &self.payer,
202 self.rpc_client.clone(),
203 fee_strategy,
204 self.helius_fee_estimate,
205 (),
206 ))
207 .await
208 .expect("infallible with a fixed fee strategy");
209
210 let span = info_span!(parent: Span::current(), "close_blober");
211 Ok(check_outcomes(
212 self.batch_client
213 .send(vec![(TransactionType::CloseBlober, msg)], timeout)
214 .instrument(span)
215 .await,
216 )
217 .map_err(UploadBlobError::CloseBlober)?)
218 }
219
220 pub async fn upload_blob(
227 &self,
228 blob_data: &[u8],
229 fee_strategy: FeeStrategy,
230 namespace: &str,
231 timeout: Option<Duration>,
232 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
233 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
234 let timestamp = get_unique_timestamp();
235
236 let blob = find_blob_address(
237 self.program_id,
238 self.payer.pubkey(),
239 blober,
240 timestamp,
241 blob_data.len(),
242 );
243
244 let upload_messages = self
245 .generate_messages(blob, timestamp, blob_data, fee_strategy, blober)
246 .await?;
247
248 let res = self
249 .do_upload(upload_messages, timeout)
250 .in_current_span()
251 .await;
252
253 if let Err(BloberClientError::UploadBlob(UploadBlobError::DeclareBlob(_))) = res {
254 self.discard_blob(fee_strategy, blob, namespace, timeout)
255 .await
256 } else {
257 res
258 }
259 }
260
261 pub async fn discard_blob(
264 &self,
265 fee_strategy: FeeStrategy,
266 blob: Pubkey,
267 namespace: &str,
268 timeout: Option<Duration>,
269 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
270 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
271
272 let fee_strategy = self
273 .convert_fee_strategy_to_fixed(fee_strategy, &[blob], TransactionType::DiscardBlob)
274 .in_current_span()
275 .await?;
276
277 let msg = DiscardBlob::build_message(MessageArguments::new(
278 self.program_id,
279 blober,
280 &self.payer,
281 self.rpc_client.clone(),
282 fee_strategy,
283 self.helius_fee_estimate,
284 blob,
285 ))
286 .in_current_span()
287 .await
288 .expect("infallible with a fixed fee strategy");
289
290 let span = info_span!(parent: Span::current(), "discard_blob");
291
292 Ok(check_outcomes(
293 self.batch_client
294 .send(vec![(TransactionType::DiscardBlob, msg)], timeout)
295 .instrument(span)
296 .await,
297 )
298 .map_err(UploadBlobError::DiscardBlob)?)
299 }
300
301 pub async fn estimate_fees(
308 &self,
309 blob_size: usize,
310 blober: Pubkey,
311 priority: Priority,
312 ) -> BloberClientResult<Fee> {
313 let prioritization_fee_rate = priority
314 .get_priority_fee_estimate(
315 &self.rpc_client,
316 &[Pubkey::new_unique(), blober, self.payer.pubkey()],
317 self.helius_fee_estimate,
318 )
319 .await?;
320
321 let num_chunks = blob_size.div_ceil(CHUNK_SIZE as usize) as u16;
322
323 let (compute_unit_limit, num_signatures) = if blob_size < COMPOUND_TX_SIZE as usize {
324 (Compound::COMPUTE_UNIT_LIMIT, Compound::NUM_SIGNATURES)
325 } else if blob_size < COMPOUND_DECLARE_TX_SIZE as usize {
326 (
327 CompoundDeclare::COMPUTE_UNIT_LIMIT + FinalizeBlob::COMPUTE_UNIT_LIMIT,
328 CompoundDeclare::NUM_SIGNATURES + FinalizeBlob::NUM_SIGNATURES,
329 )
330 } else {
331 (
332 DeclareBlob::COMPUTE_UNIT_LIMIT
333 + (num_chunks - 1) as u32 * InsertChunk::COMPUTE_UNIT_LIMIT
334 + CompoundFinalize::COMPUTE_UNIT_LIMIT,
335 DeclareBlob::NUM_SIGNATURES
336 + (num_chunks - 1) * InsertChunk::NUM_SIGNATURES
337 + CompoundFinalize::NUM_SIGNATURES,
338 )
339 };
340
341 let price_per_signature = Lamports::new(5000);
344
345 let blob_account_size = Blober::DISCRIMINATOR.len() + Blober::INIT_SPACE;
346
347 Ok(Fee {
348 num_signatures,
349 price_per_signature,
350 compute_unit_limit,
351 prioritization_fee_rate,
352 blob_account_size,
353 })
354 }
355
356 pub async fn get_ledger_blobs_from_signatures(
358 &self,
359 namespace: &str,
360 payer_pubkey: Option<Pubkey>,
361 signatures: Vec<Signature>,
362 ) -> BloberClientResult<Vec<u8>> {
363 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
364 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
365
366 let relevant_transactions = futures::stream::iter(signatures)
367 .map(|signature| async move {
368 self.rpc_client
369 .get_transaction_with_config(
370 &signature,
371 RpcTransactionConfig {
372 commitment: Some(self.rpc_client.commitment()),
373 encoding: Some(UiTransactionEncoding::Base58),
374 ..Default::default()
375 },
376 )
377 .await
378 })
379 .buffer_unordered(DEFAULT_CONCURRENCY)
380 .collect::<Vec<_>>()
381 .await
382 .into_iter()
383 .collect::<Result<Vec<_>, _>>()?;
384
385 let relevant_instructions = extract_relevant_instructions(
386 &relevant_transactions
387 .iter()
388 .filter_map(|encoded| match &encoded.transaction.meta {
389 Some(meta) if meta.status.is_err() => None,
390 _ => encoded.transaction.transaction.decode(),
391 })
392 .collect::<Vec<_>>(),
393 );
394
395 let declares = relevant_instructions
396 .iter()
397 .filter_map(|instruction| {
398 (instruction.blober == blober
399 && matches!(instruction.instruction, RelevantInstruction::DeclareBlob(_)))
400 .then_some(instruction.blob)
401 })
402 .collect::<Vec<Pubkey>>();
403
404 let Some(blob) = declares.first() else {
405 return Err(LedgerDataBlobError::DeclareNotFound.into());
406 };
407
408 if declares.len() > 1 {
409 return Err(LedgerDataBlobError::MultipleDeclares.into());
410 }
411
412 if relevant_instructions
413 .iter()
414 .filter(|instruction| {
415 matches!(
416 instruction.instruction,
417 RelevantInstruction::FinalizeBlob(_)
418 )
419 })
420 .count()
421 > 1
422 {
423 return Err(LedgerDataBlobError::MultipleFinalizes.into());
424 }
425
426 Ok(get_blob_data_from_instructions(
427 &relevant_instructions,
428 blober,
429 *blob,
430 )?)
431 }
432
433 pub async fn get_ledger_blobs(
435 &self,
436 slot: u64,
437 namespace: &str,
438 payer_pubkey: Option<Pubkey>,
439 lookback_slots: Option<u64>,
440 ) -> BloberClientResult<Vec<Vec<u8>>> {
441 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
442 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
443
444 let block_config = RpcBlockConfig {
445 commitment: Some(self.rpc_client.commitment()),
446 encoding: Some(UiTransactionEncoding::Base58),
447 ..Default::default()
448 };
449 let block = self
450 .rpc_client
451 .get_block_with_config(slot, block_config)
452 .await?;
453
454 let Some(transactions) = block.transactions else {
455 return Ok(Vec::new());
457 };
458
459 let relevant_instructions = extract_relevant_instructions(
460 &transactions
461 .iter()
462 .filter_map(|tx| match &tx.meta {
463 Some(meta) if meta.status.is_err() => None,
464 _ => tx.transaction.decode(),
465 })
466 .collect::<Vec<_>>(),
467 );
468 let finalized_blobs = relevant_instructions
469 .iter()
470 .filter_map(|instruction| {
471 (instruction.blober == blober
472 && matches!(
473 instruction.instruction,
474 RelevantInstruction::FinalizeBlob(_)
475 ))
476 .then_some(instruction.blob)
477 })
478 .collect::<HashSet<Pubkey>>();
479
480 let mut relevant_instructions_map = HashMap::new();
481 filter_relevant_instructions(
482 relevant_instructions,
483 &finalized_blobs,
484 &mut relevant_instructions_map,
485 );
486
487 let mut blobs = HashMap::with_capacity(finalized_blobs.len());
488 for blob in &finalized_blobs {
489 let instructions = relevant_instructions_map
490 .get(blob)
491 .expect("This should never happen since we at least have the finalize instruction");
492
493 if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob) {
494 blobs.insert(blob, blob_data);
495 }
496 }
497
498 if blobs.len() == finalized_blobs.len() {
500 return Ok(blobs.values().cloned().collect());
501 }
502
503 let lookback_slots = lookback_slots.unwrap_or(DEFAULT_LOOKBACK_SLOTS);
504
505 let block_slots = self
506 .rpc_client
507 .get_blocks_with_commitment(
508 slot - lookback_slots,
509 Some(slot - 1),
510 self.rpc_client.commitment(),
511 )
512 .await?;
513
514 for slot in block_slots.into_iter().rev() {
515 let block = self
516 .rpc_client
517 .get_block_with_config(slot, block_config)
518 .await?;
519 let Some(transactions) = block.transactions else {
520 continue;
522 };
523 let new_relevant_instructions = extract_relevant_instructions(
524 &transactions
525 .iter()
526 .filter_map(|tx| match &tx.meta {
527 Some(meta) if meta.status.is_err() => None,
528 _ => tx.transaction.decode(),
529 })
530 .collect::<Vec<_>>(),
531 );
532 filter_relevant_instructions(
533 new_relevant_instructions,
534 &finalized_blobs,
535 &mut relevant_instructions_map,
536 );
537 for blob in &finalized_blobs {
538 if blobs.contains_key(blob) {
539 continue;
540 }
541 let instructions = relevant_instructions_map.get(blob).expect(
542 "This should never happen since we at least have the finalize instruction",
543 );
544 println!("total {}", instructions.len());
545
546 if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob)
547 {
548 blobs.insert(blob, blob_data);
549 }
550 }
551 if blobs.len() == finalized_blobs.len() {
552 break;
553 }
554 }
555
556 Ok(blobs.values().cloned().collect())
557 }
558
559 pub async fn get_blobs(
561 &self,
562 slot: u64,
563 namespace: &str,
564 payer_pubkey: Option<Pubkey>,
565 ) -> BloberClientResult<Vec<Vec<u8>>> {
566 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
567 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
568
569 loop {
570 let blobs = self
571 .indexer()
572 .get_blobs(blober, slot)
573 .await
574 .map_err(|e| IndexerError::Blobs(slot, e.to_string()))?;
575 if let Some(blobs) = blobs {
576 return Ok(blobs);
577 }
578 tokio::time::sleep(Duration::from_millis(100)).await;
579 }
580 }
581
582 pub async fn get_blobs_by_blober(
584 &self,
585 blober_blobs: BlobsByBlober,
586 ) -> BloberClientResult<Vec<Vec<u8>>> {
587 let blober = blober_blobs.blober;
588
589 self.indexer()
590 .get_blobs_by_blober(blober_blobs)
591 .await
592 .map_err(|e| IndexerError::BlobsForBlober(blober.to_string(), e.to_string()).into())
593 }
594
595 pub async fn get_blobs_by_payer(
597 &self,
598 payer_blobs: BlobsByPayer,
599 ) -> BloberClientResult<Vec<Vec<u8>>> {
600 let payer = payer_blobs.payer;
601
602 self.indexer()
603 .get_blobs_by_payer(payer_blobs)
604 .await
605 .map_err(|e| IndexerError::BlobsForPayer(payer.to_string(), e.to_string()).into())
606 }
607
608 pub async fn get_slot_proof(
610 &self,
611 slot: u64,
612 namespace: &str,
613 payer_pubkey: Option<Pubkey>,
614 ) -> BloberClientResult<CompoundProof> {
615 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
616 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
617
618 loop {
619 let proof = self
620 .indexer()
621 .get_proof(blober, slot)
622 .await
623 .map_err(|e| IndexerError::Proof(slot, e.to_string()))?;
624 if let Some(proofs) = proof {
625 return Ok(proofs);
626 }
627 tokio::time::sleep(Duration::from_millis(100)).await;
628 }
629 }
630
631 pub async fn get_blob_proof(&self, blob: Pubkey) -> BloberClientResult<Option<CompoundProof>> {
633 self.indexer()
634 .get_proof_for_blob(blob)
635 .await
636 .map_err(|e| IndexerError::ProofForBlob(blob.to_string(), e.to_string()).into())
637 }
638
639 pub async fn get_blob_messages(
644 &self,
645 slot: u64,
646 namespace: &str,
647 payer_pubkey: Option<Pubkey>,
648 ) -> BloberClientResult<Vec<(Pubkey, VersionedMessage)>> {
649 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
650 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
651
652 let block: EncodedConfirmedBlock = self
653 .rpc_client
654 .get_block_with_config(
655 slot,
656 RpcBlockConfig {
657 commitment: Some(self.rpc_client.commitment()),
658 encoding: Some(UiTransactionEncoding::Base58),
659 ..Default::default()
660 },
661 )
662 .await?
663 .into();
664
665 let finalized = block
666 .transactions
667 .iter()
668 .filter_map(|tx| match &tx.meta {
669 Some(meta) if meta.status.is_err() => None,
670 _ => tx.transaction.decode(),
671 })
672 .filter_map(|tx| {
673 let instructions = tx
674 .message
675 .instructions()
676 .iter()
677 .filter_map(|compiled_instruction| {
678 Some(RelevantInstructionWithAccounts {
679 blob: get_account_at_index(&tx, compiled_instruction, 0)?,
680 blober: get_account_at_index(&tx, compiled_instruction, 1)?,
681 instruction: RelevantInstruction::try_from_slice(compiled_instruction)?,
682 })
683 })
684 .filter(|instruction| {
685 instruction.blober == blober
686 && matches!(
687 instruction.instruction,
688 RelevantInstruction::FinalizeBlob(_)
689 )
690 })
691 .collect::<Vec<_>>();
692
693 instructions.is_empty().then_some(
694 instructions
695 .iter()
696 .map(|instruction| (instruction.blob, tx.message.clone()))
697 .collect::<Vec<_>>(),
698 )
699 })
700 .flatten()
701 .collect::<Vec<_>>();
702
703 Ok(finalized)
704 }
705}