1#![allow(clippy::arithmetic_side_effects)]
2
3use {
4 crate::bigtable::RowKey,
5 clone_agave_reserved_account_keys::ReservedAccountKeys,
6 clone_solana_clock::{Slot, UnixTimestamp},
7 clone_solana_message::v0::LoadedAddresses,
8 clone_solana_metrics::datapoint_info,
9 clone_solana_pubkey::Pubkey,
10 clone_solana_serde::default_on_eof,
11 clone_solana_signature::Signature,
12 clone_solana_storage_proto::convert::{entries, generated, tx_by_addr},
13 clone_solana_time_utils::AtomicInterval,
14 clone_solana_transaction::versioned::VersionedTransaction,
15 clone_solana_transaction_error::TransactionError,
16 clone_solana_transaction_status::{
17 extract_and_fmt_memos, ConfirmedBlock, ConfirmedTransactionStatusWithSignature,
18 ConfirmedTransactionWithStatusMeta, EntrySummary, Reward, TransactionByAddrInfo,
19 TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta,
20 TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedConfirmedBlockWithEntries,
21 VersionedTransactionWithStatusMeta,
22 },
23 log::*,
24 serde::{Deserialize, Serialize},
25 std::{
26 collections::{HashMap, HashSet},
27 convert::TryInto,
28 sync::{
29 atomic::{AtomicUsize, Ordering},
30 Arc,
31 },
32 time::Duration,
33 },
34 thiserror::Error,
35 tokio::task::JoinError,
36};
37
38#[macro_use]
39extern crate clone_solana_metrics;
40
41#[macro_use]
42extern crate serde_derive;
43
44mod access_token;
45mod bigtable;
46mod compression;
47mod root_ca_certificate;
48
/// Errors returned by the ledger storage API defined in this file.
#[derive(Debug, Error)]
pub enum Error {
 /// An error reported by the `bigtable` module.
 #[error("BigTable: {0}")]
 BigTableError(bigtable::Error),

 /// A standard I/O error.
 #[error("I/O Error: {0}")]
 IoError(std::io::Error),

 /// NOTE(review): not constructed anywhere in the visible part of this file.
 #[error("Transaction encoded is not supported")]
 UnsupportedTransactionEncoding,

 /// The row for the given slot was not found (returned by the block and
 /// entries lookups when BigTable reports `RowNotFound`).
 #[error("Block not found: {0}")]
 BlockNotFound(Slot),

 /// The `tx` row for a requested signature was not found.
 #[error("Signature not found")]
 SignatureNotFound,

