1use std::{
2 collections::{HashMap, HashSet},
3 str::FromStr,
4 sync::Arc,
5 time::Duration,
6};
7
8use anchor_lang::{Discriminator, Space};
9use blober_client_builder::{IsSet, IsUnset, SetHeliusFeeEstimate, SetIndexerClient};
10use bon::Builder;
11use futures::StreamExt;
12use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
13use nitro_da_blober::{
14 CHUNK_SIZE, COMPOUND_DECLARE_TX_SIZE, COMPOUND_TX_SIZE, find_blob_address, find_blober_address,
15 instruction::{Close, DeclareBlob, DiscardBlob, FinalizeBlob, Initialize, InsertChunk},
16 state::blober::Blober,
17};
18use nitro_da_indexer_api::{
19 CompoundProof, IndexerRpcClient, RelevantInstruction, RelevantInstructionWithAccounts,
20 extract_relevant_instructions, get_account_at_index,
21};
22use solana_cli_config::Config;
23use solana_client::rpc_config::RpcTransactionConfig;
24use solana_rpc_client::nonblocking::rpc_client::RpcClient;
25use solana_rpc_client_api::config::RpcBlockConfig;
26use solana_sdk::{
27 commitment_config::CommitmentConfig,
28 message::VersionedMessage,
29 pubkey::Pubkey,
30 signature::{Keypair, Signature},
31 signer::Signer,
32};
33use solana_transaction_status::{EncodedConfirmedBlock, UiTransactionEncoding};
34use tracing::{Instrument, Span, info_span};
35
36use crate::{
37 BloberClientError, BloberClientResult, LedgerDataBlobError,
38 batch_client::{BatchClient, SuccessfulTransaction},
39 constants::{DEFAULT_CONCURRENCY, DEFAULT_LOOKBACK_SLOTS},
40 fees::{Fee, FeeStrategy, Lamports, Priority},
41 helpers::{
42 check_outcomes, filter_relevant_instructions, get_blob_data_from_instructions,
43 get_unique_timestamp,
44 },
45 tx::{Compound, CompoundDeclare, CompoundFinalize, MessageArguments, MessageBuilder},
46 types::{IndexerError, TransactionType, UploadBlobError},
47};
48
49#[derive(Builder, Clone)]
50pub struct BloberClient {
51 #[builder(getter(name = get_payer, vis = ""))]
52 pub(crate) payer: Arc<Keypair>,
53 pub(crate) program_id: Pubkey,
54 pub(crate) rpc_client: Arc<RpcClient>,
55 pub(crate) batch_client: BatchClient,
56 pub(crate) indexer_client: Option<Arc<WsClient>>,
58 #[builder(default = false)]
59 pub(crate) helius_fee_estimate: bool,
60}
61
62impl<State: blober_client_builder::State> BloberClientBuilder<State> {
63 pub async fn indexer_from_url(
75 self,
76 indexer_url: &str,
77 ) -> BloberClientResult<BloberClientBuilder<SetIndexerClient<State>>>
78 where
79 State::IndexerClient: IsUnset,
80 {
81 let indexer_client = WsClientBuilder::new().build(indexer_url).await?;
82 Ok(self.indexer_client(Arc::new(indexer_client)))
83 }
84
85 pub async fn build_with_config(self, solana_config: Config) -> BloberClientResult<BloberClient>
107 where
108 State::Payer: IsSet,
109 State::ProgramId: IsSet,
110 State::RpcClient: IsUnset,
111 State::BatchClient: IsUnset,
112 {
113 let rpc_client = Arc::new(RpcClient::new_with_commitment(
114 solana_config.json_rpc_url.clone(),
115 CommitmentConfig::from_str(&solana_config.commitment)?,
116 ));
117 let payer = self.get_payer().clone();
118 Ok(self
119 .rpc_client(rpc_client.clone())
120 .batch_client(BatchClient::new(rpc_client.clone(), vec![payer.clone()]).await?)
121 .build())
122 }
123
124 pub fn with_helius_fee_estimate(self) -> BloberClientBuilder<SetHeliusFeeEstimate<State>>
125 where
126 State::HeliusFeeEstimate: IsUnset,
127 {
128 self.helius_fee_estimate(true)
129 }
130}
131
132impl BloberClient {
133 pub fn rpc_client(&self) -> Arc<RpcClient> {
135 self.rpc_client.clone()
136 }
137
138 pub fn payer(&self) -> Arc<Keypair> {
140 self.payer.clone()
141 }
142
143 pub async fn initialize_blober(
145 &self,
146 fee_strategy: FeeStrategy,
147 namespace: String,
148 timeout: Option<Duration>,
149 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
150 let blober = find_blober_address(self.program_id, self.payer.pubkey(), &namespace);
151
152 let fee_strategy = self
153 .convert_fee_strategy_to_fixed(
154 fee_strategy,
155 &[blober],
156 TransactionType::InitializeBlober,
157 )
158 .in_current_span()
159 .await?;
160
161 let msg = Initialize::build_message(MessageArguments::new(
162 self.program_id,
163 blober,
164 &self.payer,
165 self.rpc_client.clone(),
166 fee_strategy,
167 self.helius_fee_estimate,
168 (namespace, blober),
169 ))
170 .await
171 .expect("infallible with a fixed fee strategy");
172
173 let span = info_span!(parent: Span::current(), "initialize_blober");
174 Ok(check_outcomes(
175 self.batch_client
176 .send(vec![(TransactionType::InitializeBlober, msg)], timeout)
177 .instrument(span)
178 .await,
179 )
180 .map_err(UploadBlobError::InitializeBlober)?)
181 }
182
183 pub async fn close_blober(
185 &self,
186 fee_strategy: FeeStrategy,
187 blober: Pubkey,
188 timeout: Option<Duration>,
189 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
190 let fee_strategy = self
191 .convert_fee_strategy_to_fixed(fee_strategy, &[blober], TransactionType::CloseBlober)
192 .in_current_span()
193 .await?;
194
195 let msg = Close::build_message(MessageArguments::new(
196 self.program_id,
197 blober,
198 &self.payer,
199 self.rpc_client.clone(),
200 fee_strategy,
201 self.helius_fee_estimate,
202 (),
203 ))
204 .await
205 .expect("infallible with a fixed fee strategy");
206
207 let span = info_span!(parent: Span::current(), "close_blober");
208 Ok(check_outcomes(
209 self.batch_client
210 .send(vec![(TransactionType::CloseBlober, msg)], timeout)
211 .instrument(span)
212 .await,
213 )
214 .map_err(UploadBlobError::CloseBlober)?)
215 }
216
217 pub async fn upload_blob(
224 &self,
225 blob_data: &[u8],
226 fee_strategy: FeeStrategy,
227 blober: Pubkey,
228 timeout: Option<Duration>,
229 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
230 let timestamp = get_unique_timestamp();
231
232 let blob = find_blob_address(
233 self.program_id,
234 self.payer.pubkey(),
235 blober,
236 timestamp,
237 blob_data.len(),
238 );
239
240 let upload_messages = self
241 .generate_messages(blob, timestamp, blob_data, fee_strategy, blober)
242 .await?;
243
244 let res = self
245 .do_upload(upload_messages, timeout)
246 .in_current_span()
247 .await;
248
249 if let Err(BloberClientError::UploadBlob(UploadBlobError::DeclareBlob(_))) = res {
250 self.discard_blob(fee_strategy, blob, blober, timeout).await
251 } else {
252 res
253 }
254 }
255
256 pub async fn discard_blob(
259 &self,
260 fee_strategy: FeeStrategy,
261 blob: Pubkey,
262 blober: Pubkey,
263 timeout: Option<Duration>,
264 ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
265 let fee_strategy = self
266 .convert_fee_strategy_to_fixed(fee_strategy, &[blob], TransactionType::DiscardBlob)
267 .in_current_span()
268 .await?;
269
270 let msg = DiscardBlob::build_message(MessageArguments::new(
271 self.program_id,
272 blober,
273 &self.payer,
274 self.rpc_client.clone(),
275 fee_strategy,
276 self.helius_fee_estimate,
277 blob,
278 ))
279 .in_current_span()
280 .await
281 .expect("infallible with a fixed fee strategy");
282
283 let span = info_span!(parent: Span::current(), "discard_blob");
284
285 Ok(check_outcomes(
286 self.batch_client
287 .send(vec![(TransactionType::DiscardBlob, msg)], timeout)
288 .instrument(span)
289 .await,
290 )
291 .map_err(UploadBlobError::DiscardBlob)?)
292 }
293
294 pub async fn estimate_fees(
301 &self,
302 blob_size: usize,
303 blober: Pubkey,
304 priority: Priority,
305 ) -> BloberClientResult<Fee> {
306 let prioritization_fee_rate = priority
307 .get_priority_fee_estimate(
308 &self.rpc_client,
309 &[Pubkey::new_unique(), blober, self.payer.pubkey()],
310 self.helius_fee_estimate,
311 )
312 .await?;
313
314 let num_chunks = blob_size.div_ceil(CHUNK_SIZE as usize) as u16;
315
316 let (compute_unit_limit, num_signatures) = if blob_size < COMPOUND_TX_SIZE as usize {
317 (Compound::COMPUTE_UNIT_LIMIT, Compound::NUM_SIGNATURES)
318 } else if blob_size < COMPOUND_DECLARE_TX_SIZE as usize {
319 (
320 CompoundDeclare::COMPUTE_UNIT_LIMIT + FinalizeBlob::COMPUTE_UNIT_LIMIT,
321 CompoundDeclare::NUM_SIGNATURES + FinalizeBlob::NUM_SIGNATURES,
322 )
323 } else {
324 (
325 DeclareBlob::COMPUTE_UNIT_LIMIT
326 + (num_chunks - 1) as u32 * InsertChunk::COMPUTE_UNIT_LIMIT
327 + CompoundFinalize::COMPUTE_UNIT_LIMIT,
328 DeclareBlob::NUM_SIGNATURES
329 + (num_chunks - 1) * InsertChunk::NUM_SIGNATURES
330 + CompoundFinalize::NUM_SIGNATURES,
331 )
332 };
333
334 let price_per_signature = Lamports::new(5000);
337
338 let blob_account_size = Blober::DISCRIMINATOR.len() + Blober::INIT_SPACE;
339
340 Ok(Fee {
341 num_signatures,
342 price_per_signature,
343 compute_unit_limit,
344 prioritization_fee_rate,
345 blob_account_size,
346 })
347 }
348
349 pub async fn get_ledger_blobs_from_signatures(
351 &self,
352 blober: Pubkey,
353 signatures: Vec<Signature>,
354 ) -> BloberClientResult<Vec<u8>> {
355 let relevant_transactions = futures::stream::iter(signatures)
356 .map(|signature| async move {
357 self.rpc_client
358 .get_transaction_with_config(
359 &signature,
360 RpcTransactionConfig {
361 commitment: Some(self.rpc_client.commitment()),
362 encoding: Some(UiTransactionEncoding::Base58),
363 ..Default::default()
364 },
365 )
366 .await
367 })
368 .buffer_unordered(DEFAULT_CONCURRENCY)
369 .collect::<Vec<_>>()
370 .await
371 .into_iter()
372 .collect::<Result<Vec<_>, _>>()?;
373
374 let relevant_instructions = extract_relevant_instructions(
375 &relevant_transactions
376 .iter()
377 .filter_map(|encoded| match &encoded.transaction.meta {
378 Some(meta) if meta.status.is_err() => None,
379 _ => encoded.transaction.transaction.decode(),
380 })
381 .collect::<Vec<_>>(),
382 );
383
384 let declares = relevant_instructions
385 .iter()
386 .filter_map(|instruction| {
387 (instruction.blober == blober
388 && matches!(instruction.instruction, RelevantInstruction::DeclareBlob(_)))
389 .then_some(instruction.blob)
390 })
391 .collect::<Vec<Pubkey>>();
392
393 let Some(blob) = declares.first() else {
394 return Err(LedgerDataBlobError::DeclareNotFound.into());
395 };
396
397 if declares.len() > 1 {
398 return Err(LedgerDataBlobError::MultipleDeclares.into());
399 }
400
401 if relevant_instructions
402 .iter()
403 .filter(|instruction| {
404 matches!(
405 instruction.instruction,
406 RelevantInstruction::FinalizeBlob(_)
407 )
408 })
409 .count()
410 > 1
411 {
412 return Err(LedgerDataBlobError::MultipleFinalizes.into());
413 }
414
415 Ok(get_blob_data_from_instructions(
416 &relevant_instructions,
417 blober,
418 *blob,
419 )?)
420 }
421
422 pub async fn get_ledger_blobs(
424 &self,
425 slot: u64,
426 blober: Pubkey,
427 lookback_slots: Option<u64>,
428 ) -> BloberClientResult<Vec<Vec<u8>>> {
429 let block_config = RpcBlockConfig {
430 commitment: Some(self.rpc_client.commitment()),
431 encoding: Some(UiTransactionEncoding::Base58),
432 ..Default::default()
433 };
434 let block = self
435 .rpc_client
436 .get_block_with_config(slot, block_config)
437 .await?;
438
439 let Some(transactions) = block.transactions else {
440 return Ok(Vec::new());
442 };
443
444 let relevant_instructions = extract_relevant_instructions(
445 &transactions
446 .iter()
447 .filter_map(|tx| match &tx.meta {
448 Some(meta) if meta.status.is_err() => None,
449 _ => tx.transaction.decode(),
450 })
451 .collect::<Vec<_>>(),
452 );
453 let finalized_blobs = relevant_instructions
454 .iter()
455 .filter_map(|instruction| {
456 (instruction.blober == blober
457 && matches!(
458 instruction.instruction,
459 RelevantInstruction::FinalizeBlob(_)
460 ))
461 .then_some(instruction.blob)
462 })
463 .collect::<HashSet<Pubkey>>();
464
465 let mut relevant_instructions_map = HashMap::new();
466 filter_relevant_instructions(
467 relevant_instructions,
468 &finalized_blobs,
469 &mut relevant_instructions_map,
470 );
471
472 let mut blobs = HashMap::with_capacity(finalized_blobs.len());
473 for blob in &finalized_blobs {
474 let instructions = relevant_instructions_map
475 .get(blob)
476 .expect("This should never happen since we at least have the finalize instruction");
477
478 if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob) {
479 blobs.insert(blob, blob_data);
480 }
481 }
482
483 if blobs.len() == finalized_blobs.len() {
485 return Ok(blobs.values().cloned().collect());
486 }
487
488 let lookback_slots = lookback_slots.unwrap_or(DEFAULT_LOOKBACK_SLOTS);
489
490 let block_slots = self
491 .rpc_client
492 .get_blocks_with_commitment(
493 slot - lookback_slots,
494 Some(slot - 1),
495 self.rpc_client.commitment(),
496 )
497 .await?;
498
499 for slot in block_slots.into_iter().rev() {
500 let block = self
501 .rpc_client
502 .get_block_with_config(slot, block_config)
503 .await?;
504 let Some(transactions) = block.transactions else {
505 continue;
507 };
508 let new_relevant_instructions = extract_relevant_instructions(
509 &transactions
510 .iter()
511 .filter_map(|tx| match &tx.meta {
512 Some(meta) if meta.status.is_err() => None,
513 _ => tx.transaction.decode(),
514 })
515 .collect::<Vec<_>>(),
516 );
517 filter_relevant_instructions(
518 new_relevant_instructions,
519 &finalized_blobs,
520 &mut relevant_instructions_map,
521 );
522 for blob in &finalized_blobs {
523 if blobs.contains_key(blob) {
524 continue;
525 }
526 let instructions = relevant_instructions_map.get(blob).expect(
527 "This should never happen since we at least have the finalize instruction",
528 );
529 println!("total {}", instructions.len());
530
531 if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob)
532 {
533 blobs.insert(blob, blob_data);
534 }
535 }
536 if blobs.len() == finalized_blobs.len() {
537 break;
538 }
539 }
540
541 Ok(blobs.values().cloned().collect())
542 }
543
544 pub async fn get_blobs(&self, slot: u64, blober: Pubkey) -> BloberClientResult<Vec<Vec<u8>>> {
546 loop {
547 let blobs = self
548 .indexer()
549 .get_blobs(blober, slot)
550 .await
551 .map_err(|e| IndexerError::Blobs(slot, e.to_string()))?;
552 if let Some(blobs) = blobs {
553 return Ok(blobs);
554 }
555 tokio::time::sleep(Duration::from_millis(100)).await;
556 }
557 }
558
559 pub async fn get_slot_proof(
561 &self,
562 slot: u64,
563 blober: Pubkey,
564 ) -> BloberClientResult<CompoundProof> {
565 loop {
566 let proof = self
567 .indexer()
568 .get_proof(blober, slot)
569 .await
570 .map_err(|e| IndexerError::Proof(slot, e.to_string()))?;
571 if let Some(proofs) = proof {
572 return Ok(proofs);
573 }
574 tokio::time::sleep(Duration::from_millis(100)).await;
575 }
576 }
577
578 pub async fn get_blob_messages(
583 &self,
584 slot: u64,
585 blober: Pubkey,
586 ) -> BloberClientResult<Vec<(Pubkey, VersionedMessage)>> {
587 let block: EncodedConfirmedBlock = self
588 .rpc_client
589 .get_block_with_config(
590 slot,
591 RpcBlockConfig {
592 commitment: Some(self.rpc_client.commitment()),
593 encoding: Some(UiTransactionEncoding::Base58),
594 ..Default::default()
595 },
596 )
597 .await?
598 .into();
599
600 let finalized = block
601 .transactions
602 .iter()
603 .filter_map(|tx| match &tx.meta {
604 Some(meta) if meta.status.is_err() => None,
605 _ => tx.transaction.decode(),
606 })
607 .filter_map(|tx| {
608 let instructions = tx
609 .message
610 .instructions()
611 .iter()
612 .filter_map(|compiled_instruction| {
613 Some(RelevantInstructionWithAccounts {
614 blob: get_account_at_index(&tx, compiled_instruction, 0)?,
615 blober: get_account_at_index(&tx, compiled_instruction, 1)?,
616 instruction: RelevantInstruction::try_from_slice(compiled_instruction)?,
617 })
618 })
619 .filter(|instruction| {
620 instruction.blober == blober
621 && matches!(
622 instruction.instruction,
623 RelevantInstruction::FinalizeBlob(_)
624 )
625 })
626 .collect::<Vec<_>>();
627
628 instructions.is_empty().then_some(
629 instructions
630 .iter()
631 .map(|instruction| (instruction.blob, tx.message.clone()))
632 .collect::<Vec<_>>(),
633 )
634 })
635 .flatten()
636 .collect::<Vec<_>>();
637
638 Ok(finalized)
639 }
640}