1use std::{sync::Arc, time::Duration};
2
3use anchor_lang::{Discriminator, Space, prelude::Pubkey};
4use bon::Builder;
5use data_anchor_blober::{
6 CHUNK_SIZE, COMPOUND_DECLARE_TX_SIZE, COMPOUND_TX_SIZE, find_blob_address, find_blober_address,
7 find_checkpoint_address, find_checkpoint_config_address,
8 instruction::{
9 Close, ConfigureCheckpoint, DeclareBlob, DiscardBlob, FinalizeBlob, Initialize, InsertChunk,
10 },
11 state::blober::Blober,
12};
13use data_anchor_utils::{
14 compression::{self, DataAnchorCompressionAsync},
15 decompress_and_decode_async, encode_and_compress_async,
16 encoding::{self, DataAnchorEncoding, Decodable, Encodable},
17};
18use futures::{StreamExt, TryStreamExt};
19use jsonrpsee::http_client::HttpClient;
20use solana_commitment_config::CommitmentConfig;
21use solana_keypair::Keypair;
22use solana_rpc_client::nonblocking::rpc_client::RpcClient;
23use solana_signer::Signer;
24use tracing::{Instrument, Span, info, info_span};
25
26use crate::{
27 DataAnchorClientError, DataAnchorClientResult,
28 batch_client::{BatchClient, SuccessfulTransaction},
29 constants::DEFAULT_CONCURRENCY,
30 fees::{Fee, FeeStrategy, Lamports},
31 helpers::{check_outcomes, get_unique_timestamp},
32 tx::{Compound, CompoundDeclare, CompoundFinalize, MessageArguments, MessageBuilder},
33 types::TransactionType,
34};
35
36mod builder;
37mod indexer_client;
38mod ledger_client;
39mod proof_client;
40
41pub use indexer_client::IndexerError;
42pub use ledger_client::ChainError;
43pub use proof_client::ProofError;
44
45#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum BloberIdentifier {
48 Namespace(String),
49 PayerAndNamespace { payer: Pubkey, namespace: String },
50 Pubkey(Pubkey),
51}
52
53#[derive(Debug, thiserror::Error)]
54pub enum BloberIdentifierError {
55 #[error(
57 "Missing blober identifier: either namespace, namespace and payer or blober PDA must be provided."
58 )]
59 MissingBloberIdentifier,
60}
61
62impl TryFrom<(Option<String>, Option<Pubkey>)> for BloberIdentifier {
63 type Error = BloberIdentifierError;
64
65 fn try_from(
66 (namespace, blober_pda): (Option<String>, Option<Pubkey>),
67 ) -> Result<Self, Self::Error> {
68 match (namespace, blober_pda) {
69 (Some(namespace), None) => Ok(namespace.into()),
70 (None, Some(pubkey)) => Ok(pubkey.into()),
71 (Some(namespace), Some(payer)) => Ok((payer, namespace).into()),
72 _ => Err(BloberIdentifierError::MissingBloberIdentifier),
73 }
74 }
75}
76
77impl From<String> for BloberIdentifier {
78 fn from(namespace: String) -> Self {
79 BloberIdentifier::Namespace(namespace)
80 }
81}
82
83impl From<(Pubkey, String)> for BloberIdentifier {
84 fn from((payer, namespace): (Pubkey, String)) -> Self {
85 BloberIdentifier::PayerAndNamespace { payer, namespace }
86 }
87}
88
89impl From<Pubkey> for BloberIdentifier {
90 fn from(pubkey: Pubkey) -> Self {
91 BloberIdentifier::Pubkey(pubkey)
92 }
93}
94
95impl BloberIdentifier {
96 pub fn to_blober_address(&self, program_id: Pubkey, payer: Pubkey) -> Pubkey {
98 match self {
99 BloberIdentifier::Namespace(namespace) => {
100 find_blober_address(program_id, payer, namespace)
101 }
102 BloberIdentifier::PayerAndNamespace { payer, namespace } => {
103 find_blober_address(program_id, *payer, namespace)
104 }
105 BloberIdentifier::Pubkey(pubkey) => *pubkey,
106 }
107 }
108
109 pub fn namespace(&self) -> Option<&str> {
111 match self {
112 BloberIdentifier::Namespace(namespace) => Some(namespace),
113 BloberIdentifier::PayerAndNamespace { namespace, .. } => Some(namespace),
114 BloberIdentifier::Pubkey(_) => None,
115 }
116 }
117}
118
119#[derive(Builder, Clone)]
120pub struct DataAnchorClient<Encoding = encoding::Default, Compression = compression::Default>
121where
122 Encoding: DataAnchorEncoding + Default,
123 Compression: DataAnchorCompressionAsync,
124{
125 #[builder(getter(name = get_payer, vis = ""))]
126 pub(crate) payer: Arc<Keypair>,
127 #[builder(default = data_anchor_blober::id())]
128 pub(crate) program_id: Pubkey,
129 pub(crate) rpc_client: Arc<RpcClient>,
130 pub(crate) batch_client: BatchClient,
131 pub(crate) indexer_client: Option<Arc<HttpClient>>,
132 pub(crate) proof_client: Option<Arc<HttpClient>>,
133 #[builder(default)]
134 pub(crate) encoding: Encoding,
135 #[builder(default)]
136 pub(crate) compression: Compression,
137}
138
139impl<Encoding, Compression> DataAnchorClient<Encoding, Compression>
140where
141 Encoding: DataAnchorEncoding + Default,
142 Compression: DataAnchorCompressionAsync,
143{
144 pub fn rpc_client(&self) -> Arc<RpcClient> {
146 self.rpc_client.clone()
147 }
148
149 pub fn payer(&self) -> Arc<Keypair> {
151 self.payer.clone()
152 }
153
154 fn in_mock_env(&self) -> bool {
155 self.rpc_client.url().starts_with("MockSender")
156 }
157
158 async fn check_account_exists(&self, account: Pubkey) -> DataAnchorClientResult<bool> {
159 Ok(self
160 .rpc_client
161 .get_account_with_commitment(&account, CommitmentConfig::confirmed())
162 .await
163 .map(|res| res.value.is_some())?)
164 }
165
166 async fn require_balance(&self, cost: Lamports) -> DataAnchorClientResult {
167 let balance = self
168 .rpc_client
169 .get_balance_with_commitment(&self.payer.pubkey(), CommitmentConfig::confirmed())
170 .await
171 .map(|r| r.value)?;
172 let cost_u64 = cost.into_inner() as u64;
173 if balance < cost_u64 {
174 return Err(ChainError::InsufficientBalance(cost_u64, balance).into());
175 }
176 Ok(())
177 }
178
179 pub async fn encode_and_compress<T>(&self, data: &T) -> DataAnchorClientResult<Vec<u8>>
180 where
181 T: Encodable,
182 {
183 Ok(encode_and_compress_async(&self.encoding, &self.compression, data).await?)
184 }
185
186 pub async fn decompress_and_decode<T>(&self, bytes: &[u8]) -> DataAnchorClientResult<T>
187 where
188 T: Decodable,
189 {
190 Ok(decompress_and_decode_async(&self.encoding, &self.compression, bytes).await?)
191 }
192
193 pub async fn decompress_and_decode_vec<T>(
194 &self,
195 slice_of_bytes: impl Iterator<Item = &[u8]>,
196 ) -> DataAnchorClientResult<Vec<T>>
197 where
198 T: Decodable,
199 {
200 futures::stream::iter(slice_of_bytes)
201 .map(|blob| async move { self.decompress_and_decode(blob).await })
202 .buffer_unordered(DEFAULT_CONCURRENCY)
203 .try_collect()
204 .await
205 }
206
207 pub async fn initialize_blober(
209 &self,
210 fee_strategy: FeeStrategy,
211 identifier: BloberIdentifier,
212 timeout: Option<Duration>,
213 ) -> DataAnchorClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
214 let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
215
216 let in_mock_env = self.in_mock_env();
217 if !in_mock_env && self.check_account_exists(blober).await? {
218 return Err(
219 ChainError::AccountExists(format!("Blober PDA with address {blober}")).into(),
220 );
221 }
222
223 let fee = fee_strategy
224 .convert_fee_strategy_to_fixed(
225 &self.rpc_client,
226 &[blober, self.payer.pubkey()],
227 TransactionType::InitializeBlober,
228 )
229 .in_current_span()
230 .await?;
231
232 if !in_mock_env {
233 let cost = fee
234 .total_fee()
235 .checked_add(fee.rent())
236 .ok_or_else(|| ChainError::CouldNotCalculateCost)?;
237 self.require_balance(cost).await?;
238 }
239
240 let msg = Initialize::build_message(MessageArguments::new(
241 self.program_id,
242 blober,
243 &self.payer,
244 self.rpc_client.clone(),
245 fee,
246 (
247 identifier
248 .namespace()
249 .ok_or(ChainError::MissingBloberNamespace)?
250 .to_owned(),
251 blober,
252 ),
253 ))
254 .await
255 .expect("infallible with a fixed fee strategy");
256
257 let span = info_span!(parent: Span::current(), "initialize_blober");
258 Ok(check_outcomes(
259 self.batch_client
260 .send(vec![(TransactionType::InitializeBlober, msg)], timeout)
261 .instrument(span)
262 .await,
263 )
264 .map_err(ChainError::InitializeBlober)?)
265 }
266
267 pub async fn close_blober(
269 &self,
270 fee_strategy: FeeStrategy,
271 identifier: BloberIdentifier,
272 timeout: Option<Duration>,
273 ) -> DataAnchorClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
274 let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
275
276 let in_mock_env = self.in_mock_env();
277
278 if !in_mock_env && !self.check_account_exists(blober).await? {
279 return Err(ChainError::AccountDoesNotExist(format!(
280 "Blober PDA with address {blober}"
281 ))
282 .into());
283 }
284
285 let checkpoint = self.get_checkpoint(identifier.clone()).await?;
286
287 let checkpoint_accounts = if let Some(checkpoint) = checkpoint {
288 let Some(blober_state) = self.get_blober(identifier).await? else {
289 return Err(ChainError::AccountDoesNotExist(format!(
290 "Blober PDA with address {blober}"
291 ))
292 .into());
293 };
294
295 let checkpointed_hash = checkpoint
296 .final_hash()
297 .map_err(|_| ChainError::CheckpointNotUpToDate)?;
298
299 if checkpoint.slot != blober_state.slot || checkpointed_hash != blober_state.hash {
300 return Err(ChainError::CheckpointNotUpToDate.into());
301 }
302
303 Some((
304 find_checkpoint_address(self.program_id, blober),
305 find_checkpoint_config_address(self.program_id, blober),
306 ))
307 } else {
308 None
309 };
310
311 let fee = fee_strategy
312 .convert_fee_strategy_to_fixed(
313 &self.rpc_client,
314 &[blober, self.payer.pubkey()],
315 TransactionType::CloseBlober,
316 )
317 .in_current_span()
318 .await?;
319
320 if !in_mock_env {
321 self.require_balance(fee.total_fee()).await?;
322 }
323
324 let msg = Close::build_message(MessageArguments::new(
325 self.program_id,
326 blober,
327 &self.payer,
328 self.rpc_client.clone(),
329 fee,
330 checkpoint_accounts,
331 ))
332 .await
333 .expect("infallible with a fixed fee strategy");
334
335 let span = info_span!(parent: Span::current(), "close_blober");
336 Ok(check_outcomes(
337 self.batch_client
338 .send(vec![(TransactionType::CloseBlober, msg)], timeout)
339 .instrument(span)
340 .await,
341 )
342 .map_err(ChainError::CloseBlober)?)
343 }
344
345 pub async fn upload_blob<T>(
352 &self,
353 blob_data: &T,
354 fee_strategy: FeeStrategy,
355 namespace: &str,
356 timeout: Option<Duration>,
357 ) -> DataAnchorClientResult<(Vec<SuccessfulTransaction<TransactionType>>, Pubkey)>
358 where
359 T: Encodable,
360 {
361 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
362 let timestamp = get_unique_timestamp();
363
364 let encoded_and_compressed = self.encode_and_compress(blob_data).await?;
365
366 let blob = find_blob_address(
367 self.program_id,
368 self.payer.pubkey(),
369 blober,
370 timestamp,
371 encoded_and_compressed.len(),
372 );
373
374 let in_mock_env = self.in_mock_env();
375 if !in_mock_env && self.check_account_exists(blob).await? {
376 return Err(ChainError::AccountExists(format!("Blob PDA with address {blob}")).into());
377 }
378
379 let fee = self
380 .estimate_fees(encoded_and_compressed.len(), blober, fee_strategy)
381 .await?;
382
383 if !in_mock_env {
384 let cost = fee
385 .total_fee()
386 .checked_add(fee.rent())
387 .ok_or_else(|| ChainError::CouldNotCalculateCost)?;
388 self.require_balance(cost).await?;
389 }
390
391 let upload_messages = self
392 .generate_messages(
393 blob,
394 timestamp,
395 &encoded_and_compressed,
396 fee_strategy,
397 blober,
398 )
399 .await?;
400
401 let res = self
402 .do_upload(upload_messages, timeout)
403 .in_current_span()
404 .await;
405
406 if let Err(DataAnchorClientError::ChainErrors(ChainError::DeclareBlob(_))) = res {
407 self.discard_blob(fee_strategy, blob, namespace, timeout)
408 .await
409 } else {
410 res.map(|r| (r, blob))
411 }
412 }
413
414 pub async fn discard_blob(
417 &self,
418 fee_strategy: FeeStrategy,
419 blob: Pubkey,
420 namespace: &str,
421 timeout: Option<Duration>,
422 ) -> DataAnchorClientResult<(Vec<SuccessfulTransaction<TransactionType>>, Pubkey)> {
423 let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
424
425 let in_mock_env = self.in_mock_env();
426 if !in_mock_env && !self.check_account_exists(blob).await? {
427 return Err(
428 ChainError::AccountDoesNotExist(format!("Blob PDA with address {blob}")).into(),
429 );
430 }
431
432 let fee = fee_strategy
433 .convert_fee_strategy_to_fixed(
434 &self.rpc_client,
435 &[blob, self.payer.pubkey()],
436 TransactionType::DiscardBlob,
437 )
438 .in_current_span()
439 .await?;
440
441 if !in_mock_env {
442 self.require_balance(fee.total_fee()).await?;
443 }
444
445 let msg = DiscardBlob::build_message(MessageArguments::new(
446 self.program_id,
447 blober,
448 &self.payer,
449 self.rpc_client.clone(),
450 fee,
451 blob,
452 ))
453 .in_current_span()
454 .await
455 .expect("infallible with a fixed fee strategy");
456
457 let span = info_span!(parent: Span::current(), "discard_blob");
458
459 Ok((
460 check_outcomes(
461 self.batch_client
462 .send(vec![(TransactionType::DiscardBlob, msg)], timeout)
463 .instrument(span)
464 .await,
465 )
466 .map_err(ChainError::DiscardBlob)?,
467 blob,
468 ))
469 }
470
471 pub async fn configure_checkpoint(
474 &self,
475 fee_strategy: FeeStrategy,
476 identifier: BloberIdentifier,
477 authority: Pubkey,
478 timeout: Option<Duration>,
479 ) -> DataAnchorClientResult<(Vec<SuccessfulTransaction<TransactionType>>, Pubkey)> {
480 let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
481
482 let checkpoint = find_checkpoint_address(self.program_id, blober);
483 let checkpoint_config = find_checkpoint_config_address(self.program_id, blober);
484
485 let in_mock_env = self.in_mock_env();
486 if !in_mock_env && !self.check_account_exists(blober).await? {
487 return Err(ChainError::AccountDoesNotExist(format!(
488 "Blober PDA with address {blober}"
489 ))
490 .into());
491 }
492
493 let fee = fee_strategy
494 .convert_fee_strategy_to_fixed(
495 &self.rpc_client,
496 &[checkpoint, checkpoint_config, self.payer.pubkey()],
497 TransactionType::ConfigureCheckpoint,
498 )
499 .in_current_span()
500 .await?;
501
502 if !in_mock_env {
503 self.require_balance(fee.total_fee()).await?;
504 }
505
506 info!(
507 "Configuring checkpoint for blober: {}, authority: {}",
508 blober, authority
509 );
510 let msg = ConfigureCheckpoint::build_message(MessageArguments::new(
511 self.program_id,
512 blober,
513 &self.payer,
514 self.rpc_client.clone(),
515 fee,
516 authority,
517 ))
518 .in_current_span()
519 .await
520 .expect("infallible with a fixed fee strategy");
521
522 let span = info_span!(parent: Span::current(), "configure_checkpoint");
523
524 Ok((
525 check_outcomes(
526 self.batch_client
527 .send(vec![(TransactionType::ConfigureCheckpoint, msg)], timeout)
528 .instrument(span)
529 .await,
530 )
531 .map_err(ChainError::ConfigureCheckpoint)?,
532 checkpoint_config,
533 ))
534 }
535
536 pub async fn estimate_fees(
543 &self,
544 blob_size: usize,
545 blober: Pubkey,
546 fee_strategy: FeeStrategy,
547 ) -> DataAnchorClientResult<Fee> {
548 let prioritization_fee_rate = fee_strategy
549 .convert_fee_strategy_to_fixed(
550 &self.rpc_client,
551 &[Pubkey::new_unique(), blober, self.payer.pubkey()],
552 TransactionType::Compound,
553 )
554 .await?
555 .prioritization_fee_rate;
556
557 let num_chunks = blob_size.div_ceil(CHUNK_SIZE as usize) as u16;
558
559 let (compute_unit_limit, num_signatures) = if blob_size < COMPOUND_TX_SIZE as usize {
560 (Compound::COMPUTE_UNIT_LIMIT, Compound::NUM_SIGNATURES)
561 } else if blob_size < COMPOUND_DECLARE_TX_SIZE as usize {
562 (
563 CompoundDeclare::COMPUTE_UNIT_LIMIT + FinalizeBlob::COMPUTE_UNIT_LIMIT,
564 CompoundDeclare::NUM_SIGNATURES + FinalizeBlob::NUM_SIGNATURES,
565 )
566 } else {
567 (
568 DeclareBlob::COMPUTE_UNIT_LIMIT
569 + (num_chunks - 1) as u32 * InsertChunk::COMPUTE_UNIT_LIMIT
570 + CompoundFinalize::COMPUTE_UNIT_LIMIT,
571 DeclareBlob::NUM_SIGNATURES
572 + (num_chunks - 1) * InsertChunk::NUM_SIGNATURES
573 + CompoundFinalize::NUM_SIGNATURES,
574 )
575 };
576
577 let price_per_signature = Lamports::new(5000);
580
581 let blob_account_size = Blober::DISCRIMINATOR.len() + Blober::INIT_SPACE;
582
583 Ok(Fee {
584 num_signatures,
585 price_per_signature,
586 compute_unit_limit,
587 prioritization_fee_rate,
588 blob_account_size,
589 })
590 }
591}