 /// A spawned tokio task could not be joined. The wrapped `JoinError` is
 /// not included in the display message.
 #[error("tokio error")]
 TokioJoinError(JoinError),
}
69
70impl std::convert::From<bigtable::Error> for Error {
71 fn from(err: bigtable::Error) -> Self {
72 Self::BigTableError(err)
73 }
74}
75
76impl std::convert::From<std::io::Error> for Error {
77 fn from(err: std::io::Error) -> Self {
78 Self::IoError(err)
79 }
80}
81
/// Result alias whose error type is this file's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
83
84fn slot_to_key(slot: Slot) -> String {
87 format!("{slot:016x}")
88}
89
/// Row key used for `slot` in the `blocks` table.
fn slot_to_blocks_key(slot: Slot) -> String {
 slot_to_key(slot)
}
93
/// Row key used for `slot` in the `entries` table.
fn slot_to_entries_key(slot: Slot) -> String {
 slot_to_key(slot)
}
97
/// Slot component of a `tx-by-addr` row key (callers prepend `"{address}/"`).
///
/// The slot is bitwise-complemented before formatting, so a higher slot
/// produces a lexicographically smaller key.
fn slot_to_tx_by_addr_key(slot: Slot) -> String {
 slot_to_key(!slot)
}
101
102fn key_to_slot(key: &str) -> Option<Slot> {
104 match Slot::from_str_radix(key, 16) {
105 Ok(slot) => Some(slot),
106 Err(err) => {
107 warn!("Failed to parse object key as a slot: {}: {}", key, err);
109 None
110 }
111 }
112}
113
/// Serde representation of a block as read from the `blocks` table when the
/// cell holds the bincode variant (`bigtable::CellData::Bincode`); the
/// protobuf variant is `generated::ConfirmedBlock`.
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlock {
 previous_blockhash: String,
 blockhash: String,
 parent_slot: Slot,
 transactions: Vec<StoredConfirmedBlockTransaction>,
 rewards: StoredConfirmedBlockRewards,
 block_time: Option<UnixTimestamp>,
 // Deserialized through `default_on_eof`; presumably so that data written
 // before this field existed still decodes -- TODO confirm.
 #[serde(deserialize_with = "default_on_eof")]
 block_height: Option<u64>,
}
133
#[cfg(test)]
impl From<ConfirmedBlock> for StoredConfirmedBlock {
    /// Test-only conversion into the stored representation.
    ///
    /// `num_partitions` has no stored counterpart and is discarded.
    fn from(block: ConfirmedBlock) -> Self {
        let ConfirmedBlock {
            previous_blockhash,
            blockhash,
            parent_slot,
            transactions,
            rewards,
            num_partitions: _,
            block_time,
            block_height,
        } = block;

        let transactions = transactions
            .into_iter()
            .map(StoredConfirmedBlockTransaction::from)
            .collect();
        let rewards = rewards
            .into_iter()
            .map(StoredConfirmedBlockReward::from)
            .collect();

        Self {
            previous_blockhash,
            blockhash,
            parent_slot,
            transactions,
            rewards,
            block_time,
            block_height,
        }
    }
}
159
160impl From<StoredConfirmedBlock> for ConfirmedBlock {
161 fn from(confirmed_block: StoredConfirmedBlock) -> Self {
162 let StoredConfirmedBlock {
163 previous_blockhash,
164 blockhash,
165 parent_slot,
166 transactions,
167 rewards,
168 block_time,
169 block_height,
170 } = confirmed_block;
171
172 Self {
173 previous_blockhash,
174 blockhash,
175 parent_slot,
176 transactions: transactions.into_iter().map(|tx| tx.into()).collect(),
177 rewards: rewards.into_iter().map(|reward| reward.into()).collect(),
178 num_partitions: None,
179 block_time,
180 block_height,
181 }
182 }
183}
184
/// Serde representation of one transaction inside a `StoredConfirmedBlock`.
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlockTransaction {
 transaction: VersionedTransaction,
 // `None` converts to `TransactionWithStatusMeta::MissingMetadata`.
 meta: Option<StoredConfirmedBlockTransactionStatusMeta>,
}
190
#[cfg(test)]
impl From<TransactionWithStatusMeta> for StoredConfirmedBlockTransaction {
    /// Test-only conversion into the stored representation. A transaction
    /// without metadata is stored with `meta: None`.
    fn from(value: TransactionWithStatusMeta) -> Self {
        let (transaction, meta) = match value {
            TransactionWithStatusMeta::MissingMetadata(legacy) => {
                (VersionedTransaction::from(legacy), None)
            }
            TransactionWithStatusMeta::Complete(complete) => (
                complete.transaction,
                Some(StoredConfirmedBlockTransactionStatusMeta::from(
                    complete.meta,
                )),
            ),
        };
        Self { transaction, meta }
    }
}
209
210impl From<StoredConfirmedBlockTransaction> for TransactionWithStatusMeta {
211 fn from(tx_with_meta: StoredConfirmedBlockTransaction) -> Self {
212 let StoredConfirmedBlockTransaction { transaction, meta } = tx_with_meta;
213 match meta {
214 None => Self::MissingMetadata(
215 transaction
216 .into_legacy_transaction()
217 .expect("versioned transactions always have meta"),
218 ),
219 Some(meta) => Self::Complete(VersionedTransactionWithStatusMeta {
220 transaction,
221 meta: meta.into(),
222 }),
223 }
224 }
225}
226
/// Serde representation of the subset of transaction status metadata kept in
/// a `StoredConfirmedBlockTransaction`: the error (if any), the fee and the
/// pre/post balances.
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlockTransactionStatusMeta {
 // `None` means the transaction succeeded.
 err: Option<TransactionError>,
 fee: u64,
 pre_balances: Vec<u64>,
 post_balances: Vec<u64>,
}
234
235impl From<StoredConfirmedBlockTransactionStatusMeta> for TransactionStatusMeta {
236 fn from(value: StoredConfirmedBlockTransactionStatusMeta) -> Self {
237 let StoredConfirmedBlockTransactionStatusMeta {
238 err,
239 fee,
240 pre_balances,
241 post_balances,
242 } = value;
243 let status = match &err {
244 None => Ok(()),
245 Some(err) => Err(err.clone()),
246 };
247 Self {
248 status,
249 fee,
250 pre_balances,
251 post_balances,
252 inner_instructions: None,
253 log_messages: None,
254 pre_token_balances: None,
255 post_token_balances: None,
256 rewards: None,
257 loaded_addresses: LoadedAddresses::default(),
258 return_data: None,
259 compute_units_consumed: None,
260 }
261 }
262}
263
264impl From<TransactionStatusMeta> for StoredConfirmedBlockTransactionStatusMeta {
265 fn from(value: TransactionStatusMeta) -> Self {
266 let TransactionStatusMeta {
267 status,
268 fee,
269 pre_balances,
270 post_balances,
271 ..
272 } = value;
273 Self {
274 err: status.err(),
275 fee,
276 pre_balances,
277 post_balances,
278 }
279 }
280}
281
/// The rewards list of a `StoredConfirmedBlock`.
type StoredConfirmedBlockRewards = Vec<StoredConfirmedBlockReward>;
283
/// Serde representation of one reward: only the recipient and the lamport
/// amount are kept.
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlockReward {
 pubkey: String,
 lamports: i64,
}
289
290impl From<StoredConfirmedBlockReward> for Reward {
291 fn from(value: StoredConfirmedBlockReward) -> Self {
292 let StoredConfirmedBlockReward { pubkey, lamports } = value;
293 Self {
294 pubkey,
295 lamports,
296 post_balance: 0,
297 reward_type: None,
298 commission: None,
299 }
300 }
301}
302
303impl From<Reward> for StoredConfirmedBlockReward {
304 fn from(value: Reward) -> Self {
305 let Reward {
306 pubkey, lamports, ..
307 } = value;
308 Self { pubkey, lamports }
309 }
310}
311
312#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
314struct TransactionInfo {
315 slot: Slot, index: u32, err: Option<TransactionError>, memo: Option<String>, }
320
321#[derive(PartialEq, Eq, Debug)]
323struct UploadedTransaction {
324 slot: Slot, index: u32, err: Option<TransactionError>, }
328
329impl From<TransactionInfo> for UploadedTransaction {
330 fn from(transaction_info: TransactionInfo) -> Self {
331 Self {
332 slot: transaction_info.slot,
333 index: transaction_info.index,
334 err: transaction_info.err,
335 }
336 }
337}
338
339impl From<TransactionInfo> for TransactionStatus {
340 fn from(transaction_info: TransactionInfo) -> Self {
341 let TransactionInfo { slot, err, .. } = transaction_info;
342 let status = match &err {
343 None => Ok(()),
344 Some(err) => Err(err.clone()),
345 };
346 Self {
347 slot,
348 confirmations: None,
349 status,
350 err,
351 confirmation_status: Some(TransactionConfirmationStatus::Finalized),
352 }
353 }
354}
355
356#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
357struct LegacyTransactionByAddrInfo {
358 pub signature: Signature, pub err: Option<TransactionError>, pub index: u32, pub memo: Option<String>, }
363
364impl From<LegacyTransactionByAddrInfo> for TransactionByAddrInfo {
365 fn from(legacy: LegacyTransactionByAddrInfo) -> Self {
366 let LegacyTransactionByAddrInfo {
367 signature,
368 err,
369 index,
370 memo,
371 } = legacy;
372
373 Self {
374 signature,
375 err,
376 index,
377 memo,
378 block_time: None,
379 }
380 }
381}
382
/// Instance name used by `LedgerStorageConfig::default()`.
pub const DEFAULT_INSTANCE_NAME: &str = "solana-ledger";
/// App profile id used by `LedgerStorageConfig::default()`.
pub const DEFAULT_APP_PROFILE_ID: &str = "default";
/// Maximum message size used by `LedgerStorageConfig::default()`:
/// 64 * 1024 * 1024 (presumably bytes, i.e. 64 MiB -- TODO confirm the unit).
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;

