1use std::{
2 collections::{HashMap, HashSet},
3 str::FromStr,
4 sync::Arc,
5 time::Duration,
6};
7
8use anchor_lang::{Discriminator, Space};
9use blober_client_builder::{IsSet, IsUnset, SetHeliusFeeEstimate, SetIndexerClient};
10use bon::Builder;
11use futures::StreamExt;
12use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
13use nitro_da_blober::{
14 CHUNK_SIZE, COMPOUND_DECLARE_TX_SIZE, COMPOUND_TX_SIZE, find_blob_address, find_blober_address,
15 instruction::{Close, DeclareBlob, DiscardBlob, FinalizeBlob, Initialize, InsertChunk},
16 state::blober::Blober,
17};
18use nitro_da_indexer_api::{
19 CompoundProof, IndexerRpcClient, RelevantInstruction, RelevantInstructionWithAccounts,
20 extract_relevant_instructions, get_account_at_index,
21};
22use solana_cli_config::Config;
23use solana_client::rpc_config::RpcTransactionConfig;
24use solana_rpc_client::nonblocking::rpc_client::RpcClient;
25use solana_rpc_client_api::config::RpcBlockConfig;
26use solana_sdk::{
27 commitment_config::CommitmentConfig,
28 message::VersionedMessage,
29 pubkey::Pubkey,
30 signature::{Keypair, Signature},
31 signer::Signer,
32};
33use solana_transaction_status::{EncodedConfirmedBlock, UiTransactionEncoding};
34use tracing::{Instrument, Span, info_span};
35
36use crate::{
37 BloberClientError, BloberClientResult, LedgerDataBlobError,
38 batch_client::{BatchClient, SuccessfulTransaction},
39 constants::{DEFAULT_CONCURRENCY, DEFAULT_LOOKBACK_SLOTS},
40 fees::{Fee, FeeStrategy, Lamports, Priority},
41 helpers::{
42 check_outcomes, filter_relevant_instructions, get_blob_data_from_instructions,
43 get_unique_timestamp,
44 },
45 tx::{Compound, CompoundDeclare, CompoundFinalize, MessageArguments, MessageBuilder},
46 types::{IndexerError, TransactionType, UploadBlobError},
47};
48
49#[derive(Builder, Clone)]
50pub struct BloberClient {
51 #[builder(getter(name = get_payer, vis = ""))]
52 pub(crate) payer: Arc<Keypair>,
53 pub(crate) program_id: Pubkey,
54 pub(crate) rpc_client: Arc<RpcClient>,
55 pub(crate) batch_client: BatchClient,
56 pub(crate) indexer_client: Option<Arc<WsClient>>,
58 #[builder(default = false)]
59 pub(crate) helius_fee_estimate: bool,
60}
61
62impl<State: blober_client_builder::State> BloberClientBuilder<State> {
63 pub async fn indexer_from_url(
75 self,
76 indexer_url: &str,
77 ) -> BloberClientResult<BloberClientBuilder<SetIndexerClient<State>>>
78 where
79 State::IndexerClient: IsUnset,
80 {
81 let indexer_client = WsClientBuilder::new().build(indexer_url).await?;
82 Ok(self.indexer_client(Arc::new(indexer_client)))
83 }
84
85 pub async fn build_with_config(self, solana_config: Config) -> BloberClientResult<BloberClient>
107 where
108 State::Payer: IsSet,
109 State::ProgramId: IsSet,
110 State::RpcClient: IsUnset,
111 State::BatchClient: IsUnset,
112 {
113 let rpc_client = Arc::new(RpcClient::new_with_commitment(
114 solana_config.json_rpc_url.clone(),
115 CommitmentConfig::from_str(&solana_config.commitment)?,
116 ));
117 let payer = self.get_payer().clone();
118 Ok(self
119 .rpc_client(rpc_client.clone())
120 .batch_client(BatchClient::new(rpc_client.clone(), vec![payer.clone()]).await?)
121 .build())
122 }
123
124 pub fn with_helius_fee_estimate(self) -> BloberClientBuilder<SetHeliusFeeEstimate<State>>
125 where
126 State::HeliusFeeEstimate: IsUnset,
127 {
128 self.helius_fee_estimate(true)
129 }
130}
131
132impl BloberClient {
133 pub fn rpc_client(&self) -> Arc<RpcClient> {
135 self.rpc_client.clone()
136 }
137
138 pub fn payer(&self) -> Arc<Keypair> {
140 self.payer.clone()
141 }
142
143 pub async fn initialize_blober(
145 &self,
146 fee_strategy: FeeStrategy,
147 namespace: &str,
148 timeout: Option<Duration>,
149 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
150 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
151
152 let fee_strategy = self
153 .convert_fee_strategy_to_fixed(
154 fee_strategy,
155 &[blober],
156 TransactionType::InitializeBlober,
157 )
158 .in_current_span()
159 .await?;
160
161 let msg = Initialize::build_message(MessageArguments::new(
162 self.program_id,
163 blober,
164 &self.payer,
165 self.rpc_client.clone(),
166 fee_strategy,
167 self.helius_fee_estimate,
168 (namespace.to_owned(), blober),
169 ))
170 .await
171 .expect("infallible with a fixed fee strategy");
172
173 let span = info_span!(parent: Span::current(), "initialize_blober");
174 Ok(check_outcomes(
175 self.batch_client
176 .send(vec![(TransactionType::InitializeBlober, msg)], timeout)
177 .instrument(span)
178 .await,
179 )
180 .map_err(UploadBlobError::InitializeBlober)?)
181 }
182
183 pub async fn close_blober(
185 &self,
186 fee_strategy: FeeStrategy,
187 namespace: &str,
188 timeout: Option<Duration>,
189 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
190 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
191
192 let fee_strategy = self
193 .convert_fee_strategy_to_fixed(fee_strategy, &[blober], TransactionType::CloseBlober)
194 .in_current_span()
195 .await?;
196
197 let msg = Close::build_message(MessageArguments::new(
198 self.program_id,
199 blober,
200 &self.payer,
201 self.rpc_client.clone(),
202 fee_strategy,
203 self.helius_fee_estimate,
204 (),
205 ))
206 .await
207 .expect("infallible with a fixed fee strategy");
208
209 let span = info_span!(parent: Span::current(), "close_blober");
210 Ok(check_outcomes(
211 self.batch_client
212 .send(vec![(TransactionType::CloseBlober, msg)], timeout)
213 .instrument(span)
214 .await,
215 )
216 .map_err(UploadBlobError::CloseBlober)?)
217 }
218
219 pub async fn upload_blob(
226 &self,
227 blob_data: &[u8],
228 fee_strategy: FeeStrategy,
229 namespace: &str,
230 timeout: Option<Duration>,
231 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
232 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
233 let timestamp = get_unique_timestamp();
234
235 let blob = find_blob_address(
236 self.program_id,
237 self.payer.pubkey(),
238 blober,
239 timestamp,
240 blob_data.len(),
241 );
242
243 let upload_messages = self
244 .generate_messages(blob, timestamp, blob_data, fee_strategy, blober)
245 .await?;
246
247 let res = self
248 .do_upload(upload_messages, timeout)
249 .in_current_span()
250 .await;
251
252 if let Err(BloberClientError::UploadBlob(UploadBlobError::DeclareBlob(_))) = res {
253 self.discard_blob(fee_strategy, blob, namespace, timeout)
254 .await
255 } else {
256 res
257 }
258 }
259
260 pub async fn discard_blob(
263 &self,
264 fee_strategy: FeeStrategy,
265 blob: Pubkey,
266 namespace: &str,
267 timeout: Option<Duration>,
268 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
269 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
270
271 let fee_strategy = self
272 .convert_fee_strategy_to_fixed(fee_strategy, &[blob], TransactionType::DiscardBlob)
273 .in_current_span()
274 .await?;
275
276 let msg = DiscardBlob::build_message(MessageArguments::new(
277 self.program_id,
278 blober,
279 &self.payer,
280 self.rpc_client.clone(),
281 fee_strategy,
282 self.helius_fee_estimate,
283 blob,
284 ))
285 .in_current_span()
286 .await
287 .expect("infallible with a fixed fee strategy");
288
289 let span = info_span!(parent: Span::current(), "discard_blob");
290
291 Ok(check_outcomes(
292 self.batch_client
293 .send(vec![(TransactionType::DiscardBlob, msg)], timeout)
294 .instrument(span)
295 .await,
296 )
297 .map_err(UploadBlobError::DiscardBlob)?)
298 }
299
300 pub async fn estimate_fees(
307 &self,
308 blob_size: usize,
309 blober: Pubkey,
310 priority: Priority,
311 ) -> BloberClientResult<Fee> {
312 let prioritization_fee_rate = priority
313 .get_priority_fee_estimate(
314 &self.rpc_client,
315 &[Pubkey::new_unique(), blober, self.payer.pubkey()],
316 self.helius_fee_estimate,
317 )
318 .await?;
319
320 let num_chunks = blob_size.div_ceil(CHUNK_SIZE as usize) as u16;
321
322 let (compute_unit_limit, num_signatures) = if blob_size < COMPOUND_TX_SIZE as usize {
323 (Compound::COMPUTE_UNIT_LIMIT, Compound::NUM_SIGNATURES)
324 } else if blob_size < COMPOUND_DECLARE_TX_SIZE as usize {
325 (
326 CompoundDeclare::COMPUTE_UNIT_LIMIT + FinalizeBlob::COMPUTE_UNIT_LIMIT,
327 CompoundDeclare::NUM_SIGNATURES + FinalizeBlob::NUM_SIGNATURES,
328 )
329 } else {
330 (
331 DeclareBlob::COMPUTE_UNIT_LIMIT
332 + (num_chunks - 1) as u32 * InsertChunk::COMPUTE_UNIT_LIMIT
333 + CompoundFinalize::COMPUTE_UNIT_LIMIT,
334 DeclareBlob::NUM_SIGNATURES
335 + (num_chunks - 1) * InsertChunk::NUM_SIGNATURES
336 + CompoundFinalize::NUM_SIGNATURES,
337 )
338 };
339
340 let price_per_signature = Lamports::new(5000);
343
344 let blob_account_size = Blober::DISCRIMINATOR.len() + Blober::INIT_SPACE;
345
346 Ok(Fee {
347 num_signatures,
348 price_per_signature,
349 compute_unit_limit,
350 prioritization_fee_rate,
351 blob_account_size,
352 })
353 }
354
355 pub async fn get_ledger_blobs_from_signatures(
357 &self,
358 namespace: &str,
359 payer_pubkey: Option<Pubkey>,
360 signatures: Vec<Signature>,
361 ) -> BloberClientResult<Vec<u8>> {
362 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
363 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
364
365 let relevant_transactions = futures::stream::iter(signatures)
366 .map(|signature| async move {
367 self.rpc_client
368 .get_transaction_with_config(
369 &signature,
370 RpcTransactionConfig {
371 commitment: Some(self.rpc_client.commitment()),
372 encoding: Some(UiTransactionEncoding::Base58),
373 ..Default::default()
374 },
375 )
376 .await
377 })
378 .buffer_unordered(DEFAULT_CONCURRENCY)
379 .collect::<Vec<_>>()
380 .await
381 .into_iter()
382 .collect::<Result<Vec<_>, _>>()?;
383
384 let relevant_instructions = extract_relevant_instructions(
385 &relevant_transactions
386 .iter()
387 .filter_map(|encoded| match &encoded.transaction.meta {
388 Some(meta) if meta.status.is_err() => None,
389 _ => encoded.transaction.transaction.decode(),
390 })
391 .collect::<Vec<_>>(),
392 );
393
394 let declares = relevant_instructions
395 .iter()
396 .filter_map(|instruction| {
397 (instruction.blober == blober
398 && matches!(instruction.instruction, RelevantInstruction::DeclareBlob(_)))
399 .then_some(instruction.blob)
400 })
401 .collect::<Vec<Pubkey>>();
402
403 let Some(blob) = declares.first() else {
404 return Err(LedgerDataBlobError::DeclareNotFound.into());
405 };
406
407 if declares.len() > 1 {
408 return Err(LedgerDataBlobError::MultipleDeclares.into());
409 }
410
411 if relevant_instructions
412 .iter()
413 .filter(|instruction| {
414 matches!(
415 instruction.instruction,
416 RelevantInstruction::FinalizeBlob(_)
417 )
418 })
419 .count()
420 > 1
421 {
422 return Err(LedgerDataBlobError::MultipleFinalizes.into());
423 }
424
425 Ok(get_blob_data_from_instructions(
426 &relevant_instructions,
427 blober,
428 *blob,
429 )?)
430 }
431
432 pub async fn get_ledger_blobs(
434 &self,
435 slot: u64,
436 namespace: &str,
437 payer_pubkey: Option<Pubkey>,
438 lookback_slots: Option<u64>,
439 ) -> BloberClientResult<Vec<Vec<u8>>> {
440 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
441 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
442
443 let block_config = RpcBlockConfig {
444 commitment: Some(self.rpc_client.commitment()),
445 encoding: Some(UiTransactionEncoding::Base58),
446 ..Default::default()
447 };
448 let block = self
449 .rpc_client
450 .get_block_with_config(slot, block_config)
451 .await?;
452
453 let Some(transactions) = block.transactions else {
454 return Ok(Vec::new());
456 };
457
458 let relevant_instructions = extract_relevant_instructions(
459 &transactions
460 .iter()
461 .filter_map(|tx| match &tx.meta {
462 Some(meta) if meta.status.is_err() => None,
463 _ => tx.transaction.decode(),
464 })
465 .collect::<Vec<_>>(),
466 );
467 let finalized_blobs = relevant_instructions
468 .iter()
469 .filter_map(|instruction| {
470 (instruction.blober == blober
471 && matches!(
472 instruction.instruction,
473 RelevantInstruction::FinalizeBlob(_)
474 ))
475 .then_some(instruction.blob)
476 })
477 .collect::<HashSet<Pubkey>>();
478
479 let mut relevant_instructions_map = HashMap::new();
480 filter_relevant_instructions(
481 relevant_instructions,
482 &finalized_blobs,
483 &mut relevant_instructions_map,
484 );
485
486 let mut blobs = HashMap::with_capacity(finalized_blobs.len());
487 for blob in &finalized_blobs {
488 let instructions = relevant_instructions_map
489 .get(blob)
490 .expect("This should never happen since we at least have the finalize instruction");
491
492 if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob) {
493 blobs.insert(blob, blob_data);
494 }
495 }
496
497 if blobs.len() == finalized_blobs.len() {
499 return Ok(blobs.values().cloned().collect());
500 }
501
502 let lookback_slots = lookback_slots.unwrap_or(DEFAULT_LOOKBACK_SLOTS);
503
504 let block_slots = self
505 .rpc_client
506 .get_blocks_with_commitment(
507 slot - lookback_slots,
508 Some(slot - 1),
509 self.rpc_client.commitment(),
510 )
511 .await?;
512
513 for slot in block_slots.into_iter().rev() {
514 let block = self
515 .rpc_client
516 .get_block_with_config(slot, block_config)
517 .await?;
518 let Some(transactions) = block.transactions else {
519 continue;
521 };
522 let new_relevant_instructions = extract_relevant_instructions(
523 &transactions
524 .iter()
525 .filter_map(|tx| match &tx.meta {
526 Some(meta) if meta.status.is_err() => None,
527 _ => tx.transaction.decode(),
528 })
529 .collect::<Vec<_>>(),
530 );
531 filter_relevant_instructions(
532 new_relevant_instructions,
533 &finalized_blobs,
534 &mut relevant_instructions_map,
535 );
536 for blob in &finalized_blobs {
537 if blobs.contains_key(blob) {
538 continue;
539 }
540 let instructions = relevant_instructions_map.get(blob).expect(
541 "This should never happen since we at least have the finalize instruction",
542 );
543 println!("total {}", instructions.len());
544
545 if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob)
546 {
547 blobs.insert(blob, blob_data);
548 }
549 }
550 if blobs.len() == finalized_blobs.len() {
551 break;
552 }
553 }
554
555 Ok(blobs.values().cloned().collect())
556 }
557
558 pub async fn get_blobs(
560 &self,
561 slot: u64,
562 namespace: &str,
563 payer_pubkey: Option<Pubkey>,
564 ) -> BloberClientResult<Vec<Vec<u8>>> {
565 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
566 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
567
568 loop {
569 let blobs = self
570 .indexer()
571 .get_blobs(blober, slot)
572 .await
573 .map_err(|e| IndexerError::Blobs(slot, e.to_string()))?;
574 if let Some(blobs) = blobs {
575 return Ok(blobs);
576 }
577 tokio::time::sleep(Duration::from_millis(100)).await;
578 }
579 }
580
581 pub async fn get_slot_proof(
583 &self,
584 slot: u64,
585 namespace: &str,
586 payer_pubkey: Option<Pubkey>,
587 ) -> BloberClientResult<CompoundProof> {
588 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
589 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
590
591 loop {
592 let proof = self
593 .indexer()
594 .get_proof(blober, slot)
595 .await
596 .map_err(|e| IndexerError::Proof(slot, e.to_string()))?;
597 if let Some(proofs) = proof {
598 return Ok(proofs);
599 }
600 tokio::time::sleep(Duration::from_millis(100)).await;
601 }
602 }
603
604 pub async fn get_blob_messages(
609 &self,
610 slot: u64,
611 namespace: &str,
612 payer_pubkey: Option<Pubkey>,
613 ) -> BloberClientResult<Vec<(Pubkey, VersionedMessage)>> {
614 let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
615 let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
616
617 let block: EncodedConfirmedBlock = self
618 .rpc_client
619 .get_block_with_config(
620 slot,
621 RpcBlockConfig {
622 commitment: Some(self.rpc_client.commitment()),
623 encoding: Some(UiTransactionEncoding::Base58),
624 ..Default::default()
625 },
626 )
627 .await?
628 .into();
629
630 let finalized = block
631 .transactions
632 .iter()
633 .filter_map(|tx| match &tx.meta {
634 Some(meta) if meta.status.is_err() => None,
635 _ => tx.transaction.decode(),
636 })
637 .filter_map(|tx| {
638 let instructions = tx
639 .message
640 .instructions()
641 .iter()
642 .filter_map(|compiled_instruction| {
643 Some(RelevantInstructionWithAccounts {
644 blob: get_account_at_index(&tx, compiled_instruction, 0)?,
645 blober: get_account_at_index(&tx, compiled_instruction, 1)?,
646 instruction: RelevantInstruction::try_from_slice(compiled_instruction)?,
647 })
648 })
649 .filter(|instruction| {
650 instruction.blober == blober
651 && matches!(
652 instruction.instruction,
653 RelevantInstruction::FinalizeBlob(_)
654 )
655 })
656 .collect::<Vec<_>>();
657
658 instructions.is_empty().then_some(
659 instructions
660 .iter()
661 .map(|instruction| (instruction.blob, tx.message.clone()))
662 .collect::<Vec<_>>(),
663 )
664 })
665 .flatten()
666 .collect::<Vec<_>>();
667
668 Ok(finalized)
669 }
670}