1use std::{sync::Arc, time::Duration};
2
3use anchor_lang::{Discriminator, Space, prelude::Pubkey};
4use bon::Builder;
5use data_anchor_blober::{
6 CHUNK_SIZE, COMPOUND_DECLARE_TX_SIZE, COMPOUND_TX_SIZE, find_blob_address, find_blober_address,
7 find_checkpoint_address, find_checkpoint_config_address,
8 instruction::{
9 Close, ConfigureCheckpoint, DeclareBlob, DiscardBlob, FinalizeBlob, Initialize, InsertChunk,
10 },
11 state::blober::Blober,
12};
13use data_anchor_utils::{
14 compression::CompressionType,
15 decompress_and_decode_async, encode_and_compress_async,
16 encoding::{Decodable, Encodable, EncodingType},
17};
18use futures::{StreamExt, TryStreamExt};
19use jsonrpsee::http_client::HttpClient;
20use nitro_sender::{NitroSender, SuccessfulTransaction};
21use solana_commitment_config::CommitmentConfig;
22use solana_keypair::Keypair;
23use solana_rpc_client::nonblocking::rpc_client::RpcClient;
24use solana_signer::Signer;
25use tracing::{Instrument, Span, info, info_span, trace};
26
27use crate::{
28 DataAnchorClientError, DataAnchorClientResult, IndexerUrl,
29 constants::DEFAULT_CONCURRENCY,
30 fees::{Fee, FeeStrategy, Lamports},
31 helpers::{check_outcomes, get_unique_timestamp},
32 tx::{Compound, CompoundDeclare, CompoundFinalize, MessageArguments, MessageBuilder},
33 types::TransactionType,
34};
35
36mod builder;
37mod indexer_client;
38mod ledger_client;
39mod proof_client;
40
41pub use indexer_client::IndexerError;
42pub use ledger_client::ChainError;
43pub use proof_client::ProofError;
44
45#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum BloberIdentifier {
48 Namespace(String),
49 PayerAndNamespace { payer: Pubkey, namespace: String },
50 Pubkey(Pubkey),
51}
52
53#[derive(Debug, thiserror::Error)]
54pub enum BloberIdentifierError {
55 #[error(
57 "Missing blober identifier: either namespace, namespace and payer or blober PDA must be provided."
58 )]
59 MissingBloberIdentifier,
60}
61
62impl TryFrom<(Option<String>, Option<Pubkey>)> for BloberIdentifier {
63 type Error = BloberIdentifierError;
64
65 fn try_from(
66 (namespace, blober_pda): (Option<String>, Option<Pubkey>),
67 ) -> Result<Self, Self::Error> {
68 match (namespace, blober_pda) {
69 (Some(namespace), None) => Ok(namespace.into()),
70 (None, Some(pubkey)) => Ok(pubkey.into()),
71 (Some(namespace), Some(payer)) => Ok((payer, namespace).into()),
72 _ => Err(BloberIdentifierError::MissingBloberIdentifier),
73 }
74 }
75}
76
77impl From<String> for BloberIdentifier {
78 fn from(namespace: String) -> Self {
79 BloberIdentifier::Namespace(namespace)
80 }
81}
82
83impl From<(Pubkey, String)> for BloberIdentifier {
84 fn from((payer, namespace): (Pubkey, String)) -> Self {
85 BloberIdentifier::PayerAndNamespace { payer, namespace }
86 }
87}
88
89impl From<Pubkey> for BloberIdentifier {
90 fn from(pubkey: Pubkey) -> Self {
91 BloberIdentifier::Pubkey(pubkey)
92 }
93}
94
95impl BloberIdentifier {
96 pub fn to_blober_address(&self, program_id: Pubkey, payer: Pubkey) -> Pubkey {
98 match self {
99 BloberIdentifier::Namespace(namespace) => {
100 find_blober_address(program_id, payer, namespace)
101 }
102 BloberIdentifier::PayerAndNamespace { payer, namespace } => {
103 find_blober_address(program_id, *payer, namespace)
104 }
105 BloberIdentifier::Pubkey(pubkey) => *pubkey,
106 }
107 }
108
109 pub fn namespace(&self) -> Option<&str> {
111 match self {
112 BloberIdentifier::Namespace(namespace) => Some(namespace),
113 BloberIdentifier::PayerAndNamespace { namespace, .. } => Some(namespace),
114 BloberIdentifier::Pubkey(_) => None,
115 }
116 }
117}
118
119#[derive(Builder, Clone)]
120pub struct DataAnchorClient {
121 #[builder(getter(name = get_payer, vis = ""))]
122 pub(crate) payer: Arc<Keypair>,
123 #[builder(default = data_anchor_blober::id())]
124 pub(crate) program_id: Pubkey,
125 pub(crate) rpc_client: Arc<RpcClient>,
126 pub(crate) nitro_sender: NitroSender,
127 #[builder(getter(name = get_indexer, vis = ""))]
128 #[allow(dead_code, reason = "Used in builder")]
129 indexer: Option<IndexerUrl>,
130 pub(crate) indexer_client: Option<Arc<HttpClient>>,
131 pub(crate) proof_client: Option<Arc<HttpClient>>,
132 #[builder(default)]
133 pub(crate) encoding: EncodingType,
134 #[builder(default)]
135 pub(crate) compression: CompressionType,
136}
137
138impl DataAnchorClient {
139 pub fn rpc_client(&self) -> Arc<RpcClient> {
141 self.rpc_client.clone()
142 }
143
144 pub fn payer(&self) -> Arc<Keypair> {
146 self.payer.clone()
147 }
148
149 fn in_mock_env(&self) -> bool {
150 self.rpc_client.url().starts_with("MockSender")
151 }
152
153 async fn check_account_exists(&self, account: Pubkey) -> DataAnchorClientResult<bool> {
154 Ok(self
155 .rpc_client
156 .get_account_with_commitment(&account, CommitmentConfig::confirmed())
157 .await
158 .map(|res| res.value.is_some())?)
159 }
160
161 async fn require_balance(&self, cost: Lamports) -> DataAnchorClientResult {
162 let balance = self
163 .rpc_client
164 .get_balance_with_commitment(&self.payer.pubkey(), CommitmentConfig::confirmed())
165 .await
166 .map(|r| r.value)?;
167 let cost_u64 = cost.into_inner() as u64;
168 if balance < cost_u64 {
169 info!(
170 "Balance check failed: required={} lamports, available={} lamports, deficit={} lamports",
171 cost_u64,
172 balance,
173 cost_u64 - balance
174 );
175 return Err(ChainError::InsufficientBalance(cost_u64, balance).into());
176 }
177 trace!(
178 "Balance check passed: required={} lamports, available={} lamports, remaining={} lamports",
179 cost_u64,
180 balance,
181 balance - cost_u64
182 );
183 Ok(())
184 }
185
186 pub async fn encode_and_compress<T>(&self, data: &T) -> DataAnchorClientResult<Vec<u8>>
187 where
188 T: Encodable,
189 {
190 Ok(encode_and_compress_async(&self.encoding, &self.compression, data).await?)
191 }
192
193 pub async fn decompress_and_decode<T>(&self, bytes: &[u8]) -> DataAnchorClientResult<T>
194 where
195 T: Decodable,
196 {
197 Ok(decompress_and_decode_async(bytes).await?)
198 }
199
200 pub async fn decompress_and_decode_vec<T>(
201 &self,
202 slice_of_bytes: impl Iterator<Item = &[u8]>,
203 ) -> DataAnchorClientResult<Vec<T>>
204 where
205 T: Decodable,
206 {
207 futures::stream::iter(slice_of_bytes)
208 .map(|blob| async move { self.decompress_and_decode(blob).await })
209 .buffer_unordered(DEFAULT_CONCURRENCY)
210 .try_collect()
211 .await
212 }
213
214 pub async fn initialize_blober(
216 &self,
217 fee_strategy: FeeStrategy,
218 identifier: BloberIdentifier,
219 timeout: Option<Duration>,
220 ) -> DataAnchorClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
221 let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
222
223 let in_mock_env = self.in_mock_env();
224 if !in_mock_env && self.check_account_exists(blober).await? {
225 return Err(
226 ChainError::AccountExists(format!("Blober PDA with address {blober}")).into(),
227 );
228 }
229
230 let fee = fee_strategy
231 .convert_fee_strategy_to_fixed(
232 &self.rpc_client,
233 &[blober, self.payer.pubkey()],
234 TransactionType::InitializeBlober,
235 )
236 .in_current_span()
237 .await?;
238
239 if !in_mock_env {
240 let cost = fee
241 .total_fee()
242 .checked_add(fee.rent())
243 .ok_or_else(|| ChainError::CouldNotCalculateCost)?;
244 self.require_balance(cost).await?;
245 }
246
247 let msg = Initialize::build_message(MessageArguments::new(
248 self.program_id,
249 blober,
250 &self.payer,
251 self.rpc_client.clone(),
252 fee,
253 (
254 identifier
255 .namespace()
256 .ok_or(ChainError::MissingBloberNamespace)?
257 .to_owned(),
258 blober,
259 ),
260 ))
261 .await;
262
263 let span = info_span!(parent: Span::current(), "initialize_blober");
264 Ok(check_outcomes(
265 self.nitro_sender
266 .send(vec![(TransactionType::InitializeBlober, msg)], timeout)
267 .instrument(span)
268 .await,
269 self.rpc_client.commitment(),
270 )
271 .map_err(ChainError::InitializeBlober)?)
272 }
273
274 pub async fn close_blober(
276 &self,
277 fee_strategy: FeeStrategy,
278 identifier: BloberIdentifier,
279 timeout: Option<Duration>,
280 ) -> DataAnchorClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
281 let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
282
283 let in_mock_env = self.in_mock_env();
284
285 if !in_mock_env && !self.check_account_exists(blober).await? {
286 return Err(ChainError::AccountDoesNotExist(format!(
287 "Blober PDA with address {blober}"
288 ))
289 .into());
290 }
291
292 let checkpoint = self.get_checkpoint(identifier.clone()).await?;
293
294 let checkpoint_accounts = if let Some(checkpoint) = checkpoint {
295 let Some(blober_state) = self.get_blober(identifier).await? else {
296 return Err(ChainError::AccountDoesNotExist(format!(
297 "Blober PDA with address {blober}"
298 ))
299 .into());
300 };
301
302 let checkpointed_hash = checkpoint
303 .final_hash()
304 .map_err(|_| ChainError::CheckpointNotUpToDate)?;
305
306 if checkpoint.slot != blober_state.slot || checkpointed_hash != blober_state.hash {
307 return Err(ChainError::CheckpointNotUpToDate.into());
308 }
309
310 Some((
311 find_checkpoint_address(self.program_id, blober),
312 find_checkpoint_config_address(self.program_id, blober),
313 ))
314 } else {
315 None
316 };
317
318 let fee = fee_strategy
319 .convert_fee_strategy_to_fixed(
320 &self.rpc_client,
321 &[blober, self.payer.pubkey()],
322 TransactionType::CloseBlober,
323 )
324 .in_current_span()
325 .await?;
326
327 if !in_mock_env {
328 self.require_balance(fee.total_fee()).await?;
329 }
330
331 let msg = Close::build_message(MessageArguments::new(
332 self.program_id,
333 blober,
334 &self.payer,
335 self.rpc_client.clone(),
336 fee,
337 checkpoint_accounts,
338 ))
339 .await;
340
341 let span = info_span!(parent: Span::current(), "close_blober");
342 Ok(check_outcomes(
343 self.nitro_sender
344 .send(vec![(TransactionType::CloseBlober, msg)], timeout)
345 .instrument(span)
346 .await,
347 self.rpc_client.commitment(),
348 )
349 .map_err(ChainError::CloseBlober)?)
350 }
351
352 pub async fn upload_blob<T>(
359 &self,
360 blob_data: &T,
361 fee_strategy: FeeStrategy,
362 namespace: &str,
363 timeout: Option<Duration>,
364 ) -> DataAnchorClientResult<(Vec<SuccessfulTransaction<TransactionType>>, Pubkey)>
365 where
366 T: Encodable,
367 {
368 info!(
369 "Starting blob upload: namespace='{}', original_size={} bytes",
370 namespace,
371 std::mem::size_of_val(blob_data)
372 );
373
374 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
375 let timestamp = get_unique_timestamp();
376
377 let encoded_and_compressed = self.encode_and_compress(blob_data).await?;
378
379 info!(
380 "Blob encoding/compression completed: compressed_size={} bytes, ratio={:.2}%",
381 encoded_and_compressed.len(),
382 (encoded_and_compressed.len() as f64 / std::mem::size_of_val(blob_data) as f64) * 100.0
383 );
384
385 let blob = find_blob_address(
386 self.program_id,
387 self.payer.pubkey(),
388 blober,
389 timestamp,
390 encoded_and_compressed.len(),
391 );
392
393 info!(
394 "Created blob PDA: blob={}, blober={}, timestamp={}",
395 blob, blober, timestamp
396 );
397
398 let in_mock_env = self.in_mock_env();
399 if !in_mock_env && self.check_account_exists(blob).await? {
400 return Err(ChainError::AccountExists(format!("Blob PDA with address {blob}")).into());
401 }
402
403 let fee = self
404 .estimate_fees(encoded_and_compressed.len(), blober, fee_strategy)
405 .await?;
406
407 if !in_mock_env {
408 let cost = fee
409 .total_fee()
410 .checked_add(fee.rent())
411 .ok_or_else(|| ChainError::CouldNotCalculateCost)?;
412 self.require_balance(cost).await?;
413 }
414
415 let upload_messages = self
416 .generate_messages(
417 blob,
418 timestamp,
419 &encoded_and_compressed,
420 fee_strategy,
421 blober,
422 )
423 .await?;
424
425 let res = self
426 .do_upload(upload_messages, timeout)
427 .in_current_span()
428 .await;
429
430 if let Err(DataAnchorClientError::ChainErrors(ChainError::DeclareBlob(_))) = res {
431 self.discard_blob(fee_strategy, blob, namespace, timeout)
432 .await
433 } else {
434 res.map(|r| (r, blob))
435 }
436 }
437
438 pub async fn discard_blob(
441 &self,
442 fee_strategy: FeeStrategy,
443 blob: Pubkey,
444 namespace: &str,
445 timeout: Option<Duration>,
446 ) -> DataAnchorClientResult<(Vec<SuccessfulTransaction<TransactionType>>, Pubkey)> {
447 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
448
449 let in_mock_env = self.in_mock_env();
450 if !in_mock_env && !self.check_account_exists(blob).await? {
451 return Err(
452 ChainError::AccountDoesNotExist(format!("Blob PDA with address {blob}")).into(),
453 );
454 }
455
456 let fee = fee_strategy
457 .convert_fee_strategy_to_fixed(
458 &self.rpc_client,
459 &[blob, self.payer.pubkey()],
460 TransactionType::DiscardBlob,
461 )
462 .in_current_span()
463 .await?;
464
465 if !in_mock_env {
466 self.require_balance(fee.total_fee()).await?;
467 }
468
469 let msg = DiscardBlob::build_message(MessageArguments::new(
470 self.program_id,
471 blober,
472 &self.payer,
473 self.rpc_client.clone(),
474 fee,
475 blob,
476 ))
477 .in_current_span()
478 .await;
479
480 let span = info_span!(parent: Span::current(), "discard_blob");
481
482 Ok((
483 check_outcomes(
484 self.nitro_sender
485 .send(vec![(TransactionType::DiscardBlob, msg)], timeout)
486 .instrument(span)
487 .await,
488 self.rpc_client.commitment(),
489 )
490 .map_err(ChainError::DiscardBlob)?,
491 blob,
492 ))
493 }
494
495 pub async fn configure_checkpoint(
498 &self,
499 fee_strategy: FeeStrategy,
500 identifier: BloberIdentifier,
501 authority: Pubkey,
502 timeout: Option<Duration>,
503 ) -> DataAnchorClientResult<(Vec<SuccessfulTransaction<TransactionType>>, Pubkey)> {
504 let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
505
506 let checkpoint = find_checkpoint_address(self.program_id, blober);
507 let checkpoint_config = find_checkpoint_config_address(self.program_id, blober);
508
509 let in_mock_env = self.in_mock_env();
510 if !in_mock_env && !self.check_account_exists(blober).await? {
511 return Err(ChainError::AccountDoesNotExist(format!(
512 "Blober PDA with address {blober}"
513 ))
514 .into());
515 }
516
517 let fee = fee_strategy
518 .convert_fee_strategy_to_fixed(
519 &self.rpc_client,
520 &[checkpoint, checkpoint_config, self.payer.pubkey()],
521 TransactionType::ConfigureCheckpoint,
522 )
523 .in_current_span()
524 .await?;
525
526 if !in_mock_env {
527 self.require_balance(fee.total_fee()).await?;
528 }
529
530 info!(
531 "Configuring checkpoint for blober: {}, authority: {}",
532 blober, authority
533 );
534 let msg = ConfigureCheckpoint::build_message(MessageArguments::new(
535 self.program_id,
536 blober,
537 &self.payer,
538 self.rpc_client.clone(),
539 fee,
540 authority,
541 ))
542 .in_current_span()
543 .await;
544
545 let span = info_span!(parent: Span::current(), "configure_checkpoint");
546
547 Ok((
548 check_outcomes(
549 self.nitro_sender
550 .send(vec![(TransactionType::ConfigureCheckpoint, msg)], timeout)
551 .instrument(span)
552 .await,
553 self.rpc_client.commitment(),
554 )
555 .map_err(ChainError::ConfigureCheckpoint)?,
556 checkpoint_config,
557 ))
558 }
559
560 pub async fn estimate_fees(
567 &self,
568 blob_size: usize,
569 blober: Pubkey,
570 fee_strategy: FeeStrategy,
571 ) -> DataAnchorClientResult<Fee> {
572 let prioritization_fee_rate = fee_strategy
573 .convert_fee_strategy_to_fixed(
574 &self.rpc_client,
575 &[Pubkey::new_unique(), blober, self.payer.pubkey()],
576 TransactionType::Compound,
577 )
578 .await?
579 .prioritization_fee_rate;
580
581 let num_chunks = blob_size.div_ceil(CHUNK_SIZE as usize) as u16;
582
583 let (compute_unit_limit, num_signatures) = if blob_size < COMPOUND_TX_SIZE as usize {
584 (Compound::COMPUTE_UNIT_LIMIT, Compound::NUM_SIGNATURES)
585 } else if blob_size < COMPOUND_DECLARE_TX_SIZE as usize {
586 (
587 CompoundDeclare::COMPUTE_UNIT_LIMIT + FinalizeBlob::COMPUTE_UNIT_LIMIT,
588 CompoundDeclare::NUM_SIGNATURES + FinalizeBlob::NUM_SIGNATURES,
589 )
590 } else {
591 (
592 DeclareBlob::COMPUTE_UNIT_LIMIT
593 + (num_chunks - 1) as u32 * InsertChunk::COMPUTE_UNIT_LIMIT
594 + CompoundFinalize::COMPUTE_UNIT_LIMIT,
595 DeclareBlob::NUM_SIGNATURES
596 + (num_chunks - 1) * InsertChunk::NUM_SIGNATURES
597 + CompoundFinalize::NUM_SIGNATURES,
598 )
599 };
600
601 let price_per_signature = Lamports::new(5000);
604
605 let blob_account_size = Blober::DISCRIMINATOR.len() + Blober::INIT_SPACE;
606
607 let fee = Fee {
608 num_signatures,
609 price_per_signature,
610 compute_unit_limit,
611 prioritization_fee_rate,
612 blob_account_size,
613 };
614
615 info!(
616 "Fee estimation: blob_size={} bytes, chunks={}, total_fee={} lamports (static: {}, prioritization: {})",
617 blob_size,
618 num_chunks,
619 fee.total_fee().into_inner(),
620 fee.static_fee().into_inner(),
621 fee.prioritization_fee().into_inner()
622 );
623
624 Ok(fee)
625 }
626}