/// Where the BigTable credentials come from.
#[derive(Debug)]
pub enum CredentialType {
    /// Optional path to a credential file.
    /// NOTE(review): how `None` is resolved is decided in the `bigtable`
    /// module, not here -- confirm.
    Filepath(Option<String>),
    /// Credential passed inline as a string.
    Stringified(String),
}
392
/// Settings passed to `LedgerStorage::new_with_config`; every field is
/// forwarded to `bigtable::BigTableConnection::new`.
#[derive(Debug)]
pub struct LedgerStorageConfig {
 pub read_only: bool,
 pub timeout: Option<std::time::Duration>,
 pub credential_type: CredentialType,
 pub instance_name: String,
 pub app_profile_id: String,
 pub max_message_size: usize,
}
402
403impl Default for LedgerStorageConfig {
404 fn default() -> Self {
405 Self {
406 read_only: true,
407 timeout: None,
408 credential_type: CredentialType::Filepath(None),
409 instance_name: DEFAULT_INSTANCE_NAME.to_string(),
410 app_profile_id: DEFAULT_APP_PROFILE_ID.to_string(),
411 max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
412 }
413 }
414}
415
/// Interval, in milliseconds, handed to `AtomicInterval::should_update` when
/// deciding whether to report query statistics (10 seconds).
const METRICS_REPORT_INTERVAL_MS: u64 = 10_000;
417
/// Query statistics shared by all clones of a `LedgerStorage`.
#[derive(Default)]
struct LedgerStorageStats {
 // Number of queries counted since the counter was last swapped to zero.
 num_queries: AtomicUsize,
 // Gate consulted before each report attempt.
 last_report: AtomicInterval,
}
423
impl LedgerStorageStats {
 /// Counts one query, then gives the periodic report a chance to run.
 fn increment_num_queries(&self) {
 self.num_queries.fetch_add(1, Ordering::Relaxed);
 self.maybe_report();
 }

