1#![cfg_attr(
2 not(feature = "agave-unstable-api"),
3 deprecated(
4 since = "3.1.0",
5 note = "This crate has been marked for formal inclusion in the Agave Unstable API. From \
6 v4.0.0 onward, the `agave-unstable-api` crate feature must be specified to \
7 acknowledge use of an interface that may break without warning."
8 )
9)]
10#![allow(clippy::arithmetic_side_effects)]
11
12use {
13 crate::bigtable::RowKey,
14 agave_reserved_account_keys::ReservedAccountKeys,
15 log::*,
16 serde::{Deserialize, Serialize},
17 solana_clock::{Slot, UnixTimestamp},
18 solana_message::v0::LoadedAddresses,
19 solana_metrics::datapoint_info,
20 solana_pubkey::Pubkey,
21 solana_serde::default_on_eof,
22 solana_signature::Signature,
23 solana_storage_proto::convert::{entries, generated, tx_by_addr},
24 solana_time_utils::AtomicInterval,
25 solana_transaction::versioned::VersionedTransaction,
26 solana_transaction_error::TransactionError,
27 solana_transaction_status::{
28 extract_and_fmt_memos, ConfirmedBlock, ConfirmedTransactionStatusWithSignature,
29 ConfirmedTransactionWithStatusMeta, EntrySummary, Reward, TransactionByAddrInfo,
30 TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta,
31 TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedConfirmedBlockWithEntries,
32 VersionedTransactionWithStatusMeta,
33 },
34 std::{
35 collections::{HashMap, HashSet},
36 convert::TryInto,
37 sync::{
38 atomic::{AtomicUsize, Ordering},
39 Arc,
40 },
41 time::Duration,
42 },
43 thiserror::Error,
44 tokio::task::JoinError,
45};
46
47#[macro_use]
48extern crate solana_metrics;
49
50mod access_token;
51mod bigtable;
52mod compression;
53mod root_ca_certificate;
54
55#[derive(Debug, Error)]
56pub enum Error {
57 #[error("BigTable: {0}")]
58 BigTableError(bigtable::Error),
59
60 #[error("I/O Error: {0}")]
61 IoError(std::io::Error),
62
63 #[error("Transaction encoded is not supported")]
64 UnsupportedTransactionEncoding,
65
66 #[error("Block not found: {0}")]
67 BlockNotFound(Slot),
68
69 #[error("Signature not found")]
70 SignatureNotFound,
71
72 #[error("tokio error")]
73 TokioJoinError(JoinError),
74}
75
76impl std::convert::From<bigtable::Error> for Error {
77 fn from(err: bigtable::Error) -> Self {
78 Self::BigTableError(err)
79 }
80}
81
82impl std::convert::From<std::io::Error> for Error {
83 fn from(err: std::io::Error) -> Self {
84 Self::IoError(err)
85 }
86}
87
88pub type Result<T> = std::result::Result<T, Error>;
89
90fn slot_to_key(slot: Slot) -> String {
93 format!("{slot:016x}")
94}
95
96fn slot_to_blocks_key(slot: Slot) -> String {
97 slot_to_key(slot)
98}
99
100fn slot_to_entries_key(slot: Slot) -> String {
101 slot_to_key(slot)
102}
103
104fn slot_to_tx_by_addr_key(slot: Slot) -> String {
105 slot_to_key(!slot)
106}
107
108fn key_to_slot(key: &str) -> Option<Slot> {
110 match Slot::from_str_radix(key, 16) {
111 Ok(slot) => Some(slot),
112 Err(err) => {
113 warn!("Failed to parse object key as a slot: {key}: {err}");
115 None
116 }
117 }
118}
119
120#[derive(Serialize, Deserialize)]
129struct StoredConfirmedBlock {
130 previous_blockhash: String,
131 blockhash: String,
132 parent_slot: Slot,
133 transactions: Vec<StoredConfirmedBlockTransaction>,
134 rewards: StoredConfirmedBlockRewards,
135 block_time: Option<UnixTimestamp>,
136 #[serde(deserialize_with = "default_on_eof")]
137 block_height: Option<u64>,
138}
139
140#[cfg(test)]
141impl From<ConfirmedBlock> for StoredConfirmedBlock {
142 fn from(confirmed_block: ConfirmedBlock) -> Self {
143 let ConfirmedBlock {
144 previous_blockhash,
145 blockhash,
146 parent_slot,
147 transactions,
148 rewards,
149 num_partitions: _num_partitions,
150 block_time,
151 block_height,
152 } = confirmed_block;
153
154 Self {
155 previous_blockhash,
156 blockhash,
157 parent_slot,
158 transactions: transactions.into_iter().map(|tx| tx.into()).collect(),
159 rewards: rewards.into_iter().map(|reward| reward.into()).collect(),
160 block_time,
161 block_height,
162 }
163 }
164}
165
166impl From<StoredConfirmedBlock> for ConfirmedBlock {
167 fn from(confirmed_block: StoredConfirmedBlock) -> Self {
168 let StoredConfirmedBlock {
169 previous_blockhash,
170 blockhash,
171 parent_slot,
172 transactions,
173 rewards,
174 block_time,
175 block_height,
176 } = confirmed_block;
177
178 Self {
179 previous_blockhash,
180 blockhash,
181 parent_slot,
182 transactions: transactions.into_iter().map(|tx| tx.into()).collect(),
183 rewards: rewards.into_iter().map(|reward| reward.into()).collect(),
184 num_partitions: None,
185 block_time,
186 block_height,
187 }
188 }
189}
190
191#[derive(Serialize, Deserialize)]
192struct StoredConfirmedBlockTransaction {
193 transaction: VersionedTransaction,
194 meta: Option<StoredConfirmedBlockTransactionStatusMeta>,
195}
196
197#[cfg(test)]
198impl From<TransactionWithStatusMeta> for StoredConfirmedBlockTransaction {
199 fn from(value: TransactionWithStatusMeta) -> Self {
200 match value {
201 TransactionWithStatusMeta::MissingMetadata(transaction) => Self {
202 transaction: VersionedTransaction::from(transaction),
203 meta: None,
204 },
205 TransactionWithStatusMeta::Complete(VersionedTransactionWithStatusMeta {
206 transaction,
207 meta,
208 }) => Self {
209 transaction,
210 meta: Some(meta.into()),
211 },
212 }
213 }
214}
215
216impl From<StoredConfirmedBlockTransaction> for TransactionWithStatusMeta {
217 fn from(tx_with_meta: StoredConfirmedBlockTransaction) -> Self {
218 let StoredConfirmedBlockTransaction { transaction, meta } = tx_with_meta;
219 match meta {
220 None => Self::MissingMetadata(
221 transaction
222 .into_legacy_transaction()
223 .expect("versioned transactions always have meta"),
224 ),
225 Some(meta) => Self::Complete(VersionedTransactionWithStatusMeta {
226 transaction,
227 meta: meta.into(),
228 }),
229 }
230 }
231}
232
233#[derive(Serialize, Deserialize)]
234struct StoredConfirmedBlockTransactionStatusMeta {
235 err: Option<TransactionError>,
236 fee: u64,
237 pre_balances: Vec<u64>,
238 post_balances: Vec<u64>,
239}
240
241impl From<StoredConfirmedBlockTransactionStatusMeta> for TransactionStatusMeta {
242 fn from(value: StoredConfirmedBlockTransactionStatusMeta) -> Self {
243 let StoredConfirmedBlockTransactionStatusMeta {
244 err,
245 fee,
246 pre_balances,
247 post_balances,
248 } = value;
249 let status = match &err {
250 None => Ok(()),
251 Some(err) => Err(err.clone()),
252 };
253 Self {
254 status,
255 fee,
256 pre_balances,
257 post_balances,
258 inner_instructions: None,
259 log_messages: None,
260 pre_token_balances: None,
261 post_token_balances: None,
262 rewards: None,
263 loaded_addresses: LoadedAddresses::default(),
264 return_data: None,
265 compute_units_consumed: None,
266 cost_units: None,
267 }
268 }
269}
270
271impl From<TransactionStatusMeta> for StoredConfirmedBlockTransactionStatusMeta {
272 fn from(value: TransactionStatusMeta) -> Self {
273 let TransactionStatusMeta {
274 status,
275 fee,
276 pre_balances,
277 post_balances,
278 ..
279 } = value;
280 Self {
281 err: status.err(),
282 fee,
283 pre_balances,
284 post_balances,
285 }
286 }
287}
288
289type StoredConfirmedBlockRewards = Vec<StoredConfirmedBlockReward>;
290
291#[derive(Serialize, Deserialize)]
292struct StoredConfirmedBlockReward {
293 pubkey: String,
294 lamports: i64,
295}
296
297impl From<StoredConfirmedBlockReward> for Reward {
298 fn from(value: StoredConfirmedBlockReward) -> Self {
299 let StoredConfirmedBlockReward { pubkey, lamports } = value;
300 Self {
301 pubkey,
302 lamports,
303 post_balance: 0,
304 reward_type: None,
305 commission: None,
306 }
307 }
308}
309
310impl From<Reward> for StoredConfirmedBlockReward {
311 fn from(value: Reward) -> Self {
312 let Reward {
313 pubkey, lamports, ..
314 } = value;
315 Self { pubkey, lamports }
316 }
317}
318
319#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
321struct TransactionInfo {
322 slot: Slot, index: u32, err: Option<TransactionError>, memo: Option<String>, }
327
328#[derive(PartialEq, Eq, Debug)]
330struct UploadedTransaction {
331 slot: Slot, index: u32, err: Option<TransactionError>, }
335
336impl From<TransactionInfo> for UploadedTransaction {
337 fn from(transaction_info: TransactionInfo) -> Self {
338 Self {
339 slot: transaction_info.slot,
340 index: transaction_info.index,
341 err: transaction_info.err,
342 }
343 }
344}
345
346impl From<TransactionInfo> for TransactionStatus {
347 fn from(transaction_info: TransactionInfo) -> Self {
348 let TransactionInfo { slot, err, .. } = transaction_info;
349 let status = match &err {
350 None => Ok(()),
351 Some(err) => Err(err.clone()),
352 };
353 Self {
354 slot,
355 confirmations: None,
356 status,
357 err,
358 confirmation_status: Some(TransactionConfirmationStatus::Finalized),
359 }
360 }
361}
362
363#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
364struct LegacyTransactionByAddrInfo {
365 pub signature: Signature, pub err: Option<TransactionError>, pub index: u32, pub memo: Option<String>, }
370
371impl From<LegacyTransactionByAddrInfo> for TransactionByAddrInfo {
372 fn from(legacy: LegacyTransactionByAddrInfo) -> Self {
373 let LegacyTransactionByAddrInfo {
374 signature,
375 err,
376 index,
377 memo,
378 } = legacy;
379
380 Self {
381 signature,
382 err,
383 index,
384 memo,
385 block_time: None,
386 }
387 }
388}
389
390pub const DEFAULT_INSTANCE_NAME: &str = "solana-ledger";
391pub const DEFAULT_APP_PROFILE_ID: &str = "default";
392pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; #[derive(Debug)]
395pub enum CredentialType {
396 Filepath(Option<String>),
397 Stringified(String),
398}
399
400#[derive(Debug)]
401pub struct LedgerStorageConfig {
402 pub read_only: bool,
403 pub timeout: Option<std::time::Duration>,
404 pub credential_type: CredentialType,
405 pub instance_name: String,
406 pub app_profile_id: String,
407 pub max_message_size: usize,
408}
409
410impl Default for LedgerStorageConfig {
411 fn default() -> Self {
412 Self {
413 read_only: true,
414 timeout: None,
415 credential_type: CredentialType::Filepath(None),
416 instance_name: DEFAULT_INSTANCE_NAME.to_string(),
417 app_profile_id: DEFAULT_APP_PROFILE_ID.to_string(),
418 max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
419 }
420 }
421}
422
423const METRICS_REPORT_INTERVAL_MS: u64 = 10_000;
424
425#[derive(Default)]
426struct LedgerStorageStats {
427 num_queries: AtomicUsize,
428 last_report: AtomicInterval,
429}
430
431impl LedgerStorageStats {
432 fn increment_num_queries(&self) {
433 self.num_queries.fetch_add(1, Ordering::Relaxed);
434 self.maybe_report();
435 }
436
437 fn maybe_report(&self) {
438 if self.last_report.should_update(METRICS_REPORT_INTERVAL_MS) {
439 datapoint_debug!(
440 "storage-bigtable-query",
441 (
442 "num_queries",
443 self.num_queries.swap(0, Ordering::Relaxed) as i64,
444 i64
445 )
446 );
447 }
448 }
449}
450
451#[derive(Clone)]
452pub struct LedgerStorage {
453 connection: bigtable::BigTableConnection,
454 stats: Arc<LedgerStorageStats>,
455}
456
457impl LedgerStorage {
458 pub async fn new(
459 read_only: bool,
460 timeout: Option<std::time::Duration>,
461 credential_path: Option<String>,
462 ) -> Result<Self> {
463 Self::new_with_config(LedgerStorageConfig {
464 read_only,
465 timeout,
466 credential_type: CredentialType::Filepath(credential_path),
467 ..LedgerStorageConfig::default()
468 })
469 .await
470 }
471
472 pub fn new_for_emulator(
473 instance_name: &str,
474 app_profile_id: &str,
475 endpoint: &str,
476 timeout: Option<Duration>,
477 ) -> Result<Self> {
478 let stats = Arc::new(LedgerStorageStats::default());
479 Ok(Self {
480 connection: bigtable::BigTableConnection::new_for_emulator(
481 instance_name,
482 app_profile_id,
483 endpoint,
484 timeout,
485 LedgerStorageConfig::default().max_message_size,
486 )?,
487 stats,
488 })
489 }
490
491 pub async fn new_with_config(config: LedgerStorageConfig) -> Result<Self> {
492 let stats = Arc::new(LedgerStorageStats::default());
493 let LedgerStorageConfig {
494 read_only,
495 timeout,
496 instance_name,
497 app_profile_id,
498 credential_type,
499 max_message_size,
500 } = config;
501 let connection = bigtable::BigTableConnection::new(
502 instance_name.as_str(),
503 app_profile_id.as_str(),
504 read_only,
505 timeout,
506 credential_type,
507 max_message_size,
508 )
509 .await?;
510 Ok(Self { stats, connection })
511 }
512
513 pub async fn new_with_stringified_credential(credential: String) -> Result<Self> {
514 Self::new_with_config(LedgerStorageConfig {
515 credential_type: CredentialType::Stringified(credential),
516 ..LedgerStorageConfig::default()
517 })
518 .await
519 }
520
521 pub async fn get_first_available_block(&self) -> Result<Option<Slot>> {
523 trace!("LedgerStorage::get_first_available_block request received");
524 self.stats.increment_num_queries();
525 let mut bigtable = self.connection.client();
526 let blocks = bigtable.get_row_keys("blocks", None, None, 1).await?;
527 if blocks.is_empty() {
528 return Ok(None);
529 }
530 Ok(key_to_slot(&blocks[0]))
531 }
532
533 pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result<Vec<Slot>> {
538 trace!("LedgerStorage::get_confirmed_blocks request received: {start_slot:?} {limit:?}");
539 self.stats.increment_num_queries();
540 let mut bigtable = self.connection.client();
541 let blocks = bigtable
542 .get_row_keys(
543 "blocks",
544 Some(slot_to_blocks_key(start_slot)),
545 None,
546 limit as i64,
547 )
548 .await?;
549 Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect())
550 }
551
552 pub async fn get_confirmed_blocks_with_data<'a>(
554 &self,
555 slots: &'a [Slot],
556 ) -> Result<impl Iterator<Item = (Slot, ConfirmedBlock)> + 'a> {
557 trace!("LedgerStorage::get_confirmed_blocks_with_data request received: {slots:?}");
558 self.stats.increment_num_queries();
559 let mut bigtable = self.connection.client();
560 let row_keys = slots.iter().copied().map(slot_to_blocks_key);
561 let data = bigtable
562 .get_protobuf_or_bincode_cells("blocks", row_keys)
563 .await?
564 .filter_map(
565 |(row_key, block_cell_data): (
566 RowKey,
567 bigtable::CellData<StoredConfirmedBlock, generated::ConfirmedBlock>,
568 )| {
569 let block = match block_cell_data {
570 bigtable::CellData::Bincode(block) => block.into(),
571 bigtable::CellData::Protobuf(block) => block.try_into().ok()?,
572 };
573 Some((key_to_slot(&row_key).unwrap(), block))
574 },
575 );
576 Ok(data)
577 }
578
579 pub async fn get_confirmed_block(&self, slot: Slot) -> Result<ConfirmedBlock> {
581 trace!("LedgerStorage::get_confirmed_block request received: {slot:?}");
582 self.stats.increment_num_queries();
583 let mut bigtable = self.connection.client();
584 let block_cell_data = bigtable
585 .get_protobuf_or_bincode_cell::<StoredConfirmedBlock, generated::ConfirmedBlock>(
586 "blocks",
587 slot_to_blocks_key(slot),
588 )
589 .await
590 .map_err(|err| match err {
591 bigtable::Error::RowNotFound => Error::BlockNotFound(slot),
592 _ => err.into(),
593 })?;
594 Ok(match block_cell_data {
595 bigtable::CellData::Bincode(block) => block.into(),
596 bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| {
597 bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_blocks_key(slot)))
598 })?,
599 })
600 }
601
602 pub async fn confirmed_block_exists(&self, slot: Slot) -> Result<bool> {
604 trace!("LedgerStorage::confirmed_block_exists request received: {slot:?}");
605 self.stats.increment_num_queries();
606 let mut bigtable = self.connection.client();
607
608 let block_exists = bigtable
609 .row_key_exists("blocks", slot_to_blocks_key(slot))
610 .await?;
611
612 Ok(block_exists)
613 }
614
615 pub async fn get_entries(&self, slot: Slot) -> Result<impl Iterator<Item = EntrySummary>> {
617 trace!("LedgerStorage::get_block_entries request received: {slot:?}");
618 self.stats.increment_num_queries();
619 let mut bigtable = self.connection.client();
620 let entry_cell_data = bigtable
621 .get_protobuf_cell::<entries::Entries>("entries", slot_to_entries_key(slot))
622 .await
623 .map_err(|err| match err {
624 bigtable::Error::RowNotFound => Error::BlockNotFound(slot),
625 _ => err.into(),
626 })?;
627 let entries = entry_cell_data.entries.into_iter().map(Into::into);
628 Ok(entries)
629 }
630
631 pub async fn get_signature_status(&self, signature: &Signature) -> Result<TransactionStatus> {
632 trace!("LedgerStorage::get_signature_status request received: {signature:?}");
633 self.stats.increment_num_queries();
634 let mut bigtable = self.connection.client();
635 let transaction_info = bigtable
636 .get_bincode_cell::<TransactionInfo>("tx", signature.to_string())
637 .await
638 .map_err(|err| match err {
639 bigtable::Error::RowNotFound => Error::SignatureNotFound,
640 _ => err.into(),
641 })?;
642 Ok(transaction_info.into())
643 }
644
645 pub async fn get_confirmed_transactions(
647 &self,
648 signatures: &[Signature],
649 ) -> Result<Vec<ConfirmedTransactionWithStatusMeta>> {
650 trace!("LedgerStorage::get_confirmed_transactions request received: {signatures:?}");
651 self.stats.increment_num_queries();
652 let mut bigtable = self.connection.client();
653
654 let keys = signatures.iter().map(|s| s.to_string()).collect::<Vec<_>>();
656 let cells = bigtable
657 .get_bincode_cells::<TransactionInfo>("tx", &keys)
658 .await?;
659
660 let mut order: Vec<(Slot, u32, String)> = Vec::new();
662 let mut slots: HashSet<Slot> = HashSet::new();
663 for cell in cells {
664 if let (signature, Ok(TransactionInfo { slot, index, .. })) = cell {
665 order.push((slot, index, signature));
666 slots.insert(slot);
667 }
668 }
669
670 let blocks = self
672 .get_confirmed_blocks_with_data(&slots.into_iter().collect::<Vec<_>>())
673 .await?
674 .collect::<HashMap<_, _>>();
675
676 Ok(order
678 .into_iter()
679 .filter_map(|(slot, index, signature)| {
680 blocks.get(&slot).and_then(|block| {
681 block
682 .transactions
683 .get(index as usize)
684 .and_then(|tx_with_meta| {
685 if tx_with_meta.transaction_signature().to_string() != *signature {
686 warn!(
687 "Transaction info or confirmed block for {signature} is \
688 corrupt"
689 );
690 None
691 } else {
692 Some(ConfirmedTransactionWithStatusMeta {
693 slot,
694 tx_with_meta: tx_with_meta.clone(),
695 block_time: block.block_time,
696 })
697 }
698 })
699 })
700 })
701 .collect::<Vec<_>>())
702 }
703
704 pub async fn get_confirmed_transaction(
706 &self,
707 signature: &Signature,
708 ) -> Result<Option<ConfirmedTransactionWithStatusMeta>> {
709 trace!("LedgerStorage::get_confirmed_transaction request received: {signature:?}");
710 self.stats.increment_num_queries();
711 let mut bigtable = self.connection.client();
712
713 let TransactionInfo { slot, index, .. } = bigtable
715 .get_bincode_cell("tx", signature.to_string())
716 .await
717 .map_err(|err| match err {
718 bigtable::Error::RowNotFound => Error::SignatureNotFound,
719 _ => err.into(),
720 })?;
721
722 let block = self.get_confirmed_block(slot).await?;
724 match block.transactions.into_iter().nth(index as usize) {
725 None => {
726 warn!("Transaction info for {signature} is corrupt");
728 Ok(None)
729 }
730 Some(tx_with_meta) => {
731 if tx_with_meta.transaction_signature() != signature {
732 warn!("Transaction info or confirmed block for {signature} is corrupt");
733 Ok(None)
734 } else {
735 Ok(Some(ConfirmedTransactionWithStatusMeta {
736 slot,
737 tx_with_meta,
738 block_time: block.block_time,
739 }))
740 }
741 }
742 }
743 }
744
745 pub async fn get_confirmed_signatures_for_address(
752 &self,
753 address: &Pubkey,
754 before_signature: Option<&Signature>,
755 until_signature: Option<&Signature>,
756 limit: usize,
757 ) -> Result<
758 Vec<(
759 ConfirmedTransactionStatusWithSignature,
760 u32, )>,
762 > {
763 trace!("LedgerStorage::get_confirmed_signatures_for_address request received: {address:?}");
764 self.stats.increment_num_queries();
765 let mut bigtable = self.connection.client();
766 let address_prefix = format!("{address}/");
767
768 let (first_slot, before_transaction_index) = match before_signature {
770 None => (Slot::MAX, 0),
771 Some(before_signature) => {
772 let TransactionInfo { slot, index, .. } = bigtable
773 .get_bincode_cell("tx", before_signature.to_string())
774 .await
775 .map_err(|err| match err {
776 bigtable::Error::RowNotFound => Error::SignatureNotFound,
777 _ => err.into(),
778 })?;
779
780 (slot, index)
781 }
782 };
783
784 let (last_slot, until_transaction_index) = match until_signature {
786 None => (0, u32::MAX),
787 Some(until_signature) => {
788 let TransactionInfo { slot, index, .. } = bigtable
789 .get_bincode_cell("tx", until_signature.to_string())
790 .await
791 .map_err(|err| match err {
792 bigtable::Error::RowNotFound => Error::SignatureNotFound,
793 _ => err.into(),
794 })?;
795
796 (slot, index)
797 }
798 };
799
800 let mut infos = vec![];
801
802 let starting_slot_tx_len = bigtable
803 .get_protobuf_or_bincode_cell::<Vec<LegacyTransactionByAddrInfo>, tx_by_addr::TransactionByAddr>(
804 "tx-by-addr",
805 format!("{}{}", address_prefix, slot_to_tx_by_addr_key(first_slot)),
806 )
807 .await
808 .map(|cell_data| {
809 match cell_data {
810 bigtable::CellData::Bincode(tx_by_addr) => tx_by_addr.len(),
811 bigtable::CellData::Protobuf(tx_by_addr) => tx_by_addr.tx_by_addrs.len(),
812 }
813 })
814 .unwrap_or(0);
815
816 let tx_by_addr_data = bigtable
819 .get_row_data(
820 "tx-by-addr",
821 Some(format!(
822 "{}{}",
823 address_prefix,
824 slot_to_tx_by_addr_key(first_slot),
825 )),
826 Some(format!(
827 "{}{}",
828 address_prefix,
829 slot_to_tx_by_addr_key(last_slot),
830 )),
831 limit as i64 + starting_slot_tx_len as i64,
832 )
833 .await?;
834
835 'outer: for (row_key, data) in tx_by_addr_data {
836 let slot = !key_to_slot(&row_key[address_prefix.len()..]).ok_or_else(|| {
837 bigtable::Error::ObjectCorrupt(format!(
838 "Failed to convert key to slot: tx-by-addr/{row_key}"
839 ))
840 })?;
841
842 let deserialized_cell_data = bigtable::deserialize_protobuf_or_bincode_cell_data::<
843 Vec<LegacyTransactionByAddrInfo>,
844 tx_by_addr::TransactionByAddr,
845 >(&data, "tx-by-addr", row_key.clone())?;
846
847 let mut cell_data: Vec<TransactionByAddrInfo> = match deserialized_cell_data {
848 bigtable::CellData::Bincode(tx_by_addr) => {
849 tx_by_addr.into_iter().map(|legacy| legacy.into()).collect()
850 }
851 bigtable::CellData::Protobuf(tx_by_addr) => {
852 tx_by_addr.try_into().map_err(|error| {
853 bigtable::Error::ObjectCorrupt(format!(
854 "Failed to deserialize: {}: tx-by-addr/{}",
855 error,
856 row_key.clone()
857 ))
858 })?
859 }
860 };
861
862 cell_data.reverse();
863 for tx_by_addr_info in cell_data.into_iter() {
864 if slot == first_slot && tx_by_addr_info.index >= before_transaction_index {
866 continue;
867 }
868 if slot == last_slot && tx_by_addr_info.index <= until_transaction_index {
870 continue;
871 }
872 infos.push((
873 ConfirmedTransactionStatusWithSignature {
874 signature: tx_by_addr_info.signature,
875 slot,
876 err: tx_by_addr_info.err,
877 memo: tx_by_addr_info.memo,
878 block_time: tx_by_addr_info.block_time,
879 },
880 tx_by_addr_info.index,
881 ));
882 if infos.len() >= limit {
884 break 'outer;
885 }
886 }
887 }
888 Ok(infos)
889 }
890
891 pub async fn upload_confirmed_block(
893 &self,
894 slot: Slot,
895 confirmed_block: VersionedConfirmedBlock,
896 ) -> Result<()> {
897 trace!("LedgerStorage::upload_confirmed_block request received: {slot:?}");
898 self.upload_confirmed_block_with_entries(
899 slot,
900 VersionedConfirmedBlockWithEntries {
901 block: confirmed_block,
902 entries: vec![],
903 },
904 )
905 .await
906 }
907
908 pub async fn upload_confirmed_block_with_entries(
909 &self,
910 slot: Slot,
911 confirmed_block: VersionedConfirmedBlockWithEntries,
912 ) -> Result<()> {
913 trace!("LedgerStorage::upload_confirmed_block_with_entries request received: {slot:?}");
914 let mut by_addr: HashMap<&Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();
915 let VersionedConfirmedBlockWithEntries {
916 block: confirmed_block,
917 entries,
918 } = confirmed_block;
919
920 let reserved_account_keys = ReservedAccountKeys::new_all_activated();
921 let mut tx_cells = Vec::with_capacity(confirmed_block.transactions.len());
922 for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
923 let VersionedTransactionWithStatusMeta { meta, transaction } = transaction_with_meta;
924 let err = meta.status.clone().err();
925 let index = index as u32;
926 let signature = transaction.signatures[0];
927 let memo = extract_and_fmt_memos(transaction_with_meta);
928
929 for address in transaction_with_meta.account_keys().iter() {
930 if !reserved_account_keys.is_reserved(address) {
935 by_addr
936 .entry(address)
937 .or_default()
938 .push(TransactionByAddrInfo {
939 signature,
940 err: err.clone(),
941 index,
942 memo: memo.clone(),
943 block_time: confirmed_block.block_time,
944 });
945 }
946 }
947
948 tx_cells.push((
949 signature.to_string(),
950 TransactionInfo {
951 slot,
952 index,
953 err,
954 memo,
955 },
956 ));
957 }
958
959 let tx_by_addr_cells: Vec<_> = by_addr
960 .into_iter()
961 .map(|(address, transaction_info_by_addr)| {
962 (
963 format!("{}/{}", address, slot_to_tx_by_addr_key(slot)),
964 tx_by_addr::TransactionByAddr {
965 tx_by_addrs: transaction_info_by_addr
966 .into_iter()
967 .map(|by_addr| by_addr.into())
968 .collect(),
969 },
970 )
971 })
972 .collect();
973
974 let num_entries = entries.len();
975 let entry_cell = (
976 slot_to_entries_key(slot),
977 entries::Entries {
978 entries: entries.into_iter().enumerate().map(Into::into).collect(),
979 },
980 );
981
982 let mut tasks = vec![];
983
984 if !tx_cells.is_empty() {
985 let conn = self.connection.clone();
986 tasks.push(tokio::spawn(async move {
987 conn.put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
988 .await
989 }));
990 }
991
992 if !tx_by_addr_cells.is_empty() {
993 let conn = self.connection.clone();
994 tasks.push(tokio::spawn(async move {
995 conn.put_protobuf_cells_with_retry::<tx_by_addr::TransactionByAddr>(
996 "tx-by-addr",
997 &tx_by_addr_cells,
998 )
999 .await
1000 }));
1001 }
1002
1003 if num_entries > 0 {
1004 let conn = self.connection.clone();
1005 tasks.push(tokio::spawn(async move {
1006 conn.put_protobuf_cells_with_retry::<entries::Entries>("entries", &[entry_cell])
1007 .await
1008 }));
1009 }
1010
1011 let mut bytes_written = 0;
1012 let mut maybe_first_err: Option<Error> = None;
1013
1014 let results = futures::future::join_all(tasks).await;
1015 for result in results {
1016 match result {
1017 Err(err) => {
1018 if maybe_first_err.is_none() {
1019 maybe_first_err = Some(Error::TokioJoinError(err));
1020 }
1021 }
1022 Ok(Err(err)) => {
1023 if maybe_first_err.is_none() {
1024 maybe_first_err = Some(Error::BigTableError(err));
1025 }
1026 }
1027 Ok(Ok(bytes)) => {
1028 bytes_written += bytes;
1029 }
1030 }
1031 }
1032
1033 if let Some(err) = maybe_first_err {
1034 return Err(err);
1035 }
1036
1037 let num_transactions = confirmed_block.transactions.len();
1038
1039 let blocks_cells = [(slot_to_blocks_key(slot), confirmed_block.into())];
1043 bytes_written += self
1044 .connection
1045 .put_protobuf_cells_with_retry::<generated::ConfirmedBlock>("blocks", &blocks_cells)
1046 .await?;
1047 datapoint_info!(
1048 "storage-bigtable-upload-block",
1049 ("slot", slot, i64),
1050 ("transactions", num_transactions, i64),
1051 ("entries", num_entries, i64),
1052 ("bytes", bytes_written, i64),
1053 );
1054 Ok(())
1055 }
1056
1057 pub async fn delete_confirmed_block(&self, slot: Slot, dry_run: bool) -> Result<()> {
1059 let mut addresses: HashSet<&Pubkey> = HashSet::new();
1060 let mut expected_tx_infos: HashMap<String, UploadedTransaction> = HashMap::new();
1061 let confirmed_block = self.get_confirmed_block(slot).await?;
1062 for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
1063 match transaction_with_meta {
1064 TransactionWithStatusMeta::MissingMetadata(transaction) => {
1065 let signature = transaction.signatures[0];
1066 let index = index as u32;
1067 let err = None;
1068
1069 for address in transaction.message.account_keys.iter() {
1070 addresses.insert(address);
1077 }
1078
1079 expected_tx_infos.insert(
1080 signature.to_string(),
1081 UploadedTransaction { slot, index, err },
1082 );
1083 }
1084 TransactionWithStatusMeta::Complete(tx_with_meta) => {
1085 let VersionedTransactionWithStatusMeta { transaction, meta } = tx_with_meta;
1086 let signature = transaction.signatures[0];
1087 let index = index as u32;
1088 let err = meta.status.clone().err();
1089
1090 for address in tx_with_meta.account_keys().iter() {
1091 addresses.insert(address);
1098 }
1099
1100 expected_tx_infos.insert(
1101 signature.to_string(),
1102 UploadedTransaction { slot, index, err },
1103 );
1104 }
1105 }
1106 }
1107
1108 let address_slot_rows: Vec<_> = addresses
1109 .into_iter()
1110 .map(|address| format!("{}/{}", address, slot_to_tx_by_addr_key(slot)))
1111 .collect();
1112
1113 let tx_deletion_rows = if !expected_tx_infos.is_empty() {
1114 let signatures = expected_tx_infos.keys().cloned().collect::<Vec<_>>();
1115 let fetched_tx_infos: HashMap<String, std::result::Result<UploadedTransaction, _>> =
1116 self.connection
1117 .get_bincode_cells_with_retry::<TransactionInfo>("tx", &signatures)
1118 .await?
1119 .into_iter()
1120 .map(|(signature, tx_info_res)| (signature, tx_info_res.map(Into::into)))
1121 .collect::<HashMap<_, _>>();
1122
1123 let mut deletion_rows = Vec::with_capacity(expected_tx_infos.len());
1124 for (signature, expected_tx_info) in expected_tx_infos {
1125 match fetched_tx_infos.get(&signature) {
1126 Some(Ok(fetched_tx_info)) if fetched_tx_info == &expected_tx_info => {
1127 deletion_rows.push(signature);
1128 }
1129 Some(Ok(fetched_tx_info)) => {
1130 warn!(
1131 "skipped tx row {} because the bigtable entry ({:?}) did not match to \
1132 {:?}",
1133 signature, fetched_tx_info, &expected_tx_info,
1134 );
1135 }
1136 Some(Err(err)) => {
1137 warn!(
1138 "skipped tx row {signature} because the bigtable entry was corrupted: \
1139 {err:?}"
1140 );
1141 }
1142 None => {
1143 warn!("skipped tx row {signature} because it was not found");
1144 }
1145 }
1146 }
1147 deletion_rows
1148 } else {
1149 vec![]
1150 };
1151
1152 let entries_exist = self
1153 .connection
1154 .client()
1155 .row_key_exists("entries", slot_to_entries_key(slot))
1156 .await
1157 .is_ok_and(|x| x);
1158
1159 if !dry_run {
1160 if !address_slot_rows.is_empty() {
1161 self.connection
1162 .delete_rows_with_retry("tx-by-addr", &address_slot_rows)
1163 .await?;
1164 }
1165
1166 if !tx_deletion_rows.is_empty() {
1167 self.connection
1168 .delete_rows_with_retry("tx", &tx_deletion_rows)
1169 .await?;
1170 }
1171
1172 if entries_exist {
1173 self.connection
1174 .delete_rows_with_retry("entries", &[slot_to_entries_key(slot)])
1175 .await?;
1176 }
1177
1178 self.connection
1179 .delete_rows_with_retry("blocks", &[slot_to_blocks_key(slot)])
1180 .await?;
1181 }
1182
1183 info!(
1184 "{}deleted ledger data for slot {}: {} transaction rows, {} address slot rows, {} \
1185 entry row",
1186 if dry_run { "[dry run] " } else { "" },
1187 slot,
1188 tx_deletion_rows.len(),
1189 address_slot_rows.len(),
1190 if entries_exist { "with" } else { "WITHOUT" }
1191 );
1192
1193 Ok(())
1194 }
1195}
1196
1197#[cfg(test)]
1198mod test {
1199 use super::*;
1200
1201 #[test]
1202 fn test_slot_to_key() {
1203 assert_eq!(slot_to_key(0), "0000000000000000");
1204 assert_eq!(slot_to_key(!0), "ffffffffffffffff");
1205 }
1206}