1#![allow(clippy::arithmetic_side_effects)]
2
3use {
4 crate::bigtable::RowKey,
5 agave_reserved_account_keys::ReservedAccountKeys,
6 log::*,
7 serde::{Deserialize, Serialize},
8 solana_clock::{Slot, UnixTimestamp},
9 solana_message::v0::LoadedAddresses,
10 solana_metrics::datapoint_info,
11 solana_pubkey::Pubkey,
12 solana_serde::default_on_eof,
13 solana_signature::Signature,
14 solana_storage_proto::convert::{entries, generated, tx_by_addr},
15 solana_time_utils::AtomicInterval,
16 solana_transaction::versioned::VersionedTransaction,
17 solana_transaction_error::TransactionError,
18 solana_transaction_status::{
19 extract_and_fmt_memos, ConfirmedBlock, ConfirmedTransactionStatusWithSignature,
20 ConfirmedTransactionWithStatusMeta, EntrySummary, Reward, TransactionByAddrInfo,
21 TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta,
22 TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedConfirmedBlockWithEntries,
23 VersionedTransactionWithStatusMeta,
24 },
25 std::{
26 collections::{HashMap, HashSet},
27 convert::TryInto,
28 sync::{
29 atomic::{AtomicUsize, Ordering},
30 Arc,
31 },
32 time::Duration,
33 },
34 thiserror::Error,
35 tokio::task::JoinError,
36};
37
38#[macro_use]
39extern crate solana_metrics;
40
41#[macro_use]
42extern crate serde_derive;
43
44mod access_token;
45mod bigtable;
46mod compression;
47mod root_ca_certificate;
48
49#[derive(Debug, Error)]
50pub enum Error {
51 #[error("BigTable: {0}")]
52 BigTableError(bigtable::Error),
53
54 #[error("I/O Error: {0}")]
55 IoError(std::io::Error),
56
57 #[error("Transaction encoded is not supported")]
58 UnsupportedTransactionEncoding,
59
60 #[error("Block not found: {0}")]
61 BlockNotFound(Slot),
62
63 #[error("Signature not found")]
64 SignatureNotFound,
65
66 #[error("tokio error")]
67 TokioJoinError(JoinError),
68}
69
70impl std::convert::From<bigtable::Error> for Error {
71 fn from(err: bigtable::Error) -> Self {
72 Self::BigTableError(err)
73 }
74}
75
76impl std::convert::From<std::io::Error> for Error {
77 fn from(err: std::io::Error) -> Self {
78 Self::IoError(err)
79 }
80}
81
82pub type Result<T> = std::result::Result<T, Error>;
83
84fn slot_to_key(slot: Slot) -> String {
87 format!("{slot:016x}")
88}
89
90fn slot_to_blocks_key(slot: Slot) -> String {
91 slot_to_key(slot)
92}
93
94fn slot_to_entries_key(slot: Slot) -> String {
95 slot_to_key(slot)
96}
97
98fn slot_to_tx_by_addr_key(slot: Slot) -> String {
99 slot_to_key(!slot)
100}
101
102fn key_to_slot(key: &str) -> Option<Slot> {
104 match Slot::from_str_radix(key, 16) {
105 Ok(slot) => Some(slot),
106 Err(err) => {
107 warn!("Failed to parse object key as a slot: {}: {}", key, err);
109 None
110 }
111 }
112}
113
114#[derive(Serialize, Deserialize)]
123struct StoredConfirmedBlock {
124 previous_blockhash: String,
125 blockhash: String,
126 parent_slot: Slot,
127 transactions: Vec<StoredConfirmedBlockTransaction>,
128 rewards: StoredConfirmedBlockRewards,
129 block_time: Option<UnixTimestamp>,
130 #[serde(deserialize_with = "default_on_eof")]
131 block_height: Option<u64>,
132}
133
134#[cfg(test)]
135impl From<ConfirmedBlock> for StoredConfirmedBlock {
136 fn from(confirmed_block: ConfirmedBlock) -> Self {
137 let ConfirmedBlock {
138 previous_blockhash,
139 blockhash,
140 parent_slot,
141 transactions,
142 rewards,
143 num_partitions: _num_partitions,
144 block_time,
145 block_height,
146 } = confirmed_block;
147
148 Self {
149 previous_blockhash,
150 blockhash,
151 parent_slot,
152 transactions: transactions.into_iter().map(|tx| tx.into()).collect(),
153 rewards: rewards.into_iter().map(|reward| reward.into()).collect(),
154 block_time,
155 block_height,
156 }
157 }
158}
159
160impl From<StoredConfirmedBlock> for ConfirmedBlock {
161 fn from(confirmed_block: StoredConfirmedBlock) -> Self {
162 let StoredConfirmedBlock {
163 previous_blockhash,
164 blockhash,
165 parent_slot,
166 transactions,
167 rewards,
168 block_time,
169 block_height,
170 } = confirmed_block;
171
172 Self {
173 previous_blockhash,
174 blockhash,
175 parent_slot,
176 transactions: transactions.into_iter().map(|tx| tx.into()).collect(),
177 rewards: rewards.into_iter().map(|reward| reward.into()).collect(),
178 num_partitions: None,
179 block_time,
180 block_height,
181 }
182 }
183}
184
185#[derive(Serialize, Deserialize)]
186struct StoredConfirmedBlockTransaction {
187 transaction: VersionedTransaction,
188 meta: Option<StoredConfirmedBlockTransactionStatusMeta>,
189}
190
191#[cfg(test)]
192impl From<TransactionWithStatusMeta> for StoredConfirmedBlockTransaction {
193 fn from(value: TransactionWithStatusMeta) -> Self {
194 match value {
195 TransactionWithStatusMeta::MissingMetadata(transaction) => Self {
196 transaction: VersionedTransaction::from(transaction),
197 meta: None,
198 },
199 TransactionWithStatusMeta::Complete(VersionedTransactionWithStatusMeta {
200 transaction,
201 meta,
202 }) => Self {
203 transaction,
204 meta: Some(meta.into()),
205 },
206 }
207 }
208}
209
210impl From<StoredConfirmedBlockTransaction> for TransactionWithStatusMeta {
211 fn from(tx_with_meta: StoredConfirmedBlockTransaction) -> Self {
212 let StoredConfirmedBlockTransaction { transaction, meta } = tx_with_meta;
213 match meta {
214 None => Self::MissingMetadata(
215 transaction
216 .into_legacy_transaction()
217 .expect("versioned transactions always have meta"),
218 ),
219 Some(meta) => Self::Complete(VersionedTransactionWithStatusMeta {
220 transaction,
221 meta: meta.into(),
222 }),
223 }
224 }
225}
226
227#[derive(Serialize, Deserialize)]
228struct StoredConfirmedBlockTransactionStatusMeta {
229 err: Option<TransactionError>,
230 fee: u64,
231 pre_balances: Vec<u64>,
232 post_balances: Vec<u64>,
233}
234
235impl From<StoredConfirmedBlockTransactionStatusMeta> for TransactionStatusMeta {
236 fn from(value: StoredConfirmedBlockTransactionStatusMeta) -> Self {
237 let StoredConfirmedBlockTransactionStatusMeta {
238 err,
239 fee,
240 pre_balances,
241 post_balances,
242 } = value;
243 let status = match &err {
244 None => Ok(()),
245 Some(err) => Err(err.clone()),
246 };
247 Self {
248 status,
249 fee,
250 pre_balances,
251 post_balances,
252 inner_instructions: None,
253 log_messages: None,
254 pre_token_balances: None,
255 post_token_balances: None,
256 rewards: None,
257 loaded_addresses: LoadedAddresses::default(),
258 return_data: None,
259 compute_units_consumed: None,
260 cost_units: None,
261 }
262 }
263}
264
265impl From<TransactionStatusMeta> for StoredConfirmedBlockTransactionStatusMeta {
266 fn from(value: TransactionStatusMeta) -> Self {
267 let TransactionStatusMeta {
268 status,
269 fee,
270 pre_balances,
271 post_balances,
272 ..
273 } = value;
274 Self {
275 err: status.err(),
276 fee,
277 pre_balances,
278 post_balances,
279 }
280 }
281}
282
283type StoredConfirmedBlockRewards = Vec<StoredConfirmedBlockReward>;
284
285#[derive(Serialize, Deserialize)]
286struct StoredConfirmedBlockReward {
287 pubkey: String,
288 lamports: i64,
289}
290
291impl From<StoredConfirmedBlockReward> for Reward {
292 fn from(value: StoredConfirmedBlockReward) -> Self {
293 let StoredConfirmedBlockReward { pubkey, lamports } = value;
294 Self {
295 pubkey,
296 lamports,
297 post_balance: 0,
298 reward_type: None,
299 commission: None,
300 }
301 }
302}
303
304impl From<Reward> for StoredConfirmedBlockReward {
305 fn from(value: Reward) -> Self {
306 let Reward {
307 pubkey, lamports, ..
308 } = value;
309 Self { pubkey, lamports }
310 }
311}
312
313#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
315struct TransactionInfo {
316 slot: Slot, index: u32, err: Option<TransactionError>, memo: Option<String>, }
321
322#[derive(PartialEq, Eq, Debug)]
324struct UploadedTransaction {
325 slot: Slot, index: u32, err: Option<TransactionError>, }
329
330impl From<TransactionInfo> for UploadedTransaction {
331 fn from(transaction_info: TransactionInfo) -> Self {
332 Self {
333 slot: transaction_info.slot,
334 index: transaction_info.index,
335 err: transaction_info.err,
336 }
337 }
338}
339
340impl From<TransactionInfo> for TransactionStatus {
341 fn from(transaction_info: TransactionInfo) -> Self {
342 let TransactionInfo { slot, err, .. } = transaction_info;
343 let status = match &err {
344 None => Ok(()),
345 Some(err) => Err(err.clone()),
346 };
347 Self {
348 slot,
349 confirmations: None,
350 status,
351 err,
352 confirmation_status: Some(TransactionConfirmationStatus::Finalized),
353 }
354 }
355}
356
357#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
358struct LegacyTransactionByAddrInfo {
359 pub signature: Signature, pub err: Option<TransactionError>, pub index: u32, pub memo: Option<String>, }
364
365impl From<LegacyTransactionByAddrInfo> for TransactionByAddrInfo {
366 fn from(legacy: LegacyTransactionByAddrInfo) -> Self {
367 let LegacyTransactionByAddrInfo {
368 signature,
369 err,
370 index,
371 memo,
372 } = legacy;
373
374 Self {
375 signature,
376 err,
377 index,
378 memo,
379 block_time: None,
380 }
381 }
382}
383
384pub const DEFAULT_INSTANCE_NAME: &str = "solana-ledger";
385pub const DEFAULT_APP_PROFILE_ID: &str = "default";
386pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; #[derive(Debug)]
389pub enum CredentialType {
390 Filepath(Option<String>),
391 Stringified(String),
392}
393
394#[derive(Debug)]
395pub struct LedgerStorageConfig {
396 pub read_only: bool,
397 pub timeout: Option<std::time::Duration>,
398 pub credential_type: CredentialType,
399 pub instance_name: String,
400 pub app_profile_id: String,
401 pub max_message_size: usize,
402}
403
404impl Default for LedgerStorageConfig {
405 fn default() -> Self {
406 Self {
407 read_only: true,
408 timeout: None,
409 credential_type: CredentialType::Filepath(None),
410 instance_name: DEFAULT_INSTANCE_NAME.to_string(),
411 app_profile_id: DEFAULT_APP_PROFILE_ID.to_string(),
412 max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
413 }
414 }
415}
416
417const METRICS_REPORT_INTERVAL_MS: u64 = 10_000;
418
419#[derive(Default)]
420struct LedgerStorageStats {
421 num_queries: AtomicUsize,
422 last_report: AtomicInterval,
423}
424
425impl LedgerStorageStats {
426 fn increment_num_queries(&self) {
427 self.num_queries.fetch_add(1, Ordering::Relaxed);
428 self.maybe_report();
429 }
430
431 fn maybe_report(&self) {
432 if self.last_report.should_update(METRICS_REPORT_INTERVAL_MS) {
433 datapoint_debug!(
434 "storage-bigtable-query",
435 (
436 "num_queries",
437 self.num_queries.swap(0, Ordering::Relaxed) as i64,
438 i64
439 )
440 );
441 }
442 }
443}
444
445#[derive(Clone)]
446pub struct LedgerStorage {
447 connection: bigtable::BigTableConnection,
448 stats: Arc<LedgerStorageStats>,
449}
450
451impl LedgerStorage {
452 pub async fn new(
453 read_only: bool,
454 timeout: Option<std::time::Duration>,
455 credential_path: Option<String>,
456 ) -> Result<Self> {
457 Self::new_with_config(LedgerStorageConfig {
458 read_only,
459 timeout,
460 credential_type: CredentialType::Filepath(credential_path),
461 ..LedgerStorageConfig::default()
462 })
463 .await
464 }
465
466 pub fn new_for_emulator(
467 instance_name: &str,
468 app_profile_id: &str,
469 endpoint: &str,
470 timeout: Option<Duration>,
471 ) -> Result<Self> {
472 let stats = Arc::new(LedgerStorageStats::default());
473 Ok(Self {
474 connection: bigtable::BigTableConnection::new_for_emulator(
475 instance_name,
476 app_profile_id,
477 endpoint,
478 timeout,
479 LedgerStorageConfig::default().max_message_size,
480 )?,
481 stats,
482 })
483 }
484
485 pub async fn new_with_config(config: LedgerStorageConfig) -> Result<Self> {
486 let stats = Arc::new(LedgerStorageStats::default());
487 let LedgerStorageConfig {
488 read_only,
489 timeout,
490 instance_name,
491 app_profile_id,
492 credential_type,
493 max_message_size,
494 } = config;
495 let connection = bigtable::BigTableConnection::new(
496 instance_name.as_str(),
497 app_profile_id.as_str(),
498 read_only,
499 timeout,
500 credential_type,
501 max_message_size,
502 )
503 .await?;
504 Ok(Self { stats, connection })
505 }
506
507 pub async fn new_with_stringified_credential(credential: String) -> Result<Self> {
508 Self::new_with_config(LedgerStorageConfig {
509 credential_type: CredentialType::Stringified(credential),
510 ..LedgerStorageConfig::default()
511 })
512 .await
513 }
514
515 pub async fn get_first_available_block(&self) -> Result<Option<Slot>> {
517 trace!("LedgerStorage::get_first_available_block request received");
518 self.stats.increment_num_queries();
519 let mut bigtable = self.connection.client();
520 let blocks = bigtable.get_row_keys("blocks", None, None, 1).await?;
521 if blocks.is_empty() {
522 return Ok(None);
523 }
524 Ok(key_to_slot(&blocks[0]))
525 }
526
527 pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result<Vec<Slot>> {
532 trace!(
533 "LedgerStorage::get_confirmed_blocks request received: {:?} {:?}",
534 start_slot,
535 limit
536 );
537 self.stats.increment_num_queries();
538 let mut bigtable = self.connection.client();
539 let blocks = bigtable
540 .get_row_keys(
541 "blocks",
542 Some(slot_to_blocks_key(start_slot)),
543 None,
544 limit as i64,
545 )
546 .await?;
547 Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect())
548 }
549
550 pub async fn get_confirmed_blocks_with_data<'a>(
552 &self,
553 slots: &'a [Slot],
554 ) -> Result<impl Iterator<Item = (Slot, ConfirmedBlock)> + 'a> {
555 trace!(
556 "LedgerStorage::get_confirmed_blocks_with_data request received: {:?}",
557 slots
558 );
559 self.stats.increment_num_queries();
560 let mut bigtable = self.connection.client();
561 let row_keys = slots.iter().copied().map(slot_to_blocks_key);
562 let data = bigtable
563 .get_protobuf_or_bincode_cells("blocks", row_keys)
564 .await?
565 .filter_map(
566 |(row_key, block_cell_data): (
567 RowKey,
568 bigtable::CellData<StoredConfirmedBlock, generated::ConfirmedBlock>,
569 )| {
570 let block = match block_cell_data {
571 bigtable::CellData::Bincode(block) => block.into(),
572 bigtable::CellData::Protobuf(block) => block.try_into().ok()?,
573 };
574 Some((key_to_slot(&row_key).unwrap(), block))
575 },
576 );
577 Ok(data)
578 }
579
580 pub async fn get_confirmed_block(&self, slot: Slot) -> Result<ConfirmedBlock> {
582 trace!(
583 "LedgerStorage::get_confirmed_block request received: {:?}",
584 slot
585 );
586 self.stats.increment_num_queries();
587 let mut bigtable = self.connection.client();
588 let block_cell_data = bigtable
589 .get_protobuf_or_bincode_cell::<StoredConfirmedBlock, generated::ConfirmedBlock>(
590 "blocks",
591 slot_to_blocks_key(slot),
592 )
593 .await
594 .map_err(|err| match err {
595 bigtable::Error::RowNotFound => Error::BlockNotFound(slot),
596 _ => err.into(),
597 })?;
598 Ok(match block_cell_data {
599 bigtable::CellData::Bincode(block) => block.into(),
600 bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| {
601 bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_blocks_key(slot)))
602 })?,
603 })
604 }
605
606 pub async fn confirmed_block_exists(&self, slot: Slot) -> Result<bool> {
608 trace!(
609 "LedgerStorage::confirmed_block_exists request received: {:?}",
610 slot
611 );
612 self.stats.increment_num_queries();
613 let mut bigtable = self.connection.client();
614
615 let block_exists = bigtable
616 .row_key_exists("blocks", slot_to_blocks_key(slot))
617 .await?;
618
619 Ok(block_exists)
620 }
621
622 pub async fn get_entries(&self, slot: Slot) -> Result<impl Iterator<Item = EntrySummary>> {
624 trace!(
625 "LedgerStorage::get_block_entries request received: {:?}",
626 slot
627 );
628 self.stats.increment_num_queries();
629 let mut bigtable = self.connection.client();
630 let entry_cell_data = bigtable
631 .get_protobuf_cell::<entries::Entries>("entries", slot_to_entries_key(slot))
632 .await
633 .map_err(|err| match err {
634 bigtable::Error::RowNotFound => Error::BlockNotFound(slot),
635 _ => err.into(),
636 })?;
637 let entries = entry_cell_data.entries.into_iter().map(Into::into);
638 Ok(entries)
639 }
640
641 pub async fn get_signature_status(&self, signature: &Signature) -> Result<TransactionStatus> {
642 trace!(
643 "LedgerStorage::get_signature_status request received: {:?}",
644 signature
645 );
646 self.stats.increment_num_queries();
647 let mut bigtable = self.connection.client();
648 let transaction_info = bigtable
649 .get_bincode_cell::<TransactionInfo>("tx", signature.to_string())
650 .await
651 .map_err(|err| match err {
652 bigtable::Error::RowNotFound => Error::SignatureNotFound,
653 _ => err.into(),
654 })?;
655 Ok(transaction_info.into())
656 }
657
658 pub async fn get_confirmed_transactions(
660 &self,
661 signatures: &[Signature],
662 ) -> Result<Vec<ConfirmedTransactionWithStatusMeta>> {
663 trace!(
664 "LedgerStorage::get_confirmed_transactions request received: {:?}",
665 signatures
666 );
667 self.stats.increment_num_queries();
668 let mut bigtable = self.connection.client();
669
670 let keys = signatures.iter().map(|s| s.to_string()).collect::<Vec<_>>();
672 let cells = bigtable
673 .get_bincode_cells::<TransactionInfo>("tx", &keys)
674 .await?;
675
676 let mut order: Vec<(Slot, u32, String)> = Vec::new();
678 let mut slots: HashSet<Slot> = HashSet::new();
679 for cell in cells {
680 if let (signature, Ok(TransactionInfo { slot, index, .. })) = cell {
681 order.push((slot, index, signature));
682 slots.insert(slot);
683 }
684 }
685
686 let blocks = self
688 .get_confirmed_blocks_with_data(&slots.into_iter().collect::<Vec<_>>())
689 .await?
690 .collect::<HashMap<_, _>>();
691
692 Ok(order
694 .into_iter()
695 .filter_map(|(slot, index, signature)| {
696 blocks.get(&slot).and_then(|block| {
697 block
698 .transactions
699 .get(index as usize)
700 .and_then(|tx_with_meta| {
701 if tx_with_meta.transaction_signature().to_string() != *signature {
702 warn!(
703 "Transaction info or confirmed block for {} is corrupt",
704 signature
705 );
706 None
707 } else {
708 Some(ConfirmedTransactionWithStatusMeta {
709 slot,
710 tx_with_meta: tx_with_meta.clone(),
711 block_time: block.block_time,
712 })
713 }
714 })
715 })
716 })
717 .collect::<Vec<_>>())
718 }
719
720 pub async fn get_confirmed_transaction(
722 &self,
723 signature: &Signature,
724 ) -> Result<Option<ConfirmedTransactionWithStatusMeta>> {
725 trace!(
726 "LedgerStorage::get_confirmed_transaction request received: {:?}",
727 signature
728 );
729 self.stats.increment_num_queries();
730 let mut bigtable = self.connection.client();
731
732 let TransactionInfo { slot, index, .. } = bigtable
734 .get_bincode_cell("tx", signature.to_string())
735 .await
736 .map_err(|err| match err {
737 bigtable::Error::RowNotFound => Error::SignatureNotFound,
738 _ => err.into(),
739 })?;
740
741 let block = self.get_confirmed_block(slot).await?;
743 match block.transactions.into_iter().nth(index as usize) {
744 None => {
745 warn!("Transaction info for {} is corrupt", signature);
747 Ok(None)
748 }
749 Some(tx_with_meta) => {
750 if tx_with_meta.transaction_signature() != signature {
751 warn!(
752 "Transaction info or confirmed block for {} is corrupt",
753 signature
754 );
755 Ok(None)
756 } else {
757 Ok(Some(ConfirmedTransactionWithStatusMeta {
758 slot,
759 tx_with_meta,
760 block_time: block.block_time,
761 }))
762 }
763 }
764 }
765 }
766
767 pub async fn get_confirmed_signatures_for_address(
774 &self,
775 address: &Pubkey,
776 before_signature: Option<&Signature>,
777 until_signature: Option<&Signature>,
778 limit: usize,
779 ) -> Result<
780 Vec<(
781 ConfirmedTransactionStatusWithSignature,
782 u32, )>,
784 > {
785 trace!(
786 "LedgerStorage::get_confirmed_signatures_for_address request received: {:?}",
787 address
788 );
789 self.stats.increment_num_queries();
790 let mut bigtable = self.connection.client();
791 let address_prefix = format!("{address}/");
792
793 let (first_slot, before_transaction_index) = match before_signature {
795 None => (Slot::MAX, 0),
796 Some(before_signature) => {
797 let TransactionInfo { slot, index, .. } = bigtable
798 .get_bincode_cell("tx", before_signature.to_string())
799 .await
800 .map_err(|err| match err {
801 bigtable::Error::RowNotFound => Error::SignatureNotFound,
802 _ => err.into(),
803 })?;
804
805 (slot, index)
806 }
807 };
808
809 let (last_slot, until_transaction_index) = match until_signature {
811 None => (0, u32::MAX),
812 Some(until_signature) => {
813 let TransactionInfo { slot, index, .. } = bigtable
814 .get_bincode_cell("tx", until_signature.to_string())
815 .await
816 .map_err(|err| match err {
817 bigtable::Error::RowNotFound => Error::SignatureNotFound,
818 _ => err.into(),
819 })?;
820
821 (slot, index)
822 }
823 };
824
825 let mut infos = vec![];
826
827 let starting_slot_tx_len = bigtable
828 .get_protobuf_or_bincode_cell::<Vec<LegacyTransactionByAddrInfo>, tx_by_addr::TransactionByAddr>(
829 "tx-by-addr",
830 format!("{}{}", address_prefix, slot_to_tx_by_addr_key(first_slot)),
831 )
832 .await
833 .map(|cell_data| {
834 match cell_data {
835 bigtable::CellData::Bincode(tx_by_addr) => tx_by_addr.len(),
836 bigtable::CellData::Protobuf(tx_by_addr) => tx_by_addr.tx_by_addrs.len(),
837 }
838 })
839 .unwrap_or(0);
840
841 let tx_by_addr_data = bigtable
844 .get_row_data(
845 "tx-by-addr",
846 Some(format!(
847 "{}{}",
848 address_prefix,
849 slot_to_tx_by_addr_key(first_slot),
850 )),
851 Some(format!(
852 "{}{}",
853 address_prefix,
854 slot_to_tx_by_addr_key(last_slot),
855 )),
856 limit as i64 + starting_slot_tx_len as i64,
857 )
858 .await?;
859
860 'outer: for (row_key, data) in tx_by_addr_data {
861 let slot = !key_to_slot(&row_key[address_prefix.len()..]).ok_or_else(|| {
862 bigtable::Error::ObjectCorrupt(format!(
863 "Failed to convert key to slot: tx-by-addr/{row_key}"
864 ))
865 })?;
866
867 let deserialized_cell_data = bigtable::deserialize_protobuf_or_bincode_cell_data::<
868 Vec<LegacyTransactionByAddrInfo>,
869 tx_by_addr::TransactionByAddr,
870 >(&data, "tx-by-addr", row_key.clone())?;
871
872 let mut cell_data: Vec<TransactionByAddrInfo> = match deserialized_cell_data {
873 bigtable::CellData::Bincode(tx_by_addr) => {
874 tx_by_addr.into_iter().map(|legacy| legacy.into()).collect()
875 }
876 bigtable::CellData::Protobuf(tx_by_addr) => {
877 tx_by_addr.try_into().map_err(|error| {
878 bigtable::Error::ObjectCorrupt(format!(
879 "Failed to deserialize: {}: tx-by-addr/{}",
880 error,
881 row_key.clone()
882 ))
883 })?
884 }
885 };
886
887 cell_data.reverse();
888 for tx_by_addr_info in cell_data.into_iter() {
889 if slot == first_slot && tx_by_addr_info.index >= before_transaction_index {
891 continue;
892 }
893 if slot == last_slot && tx_by_addr_info.index <= until_transaction_index {
895 continue;
896 }
897 infos.push((
898 ConfirmedTransactionStatusWithSignature {
899 signature: tx_by_addr_info.signature,
900 slot,
901 err: tx_by_addr_info.err,
902 memo: tx_by_addr_info.memo,
903 block_time: tx_by_addr_info.block_time,
904 },
905 tx_by_addr_info.index,
906 ));
907 if infos.len() >= limit {
909 break 'outer;
910 }
911 }
912 }
913 Ok(infos)
914 }
915
916 pub async fn upload_confirmed_block(
918 &self,
919 slot: Slot,
920 confirmed_block: VersionedConfirmedBlock,
921 ) -> Result<()> {
922 trace!(
923 "LedgerStorage::upload_confirmed_block request received: {:?}",
924 slot
925 );
926 self.upload_confirmed_block_with_entries(
927 slot,
928 VersionedConfirmedBlockWithEntries {
929 block: confirmed_block,
930 entries: vec![],
931 },
932 )
933 .await
934 }
935
936 pub async fn upload_confirmed_block_with_entries(
937 &self,
938 slot: Slot,
939 confirmed_block: VersionedConfirmedBlockWithEntries,
940 ) -> Result<()> {
941 trace!(
942 "LedgerStorage::upload_confirmed_block_with_entries request received: {:?}",
943 slot
944 );
945 let mut by_addr: HashMap<&Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();
946 let VersionedConfirmedBlockWithEntries {
947 block: confirmed_block,
948 entries,
949 } = confirmed_block;
950
951 let reserved_account_keys = ReservedAccountKeys::new_all_activated();
952 let mut tx_cells = Vec::with_capacity(confirmed_block.transactions.len());
953 for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
954 let VersionedTransactionWithStatusMeta { meta, transaction } = transaction_with_meta;
955 let err = meta.status.clone().err();
956 let index = index as u32;
957 let signature = transaction.signatures[0];
958 let memo = extract_and_fmt_memos(transaction_with_meta);
959
960 for address in transaction_with_meta.account_keys().iter() {
961 if !reserved_account_keys.is_reserved(address) {
966 by_addr
967 .entry(address)
968 .or_default()
969 .push(TransactionByAddrInfo {
970 signature,
971 err: err.clone(),
972 index,
973 memo: memo.clone(),
974 block_time: confirmed_block.block_time,
975 });
976 }
977 }
978
979 tx_cells.push((
980 signature.to_string(),
981 TransactionInfo {
982 slot,
983 index,
984 err,
985 memo,
986 },
987 ));
988 }
989
990 let tx_by_addr_cells: Vec<_> = by_addr
991 .into_iter()
992 .map(|(address, transaction_info_by_addr)| {
993 (
994 format!("{}/{}", address, slot_to_tx_by_addr_key(slot)),
995 tx_by_addr::TransactionByAddr {
996 tx_by_addrs: transaction_info_by_addr
997 .into_iter()
998 .map(|by_addr| by_addr.into())
999 .collect(),
1000 },
1001 )
1002 })
1003 .collect();
1004
1005 let num_entries = entries.len();
1006 let entry_cell = (
1007 slot_to_entries_key(slot),
1008 entries::Entries {
1009 entries: entries.into_iter().enumerate().map(Into::into).collect(),
1010 },
1011 );
1012
1013 let mut tasks = vec![];
1014
1015 if !tx_cells.is_empty() {
1016 let conn = self.connection.clone();
1017 tasks.push(tokio::spawn(async move {
1018 conn.put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
1019 .await
1020 }));
1021 }
1022
1023 if !tx_by_addr_cells.is_empty() {
1024 let conn = self.connection.clone();
1025 tasks.push(tokio::spawn(async move {
1026 conn.put_protobuf_cells_with_retry::<tx_by_addr::TransactionByAddr>(
1027 "tx-by-addr",
1028 &tx_by_addr_cells,
1029 )
1030 .await
1031 }));
1032 }
1033
1034 if num_entries > 0 {
1035 let conn = self.connection.clone();
1036 tasks.push(tokio::spawn(async move {
1037 conn.put_protobuf_cells_with_retry::<entries::Entries>("entries", &[entry_cell])
1038 .await
1039 }));
1040 }
1041
1042 let mut bytes_written = 0;
1043 let mut maybe_first_err: Option<Error> = None;
1044
1045 let results = futures::future::join_all(tasks).await;
1046 for result in results {
1047 match result {
1048 Err(err) => {
1049 if maybe_first_err.is_none() {
1050 maybe_first_err = Some(Error::TokioJoinError(err));
1051 }
1052 }
1053 Ok(Err(err)) => {
1054 if maybe_first_err.is_none() {
1055 maybe_first_err = Some(Error::BigTableError(err));
1056 }
1057 }
1058 Ok(Ok(bytes)) => {
1059 bytes_written += bytes;
1060 }
1061 }
1062 }
1063
1064 if let Some(err) = maybe_first_err {
1065 return Err(err);
1066 }
1067
1068 let num_transactions = confirmed_block.transactions.len();
1069
1070 let blocks_cells = [(slot_to_blocks_key(slot), confirmed_block.into())];
1074 bytes_written += self
1075 .connection
1076 .put_protobuf_cells_with_retry::<generated::ConfirmedBlock>("blocks", &blocks_cells)
1077 .await?;
1078 datapoint_info!(
1079 "storage-bigtable-upload-block",
1080 ("slot", slot, i64),
1081 ("transactions", num_transactions, i64),
1082 ("entries", num_entries, i64),
1083 ("bytes", bytes_written, i64),
1084 );
1085 Ok(())
1086 }
1087
1088 pub async fn delete_confirmed_block(&self, slot: Slot, dry_run: bool) -> Result<()> {
1090 let mut addresses: HashSet<&Pubkey> = HashSet::new();
1091 let mut expected_tx_infos: HashMap<String, UploadedTransaction> = HashMap::new();
1092 let confirmed_block = self.get_confirmed_block(slot).await?;
1093 for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
1094 match transaction_with_meta {
1095 TransactionWithStatusMeta::MissingMetadata(transaction) => {
1096 let signature = transaction.signatures[0];
1097 let index = index as u32;
1098 let err = None;
1099
1100 for address in transaction.message.account_keys.iter() {
1101 addresses.insert(address);
1108 }
1109
1110 expected_tx_infos.insert(
1111 signature.to_string(),
1112 UploadedTransaction { slot, index, err },
1113 );
1114 }
1115 TransactionWithStatusMeta::Complete(tx_with_meta) => {
1116 let VersionedTransactionWithStatusMeta { transaction, meta } = tx_with_meta;
1117 let signature = transaction.signatures[0];
1118 let index = index as u32;
1119 let err = meta.status.clone().err();
1120
1121 for address in tx_with_meta.account_keys().iter() {
1122 addresses.insert(address);
1129 }
1130
1131 expected_tx_infos.insert(
1132 signature.to_string(),
1133 UploadedTransaction { slot, index, err },
1134 );
1135 }
1136 }
1137 }
1138
1139 let address_slot_rows: Vec<_> = addresses
1140 .into_iter()
1141 .map(|address| format!("{}/{}", address, slot_to_tx_by_addr_key(slot)))
1142 .collect();
1143
1144 let tx_deletion_rows = if !expected_tx_infos.is_empty() {
1145 let signatures = expected_tx_infos.keys().cloned().collect::<Vec<_>>();
1146 let fetched_tx_infos: HashMap<String, std::result::Result<UploadedTransaction, _>> =
1147 self.connection
1148 .get_bincode_cells_with_retry::<TransactionInfo>("tx", &signatures)
1149 .await?
1150 .into_iter()
1151 .map(|(signature, tx_info_res)| (signature, tx_info_res.map(Into::into)))
1152 .collect::<HashMap<_, _>>();
1153
1154 let mut deletion_rows = Vec::with_capacity(expected_tx_infos.len());
1155 for (signature, expected_tx_info) in expected_tx_infos {
1156 match fetched_tx_infos.get(&signature) {
1157 Some(Ok(fetched_tx_info)) if fetched_tx_info == &expected_tx_info => {
1158 deletion_rows.push(signature);
1159 }
1160 Some(Ok(fetched_tx_info)) => {
1161 warn!(
1162 "skipped tx row {} because the bigtable entry ({:?}) did not match to {:?}",
1163 signature,
1164 fetched_tx_info,
1165 &expected_tx_info,
1166 );
1167 }
1168 Some(Err(err)) => {
1169 warn!(
1170 "skipped tx row {} because the bigtable entry was corrupted: {:?}",
1171 signature, err
1172 );
1173 }
1174 None => {
1175 warn!("skipped tx row {} because it was not found", signature);
1176 }
1177 }
1178 }
1179 deletion_rows
1180 } else {
1181 vec![]
1182 };
1183
1184 let entries_exist = self
1185 .connection
1186 .client()
1187 .row_key_exists("entries", slot_to_entries_key(slot))
1188 .await
1189 .is_ok_and(|x| x);
1190
1191 if !dry_run {
1192 if !address_slot_rows.is_empty() {
1193 self.connection
1194 .delete_rows_with_retry("tx-by-addr", &address_slot_rows)
1195 .await?;
1196 }
1197
1198 if !tx_deletion_rows.is_empty() {
1199 self.connection
1200 .delete_rows_with_retry("tx", &tx_deletion_rows)
1201 .await?;
1202 }
1203
1204 if entries_exist {
1205 self.connection
1206 .delete_rows_with_retry("entries", &[slot_to_entries_key(slot)])
1207 .await?;
1208 }
1209
1210 self.connection
1211 .delete_rows_with_retry("blocks", &[slot_to_blocks_key(slot)])
1212 .await?;
1213 }
1214
1215 info!(
1216 "{}deleted ledger data for slot {}: {} transaction rows, {} address slot rows, {} entry row",
1217 if dry_run { "[dry run] " } else { "" },
1218 slot,
1219 tx_deletion_rows.len(),
1220 address_slot_rows.len(),
1221 if entries_exist { "with" } else {"WITHOUT"}
1222 );
1223
1224 Ok(())
1225 }
1226}
1227
1228#[cfg(test)]
1229mod test {
1230 use super::*;
1231
1232 #[test]
1233 fn test_slot_to_key() {
1234 assert_eq!(slot_to_key(0), "0000000000000000");
1235 assert_eq!(slot_to_key(!0), "ffffffffffffffff");
1236 }
1237}