 /// Emits the `storage-bigtable-query` datapoint when `last_report` says an
 /// update is due for `METRICS_REPORT_INTERVAL_MS`.
 fn maybe_report(&self) {
 if self.last_report.should_update(METRICS_REPORT_INTERVAL_MS) {
 // NOTE(review): the counter reset (`swap(0, ..)`) is an argument of the
 // macro. If the macro skips evaluating its arguments when debug
 // datapoints are disabled, the counter is not reset in that case --
 // confirm before moving the swap out of the macro call.
 datapoint_debug!(
 "storage-bigtable-query",
 (
 "num_queries",
 self.num_queries.swap(0, Ordering::Relaxed) as i64,
 i64
 )
 );
 }
 }
}
443
/// Handle for reading and writing ledger data through a BigTable connection.
///
/// Cloning copies the connection handle and shares the same statistics
/// (`stats` is behind an `Arc`).
#[derive(Clone)]
pub struct LedgerStorage {
 connection: bigtable::BigTableConnection,
 stats: Arc<LedgerStorageStats>,
}
449
450impl LedgerStorage {
451 pub async fn new(
452 read_only: bool,
453 timeout: Option<std::time::Duration>,
454 credential_path: Option<String>,
455 ) -> Result<Self> {
456 Self::new_with_config(LedgerStorageConfig {
457 read_only,
458 timeout,
459 credential_type: CredentialType::Filepath(credential_path),
460 ..LedgerStorageConfig::default()
461 })
462 .await
463 }
464
465 pub fn new_for_emulator(
466 instance_name: &str,
467 app_profile_id: &str,
468 endpoint: &str,
469 timeout: Option<Duration>,
470 ) -> Result<Self> {
471 let stats = Arc::new(LedgerStorageStats::default());
472 Ok(Self {
473 connection: bigtable::BigTableConnection::new_for_emulator(
474 instance_name,
475 app_profile_id,
476 endpoint,
477 timeout,
478 LedgerStorageConfig::default().max_message_size,
479 )?,
480 stats,
481 })
482 }
483
484 pub async fn new_with_config(config: LedgerStorageConfig) -> Result<Self> {
485 let stats = Arc::new(LedgerStorageStats::default());
486 let LedgerStorageConfig {
487 read_only,
488 timeout,
489 instance_name,
490 app_profile_id,
491 credential_type,
492 max_message_size,
493 } = config;
494 let connection = bigtable::BigTableConnection::new(
495 instance_name.as_str(),
496 app_profile_id.as_str(),
497 read_only,
498 timeout,
499 credential_type,
500 max_message_size,
501 )
502 .await?;
503 Ok(Self { stats, connection })
504 }
505
506 pub async fn new_with_stringified_credential(credential: String) -> Result<Self> {
507 Self::new_with_config(LedgerStorageConfig {
508 credential_type: CredentialType::Stringified(credential),
509 ..LedgerStorageConfig::default()
510 })
511 .await
512 }
513
514 pub async fn get_first_available_block(&self) -> Result<Option<Slot>> {
516 trace!("LedgerStorage::get_first_available_block request received");
517 self.stats.increment_num_queries();
518 let mut bigtable = self.connection.client();
519 let blocks = bigtable.get_row_keys("blocks", None, None, 1).await?;
520 if blocks.is_empty() {
521 return Ok(None);
522 }
523 Ok(key_to_slot(&blocks[0]))
524 }
525
526 pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result<Vec<Slot>> {
531 trace!(
532 "LedgerStorage::get_confirmed_blocks request received: {:?} {:?}",
533 start_slot,
534 limit
535 );
536 self.stats.increment_num_queries();
537 let mut bigtable = self.connection.client();
538 let blocks = bigtable
539 .get_row_keys(
540 "blocks",
541 Some(slot_to_blocks_key(start_slot)),
542 None,
543 limit as i64,
544 )
545 .await?;
546 Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect())
547 }
548
549 pub async fn get_confirmed_blocks_with_data<'a>(
551 &self,
552 slots: &'a [Slot],
553 ) -> Result<impl Iterator<Item = (Slot, ConfirmedBlock)> + 'a> {
554 trace!(
555 "LedgerStorage::get_confirmed_blocks_with_data request received: {:?}",
556 slots
557 );
558 self.stats.increment_num_queries();
559 let mut bigtable = self.connection.client();
560 let row_keys = slots.iter().copied().map(slot_to_blocks_key);
561 let data = bigtable
562 .get_protobuf_or_bincode_cells("blocks", row_keys)
563 .await?
564 .filter_map(
565 |(row_key, block_cell_data): (
566 RowKey,
567 bigtable::CellData<StoredConfirmedBlock, generated::ConfirmedBlock>,
568 )| {
569 let block = match block_cell_data {
570 bigtable::CellData::Bincode(block) => block.into(),
571 bigtable::CellData::Protobuf(block) => block.try_into().ok()?,
572 };
573 Some((key_to_slot(&row_key).unwrap(), block))
574 },
575 );
576 Ok(data)
577 }
578
579 pub async fn get_confirmed_block(&self, slot: Slot) -> Result<ConfirmedBlock> {
581 trace!(
582 "LedgerStorage::get_confirmed_block request received: {:?}",
583 slot
584 );
585 self.stats.increment_num_queries();
586 let mut bigtable = self.connection.client();
587 let block_cell_data = bigtable
588 .get_protobuf_or_bincode_cell::<StoredConfirmedBlock, generated::ConfirmedBlock>(
589 "blocks",
590 slot_to_blocks_key(slot),
591 )
592 .await
593 .map_err(|err| match err {
594 bigtable::Error::RowNotFound => Error::BlockNotFound(slot),
595 _ => err.into(),
596 })?;
597 Ok(match block_cell_data {
598 bigtable::CellData::Bincode(block) => block.into(),
599 bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| {
600 bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_blocks_key(slot)))
601 })?,
602 })
603 }
604
605 pub async fn confirmed_block_exists(&self, slot: Slot) -> Result<bool> {
607 trace!(
608 "LedgerStorage::confirmed_block_exists request received: {:?}",
609 slot
610 );
611 self.stats.increment_num_queries();
612 let mut bigtable = self.connection.client();
613
614 let block_exists = bigtable
615 .row_key_exists("blocks", slot_to_blocks_key(slot))
616 .await?;
617
618 Ok(block_exists)
619 }
620
621 pub async fn get_entries(&self, slot: Slot) -> Result<impl Iterator<Item = EntrySummary>> {
623 trace!(
624 "LedgerStorage::get_block_entries request received: {:?}",
625 slot
626 );
627 self.stats.increment_num_queries();
628 let mut bigtable = self.connection.client();
629 let entry_cell_data = bigtable
630 .get_protobuf_cell::<entries::Entries>("entries", slot_to_entries_key(slot))
631 .await
632 .map_err(|err| match err {
633 bigtable::Error::RowNotFound => Error::BlockNotFound(slot),
634 _ => err.into(),
635 })?;
636 let entries = entry_cell_data.entries.into_iter().map(Into::into);
637 Ok(entries)
638 }
639
640 pub async fn get_signature_status(&self, signature: &Signature) -> Result<TransactionStatus> {
641 trace!(
642 "LedgerStorage::get_signature_status request received: {:?}",
643 signature
644 );
645 self.stats.increment_num_queries();
646 let mut bigtable = self.connection.client();
647 let transaction_info = bigtable
648 .get_bincode_cell::<TransactionInfo>("tx", signature.to_string())
649 .await
650 .map_err(|err| match err {
651 bigtable::Error::RowNotFound => Error::SignatureNotFound,
652 _ => err.into(),
653 })?;
654 Ok(transaction_info.into())
655 }
656
657 pub async fn get_confirmed_transactions(
659 &self,
660 signatures: &[Signature],
661 ) -> Result<Vec<ConfirmedTransactionWithStatusMeta>> {
662 trace!(
663 "LedgerStorage::get_confirmed_transactions request received: {:?}",
664 signatures
665 );
666 self.stats.increment_num_queries();
667 let mut bigtable = self.connection.client();
668
669 let keys = signatures.iter().map(|s| s.to_string()).collect::<Vec<_>>();
671 let cells = bigtable
672 .get_bincode_cells::<TransactionInfo>("tx", &keys)
673 .await?;
674
675 let mut order: Vec<(Slot, u32, String)> = Vec::new();
677 let mut slots: HashSet<Slot> = HashSet::new();
678 for cell in cells {
679 if let (signature, Ok(TransactionInfo { slot, index, .. })) = cell {
680 order.push((slot, index, signature));
681 slots.insert(slot);
682 }
683 }
684
685 let blocks = self
687 .get_confirmed_blocks_with_data(&slots.into_iter().collect::<Vec<_>>())
688 .await?
689 .collect::<HashMap<_, _>>();
690
691 Ok(order
693 .into_iter()
694 .filter_map(|(slot, index, signature)| {
695 blocks.get(&slot).and_then(|block| {
696 block
697 .transactions
698 .get(index as usize)
699 .and_then(|tx_with_meta| {
700 if tx_with_meta.transaction_signature().to_string() != *signature {
701 warn!(
702 "Transaction info or confirmed block for {} is corrupt",
703 signature
704 );
705 None
706 } else {
707 Some(ConfirmedTransactionWithStatusMeta {
708 slot,
709 tx_with_meta: tx_with_meta.clone(),
710 block_time: block.block_time,
711 })
712 }
713 })
714 })
715 })
716 .collect::<Vec<_>>())
717 }
718
719 pub async fn get_confirmed_transaction(
721 &self,
722 signature: &Signature,
723 ) -> Result<Option<ConfirmedTransactionWithStatusMeta>> {
724 trace!(
725 "LedgerStorage::get_confirmed_transaction request received: {:?}",
726 signature
727 );
728 self.stats.increment_num_queries();
729 let mut bigtable = self.connection.client();
730
731 let TransactionInfo { slot, index, .. } = bigtable
733 .get_bincode_cell("tx", signature.to_string())
734 .await
735 .map_err(|err| match err {
736 bigtable::Error::RowNotFound => Error::SignatureNotFound,
737 _ => err.into(),
738 })?;
739
740 let block = self.get_confirmed_block(slot).await?;
742 match block.transactions.into_iter().nth(index as usize) {
743 None => {
744 warn!("Transaction info for {} is corrupt", signature);
746 Ok(None)
747 }
748 Some(tx_with_meta) => {
749 if tx_with_meta.transaction_signature() != signature {
750 warn!(
751 "Transaction info or confirmed block for {} is corrupt",
752 signature
753 );
754 Ok(None)
755 } else {
756 Ok(Some(ConfirmedTransactionWithStatusMeta {
757 slot,
758 tx_with_meta,
759 block_time: block.block_time,
760 }))
761 }
762 }
763 }
764 }
765
 /// Returns up to `limit` confirmed transaction signatures that involve
 /// `address`, each paired with the transaction index stored for it.
 ///
 /// `before_signature`, when given, is looked up in the `tx` table and only
 /// entries positioned before it are returned; `until_signature`, when
 /// given, is looked up the same way and iteration stops short of it.
 ///
 /// # Errors
 ///
 /// Returns `Error::SignatureNotFound` when `before_signature` or
 /// `until_signature` has no `tx` row, and a wrapped
 /// `bigtable::Error::ObjectCorrupt` when a `tx-by-addr` row key or cell
 /// cannot be decoded.
 pub async fn get_confirmed_signatures_for_address(
 &self,
 address: &Pubkey,
 before_signature: Option<&Signature>,
 until_signature: Option<&Signature>,
 limit: usize,
 ) -> Result<
 Vec<(
 ConfirmedTransactionStatusWithSignature,
 u32, )>,
 > {
 trace!(
 "LedgerStorage::get_confirmed_signatures_for_address request received: {:?}",
 address
 );
 self.stats.increment_num_queries();
 let mut bigtable = self.connection.client();
 let address_prefix = format!("{address}/");

 // Starting position of the scan. Without `before_signature` the scan
 // starts at `Slot::MAX`, whose complemented key is the smallest possible.
 let (first_slot, before_transaction_index) = match before_signature {
 None => (Slot::MAX, 0),
 Some(before_signature) => {
 let TransactionInfo { slot, index, .. } = bigtable
 .get_bincode_cell("tx", before_signature.to_string())
 .await
 .map_err(|err| match err {
 bigtable::Error::RowNotFound => Error::SignatureNotFound,
 _ => err.into(),
 })?;

 (slot, index)
 }
 };

 // End position of the scan.
 // NOTE(review): with no `until_signature` this is `(0, u32::MAX)`, so the
 // filter below skips every transaction in slot 0 -- confirm intended.
 let (last_slot, until_transaction_index) = match until_signature {
 None => (0, u32::MAX),
 Some(until_signature) => {
 let TransactionInfo { slot, index, .. } = bigtable
 .get_bincode_cell("tx", until_signature.to_string())
 .await
 .map_err(|err| match err {
 bigtable::Error::RowNotFound => Error::SignatureNotFound,
 _ => err.into(),
 })?;

 (slot, index)
 }
 };

 let mut infos = vec![];

 // Number of entries stored in the starting slot's row for this address
 // (0 if that row cannot be read). It is added to the limit passed to
 // `get_row_data` below, presumably to compensate for entries of the
 // starting slot that the `before` filter discards -- TODO confirm.
 let starting_slot_tx_len = bigtable
 .get_protobuf_or_bincode_cell::<Vec<LegacyTransactionByAddrInfo>, tx_by_addr::TransactionByAddr>(
 "tx-by-addr",
 format!("{}{}", address_prefix, slot_to_tx_by_addr_key(first_slot)),
 )
 .await
 .map(|cell_data| {
 match cell_data {
 bigtable::CellData::Bincode(tx_by_addr) => tx_by_addr.len(),
 bigtable::CellData::Protobuf(tx_by_addr) => tx_by_addr.tx_by_addrs.len(),
 }
 })
 .unwrap_or(0);

 // Read the rows between the keys for `first_slot` and `last_slot`. Keys
 // hold the complemented slot, so the key for the higher slot is the
 // smaller one and is passed as the range start.
 let tx_by_addr_data = bigtable
 .get_row_data(
 "tx-by-addr",
 Some(format!(
 "{}{}",
 address_prefix,
 slot_to_tx_by_addr_key(first_slot),
 )),
 Some(format!(
 "{}{}",
 address_prefix,
 slot_to_tx_by_addr_key(last_slot),
 )),
 limit as i64 + starting_slot_tx_len as i64,
 )
 .await?;

 'outer: for (row_key, data) in tx_by_addr_data {
 // The part after the address prefix is the complemented slot; undo the
 // complement to recover the slot.
 let slot = !key_to_slot(&row_key[address_prefix.len()..]).ok_or_else(|| {
 bigtable::Error::ObjectCorrupt(format!(
 "Failed to convert key to slot: tx-by-addr/{row_key}"
 ))
 })?;

 let deserialized_cell_data = bigtable::deserialize_protobuf_or_bincode_cell_data::<
 Vec<LegacyTransactionByAddrInfo>,
 tx_by_addr::TransactionByAddr,
 >(&data, "tx-by-addr", row_key.clone())?;

 let mut cell_data: Vec<TransactionByAddrInfo> = match deserialized_cell_data {
 bigtable::CellData::Bincode(tx_by_addr) => {
 tx_by_addr.into_iter().map(|legacy| legacy.into()).collect()
 }
 bigtable::CellData::Protobuf(tx_by_addr) => {
 tx_by_addr.try_into().map_err(|error| {
 bigtable::Error::ObjectCorrupt(format!(
 "Failed to deserialize: {}: tx-by-addr/{}",
 error,
 row_key.clone()
 ))
 })?
 }
 };

 // Walk the entries of the row in reverse of their stored order.
 cell_data.reverse();
 for tx_by_addr_info in cell_data.into_iter() {
 // In the starting slot, skip `before_signature` itself and everything
 // with a higher index.
 if slot == first_slot && tx_by_addr_info.index >= before_transaction_index {
 continue;
 }
 // In the final slot, skip `until_signature` itself and everything with
 // a lower index.
 if slot == last_slot && tx_by_addr_info.index <= until_transaction_index {
 continue;
 }
 infos.push((
 ConfirmedTransactionStatusWithSignature {
 signature: tx_by_addr_info.signature,
 slot,
 err: tx_by_addr_info.err,
 memo: tx_by_addr_info.memo,
 block_time: tx_by_addr_info.block_time,
 },
 tx_by_addr_info.index,
 ));
 // Stop as soon as `limit` entries have been collected. The check runs
 // after the push, so at least one entry is returned if any qualifies.
 if infos.len() >= limit {
 break 'outer;
 }
 }
 }
 Ok(infos)
 }
914
915 pub async fn upload_confirmed_block(
917 &self,
918 slot: Slot,
919 confirmed_block: VersionedConfirmedBlock,
920 ) -> Result<()> {
921 trace!(
922 "LedgerStorage::upload_confirmed_block request received: {:?}",
923 slot
924 );
925 self.upload_confirmed_block_with_entries(
926 slot,
927 VersionedConfirmedBlockWithEntries {
928 block: confirmed_block,
929 entries: vec![],
930 },
931 )
932 .await
933 }
934
 /// Uploads a block and its entries for `slot`.
 ///
 /// The `tx`, `tx-by-addr` and `entries` rows are written first, in
 /// concurrently spawned tasks. The `blocks` row is written only after all
 /// of them have succeeded.
 ///
 /// # Errors
 ///
 /// Returns the first error seen among the spawned writes (a join failure
 /// or a BigTable error), or the error from writing the `blocks` row.
 ///
 /// # Panics
 ///
 /// Panics if a transaction in the block has no signatures
 /// (`transaction.signatures[0]`).
 pub async fn upload_confirmed_block_with_entries(
 &self,
 slot: Slot,
 confirmed_block: VersionedConfirmedBlockWithEntries,
 ) -> Result<()> {
 trace!(
 "LedgerStorage::upload_confirmed_block_with_entries request received: {:?}",
 slot
 );
 let mut by_addr: HashMap<&Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();
 let VersionedConfirmedBlockWithEntries {
 block: confirmed_block,
 entries,
 } = confirmed_block;

 let reserved_account_keys = ReservedAccountKeys::new_all_activated();
 let mut tx_cells = Vec::with_capacity(confirmed_block.transactions.len());
 for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
 let VersionedTransactionWithStatusMeta { meta, transaction } = transaction_with_meta;
 let err = meta.status.clone().err();
 let index = index as u32;
 let signature = transaction.signatures[0];
 let memo = extract_and_fmt_memos(transaction_with_meta);

 // Index the transaction under every account key it references, except
 // keys that `reserved_account_keys` reports as reserved.
 for address in transaction_with_meta.account_keys().iter() {
 if !reserved_account_keys.is_reserved(address) {
 by_addr
 .entry(address)
 .or_default()
 .push(TransactionByAddrInfo {
 signature,
 err: err.clone(),
 index,
 memo: memo.clone(),
 block_time: confirmed_block.block_time,
 });
 }
 }

 // One `tx` row per transaction, keyed by its first signature.
 tx_cells.push((
 signature.to_string(),
 TransactionInfo {
 slot,
 index,
 err,
 memo,
 },
 ));
 }

 // One `tx-by-addr` row per address, keyed by "{address}/{complemented slot}".
 let tx_by_addr_cells: Vec<_> = by_addr
 .into_iter()
 .map(|(address, transaction_info_by_addr)| {
 (
 format!("{}/{}", address, slot_to_tx_by_addr_key(slot)),
 tx_by_addr::TransactionByAddr {
 tx_by_addrs: transaction_info_by_addr
 .into_iter()
 .map(|by_addr| by_addr.into())
 .collect(),
 },
 )
 })
 .collect();

 let num_entries = entries.len();
 let entry_cell = (
 slot_to_entries_key(slot),
 entries::Entries {
 entries: entries.into_iter().enumerate().map(Into::into).collect(),
 },
 );

 // Spawn one write task per non-empty table payload.
 let mut tasks = vec![];

 if !tx_cells.is_empty() {
 let conn = self.connection.clone();
 tasks.push(tokio::spawn(async move {
 conn.put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
 .await
 }));
 }

 if !tx_by_addr_cells.is_empty() {
 let conn = self.connection.clone();
 tasks.push(tokio::spawn(async move {
 conn.put_protobuf_cells_with_retry::<tx_by_addr::TransactionByAddr>(
 "tx-by-addr",
 &tx_by_addr_cells,
 )
 .await
 }));
 }

 if num_entries > 0 {
 let conn = self.connection.clone();
 tasks.push(tokio::spawn(async move {
 conn.put_protobuf_cells_with_retry::<entries::Entries>("entries", &[entry_cell])
 .await
 }));
 }

 let mut bytes_written = 0;
 let mut maybe_first_err: Option<Error> = None;

 // Wait for every task to finish; keep only the first error but still
 // drain all results.
 let results = futures::future::join_all(tasks).await;
 for result in results {
 match result {
 Err(err) => {
 if maybe_first_err.is_none() {
 maybe_first_err = Some(Error::TokioJoinError(err));
 }
 }
 Ok(Err(err)) => {
 if maybe_first_err.is_none() {
 maybe_first_err = Some(Error::BigTableError(err));
 }
 }
 Ok(Ok(bytes)) => {
 bytes_written += bytes;
 }
 }
 }

 if let Some(err) = maybe_first_err {
 return Err(err);
 }

 let num_transactions = confirmed_block.transactions.len();

 // The `blocks` row is written last, after the other rows succeeded.
 let blocks_cells = [(slot_to_blocks_key(slot), confirmed_block.into())];
 bytes_written += self
 .connection
 .put_protobuf_cells_with_retry::<generated::ConfirmedBlock>("blocks", &blocks_cells)
 .await?;
 datapoint_info!(
 "storage-bigtable-upload-block",
 ("slot", slot, i64),
 ("transactions", num_transactions, i64),
 ("entries", num_entries, i64),
 ("bytes", bytes_written, i64),
 );
 Ok(())
 }
1086
 /// Deletes the rows stored for the block at `slot`: its `tx-by-addr`, `tx`,
 /// `entries` and `blocks` rows, in that order.
 ///
 /// With `dry_run` set, nothing is deleted; the same lookups and checks are
 /// performed and the summary is logged with a `[dry run]` prefix.
 ///
 /// A `tx` row is deleted only if its stored slot, index and error match
 /// what the block says; mismatching, unreadable or missing rows are skipped
 /// with a warning.
 ///
 /// # Errors
 ///
 /// Returns any error from `get_confirmed_block` (for example
 /// `Error::BlockNotFound`), from fetching the `tx` rows, or from a delete.
 ///
 /// # Panics
 ///
 /// Panics if a transaction in the block has no signatures
 /// (`transaction.signatures[0]`).
 pub async fn delete_confirmed_block(&self, slot: Slot, dry_run: bool) -> Result<()> {
 let mut addresses: HashSet<&Pubkey> = HashSet::new();
 let mut expected_tx_infos: HashMap<String, UploadedTransaction> = HashMap::new();
 let confirmed_block = self.get_confirmed_block(slot).await?;
 for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
 match transaction_with_meta {
 TransactionWithStatusMeta::MissingMetadata(transaction) => {
 let signature = transaction.signatures[0];
 let index = index as u32;
 // Without metadata there is no status to read, so no error is expected.
 let err = None;

 // NOTE(review): every account key is collected here, including keys
 // the upload path skips as reserved, so some of the resulting
 // `tx-by-addr` row keys may not exist -- confirm that is harmless.
 for address in transaction.message.account_keys.iter() {
 addresses.insert(address);
 }

 expected_tx_infos.insert(
 signature.to_string(),
 UploadedTransaction { slot, index, err },
 );
 }
 TransactionWithStatusMeta::Complete(tx_with_meta) => {
 let VersionedTransactionWithStatusMeta { transaction, meta } = tx_with_meta;
 let signature = transaction.signatures[0];
 let index = index as u32;
 let err = meta.status.clone().err();

 for address in tx_with_meta.account_keys().iter() {
 addresses.insert(address);
 }

 expected_tx_infos.insert(
 signature.to_string(),
 UploadedTransaction { slot, index, err },
 );
 }
 }
 }

 let address_slot_rows: Vec<_> = addresses
 .into_iter()
 .map(|address| format!("{}/{}", address, slot_to_tx_by_addr_key(slot)))
 .collect();

 // Fetch the stored `tx` rows and keep only those that match the block.
 let tx_deletion_rows = if !expected_tx_infos.is_empty() {
 let signatures = expected_tx_infos.keys().cloned().collect::<Vec<_>>();
 let fetched_tx_infos: HashMap<String, std::result::Result<UploadedTransaction, _>> =
 self.connection
 .get_bincode_cells_with_retry::<TransactionInfo>("tx", &signatures)
 .await?
 .into_iter()
 .map(|(signature, tx_info_res)| (signature, tx_info_res.map(Into::into)))
 .collect::<HashMap<_, _>>();

 let mut deletion_rows = Vec::with_capacity(expected_tx_infos.len());
 for (signature, expected_tx_info) in expected_tx_infos {
 match fetched_tx_infos.get(&signature) {
 Some(Ok(fetched_tx_info)) if fetched_tx_info == &expected_tx_info => {
 deletion_rows.push(signature);
 }
 Some(Ok(fetched_tx_info)) => {
 warn!(
 "skipped tx row {} because the bigtable entry ({:?}) did not match to {:?}",
 signature,
 fetched_tx_info,
 &expected_tx_info,
 );
 }
 Some(Err(err)) => {
 warn!(
 "skipped tx row {} because the bigtable entry was corrupted: {:?}",
 signature, err
 );
 }
 None => {
 warn!("skipped tx row {} because it was not found", signature);
 }
 }
 }
 deletion_rows
 } else {
 vec![]
 };

 // A failed existence check is treated the same as "no entries row".
 let entries_exist = self
 .connection
 .client()
 .row_key_exists("entries", slot_to_entries_key(slot))
 .await
 .is_ok_and(|x| x);

 if !dry_run {
 if !address_slot_rows.is_empty() {
 self.connection
 .delete_rows_with_retry("tx-by-addr", &address_slot_rows)
 .await?;
 }

 if !tx_deletion_rows.is_empty() {
 self.connection
 .delete_rows_with_retry("tx", &tx_deletion_rows)
 .await?;
 }

 if entries_exist {
 self.connection
 .delete_rows_with_retry("entries", &[slot_to_entries_key(slot)])
 .await?;
 }

 // The `blocks` row goes last, after the rows derived from it.
 self.connection
 .delete_rows_with_retry("blocks", &[slot_to_blocks_key(slot)])
 .await?;
 }

 info!(
 "{}deleted ledger data for slot {}: {} transaction rows, {} address slot rows, {} entry row",
 if dry_run { "[dry run] " } else { "" },
 slot,
 tx_deletion_rows.len(),
 address_slot_rows.len(),
 if entries_exist { "with" } else {"WITHOUT"}
 );

 Ok(())
 }
1225}
1226
#[cfg(test)]
mod test {
 use super::*;

 // Checks the fixed-width hexadecimal key format at both ends of the slot range.
 #[test]
 fn test_slot_to_key() {
 assert_eq!(slot_to_key(0), "0000000000000000");
 assert_eq!(slot_to_key(!0), "ffffffffffffffff");
 }
}