Skip to main content

solana_storage_bigtable/
lib.rs

1#![cfg_attr(
2    not(feature = "agave-unstable-api"),
3    deprecated(
4        since = "3.1.0",
5        note = "This crate has been marked for formal inclusion in the Agave Unstable API. From \
6                v4.0.0 onward, the `agave-unstable-api` crate feature must be specified to \
7                acknowledge use of an interface that may break without warning."
8    )
9)]
10#![allow(clippy::arithmetic_side_effects)]
11
12use {
13    crate::bigtable::RowKey,
14    agave_reserved_account_keys::ReservedAccountKeys,
15    log::*,
16    serde::{Deserialize, Serialize},
17    solana_clock::{Slot, UnixTimestamp},
18    solana_message::v0::LoadedAddresses,
19    solana_metrics::datapoint_info,
20    solana_pubkey::Pubkey,
21    solana_serde::default_on_eof,
22    solana_signature::Signature,
23    solana_storage_proto::convert::{entries, generated, tx_by_addr},
24    solana_time_utils::AtomicInterval,
25    solana_transaction::versioned::VersionedTransaction,
26    solana_transaction_error::TransactionError,
27    solana_transaction_status::{
28        extract_and_fmt_memos, ConfirmedBlock, ConfirmedTransactionStatusWithSignature,
29        ConfirmedTransactionWithStatusMeta, EntrySummary, Reward, TransactionByAddrInfo,
30        TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta,
31        TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedConfirmedBlockWithEntries,
32        VersionedTransactionWithStatusMeta,
33    },
34    std::{
35        collections::{HashMap, HashSet},
36        convert::TryInto,
37        sync::{
38            atomic::{AtomicUsize, Ordering},
39            Arc,
40        },
41        time::Duration,
42    },
43    thiserror::Error,
44    tokio::task::JoinError,
45};
46
47#[macro_use]
48extern crate solana_metrics;
49
50mod access_token;
51mod bigtable;
52mod compression;
53mod root_ca_certificate;
54
55#[derive(Debug, Error)]
56pub enum Error {
57    #[error("BigTable: {0}")]
58    BigTableError(bigtable::Error),
59
60    #[error("I/O Error: {0}")]
61    IoError(std::io::Error),
62
63    #[error("Transaction encoded is not supported")]
64    UnsupportedTransactionEncoding,
65
66    #[error("Block not found: {0}")]
67    BlockNotFound(Slot),
68
69    #[error("Signature not found")]
70    SignatureNotFound,
71
72    #[error("tokio error")]
73    TokioJoinError(JoinError),
74}
75
76impl std::convert::From<bigtable::Error> for Error {
77    fn from(err: bigtable::Error) -> Self {
78        Self::BigTableError(err)
79    }
80}
81
82impl std::convert::From<std::io::Error> for Error {
83    fn from(err: std::io::Error) -> Self {
84        Self::IoError(err)
85    }
86}
87
/// Result alias used throughout this crate, with [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
89
90// Convert a slot to its bucket representation whereby lower slots are always lexically ordered
91// before higher slots
92fn slot_to_key(slot: Slot) -> String {
93    format!("{slot:016x}")
94}
95
96fn slot_to_blocks_key(slot: Slot) -> String {
97    slot_to_key(slot)
98}
99
100fn slot_to_entries_key(slot: Slot) -> String {
101    slot_to_key(slot)
102}
103
104fn slot_to_tx_by_addr_key(slot: Slot) -> String {
105    slot_to_key(!slot)
106}
107
108// Reverse of `slot_to_key`
109fn key_to_slot(key: &str) -> Option<Slot> {
110    match Slot::from_str_radix(key, 16) {
111        Ok(slot) => Some(slot),
112        Err(err) => {
113            // bucket data is probably corrupt
114            warn!("Failed to parse object key as a slot: {key}: {err}");
115            None
116        }
117    }
118}
119
// A serialized `StoredConfirmedBlock` is stored in the `blocks` table
//
// StoredConfirmedBlock holds the same contents as ConfirmedBlock, but is slightly compressed and avoids
// some serde JSON directives that cause issues with bincode
//
// Note: in order to continue to support old bincode-serialized bigtable entries, if new fields are
// added to ConfirmedBlock, they must either be excluded or set to `default_on_eof` here
//
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlock {
    previous_blockhash: String,
    blockhash: String,
    parent_slot: Slot,
    transactions: Vec<StoredConfirmedBlockTransaction>,
    rewards: StoredConfirmedBlockRewards,
    block_time: Option<UnixTimestamp>,
    // Deserialized with `default_on_eof` (see the note above about old bincode-serialized
    // entries)
    #[serde(deserialize_with = "default_on_eof")]
    block_height: Option<u64>,
}
139
// Test-only conversion into the legacy stored representation. `num_partitions` has no
// counterpart in `StoredConfirmedBlock` and is dropped.
#[cfg(test)]
impl From<ConfirmedBlock> for StoredConfirmedBlock {
    fn from(block: ConfirmedBlock) -> Self {
        // Destructure exhaustively so that a new `ConfirmedBlock` field forces a decision here.
        let ConfirmedBlock {
            previous_blockhash,
            blockhash,
            parent_slot,
            transactions,
            rewards,
            num_partitions: _,
            block_time,
            block_height,
        } = block;

        let transactions = transactions
            .into_iter()
            .map(StoredConfirmedBlockTransaction::from)
            .collect();
        let rewards = rewards
            .into_iter()
            .map(StoredConfirmedBlockReward::from)
            .collect();

        StoredConfirmedBlock {
            previous_blockhash,
            blockhash,
            parent_slot,
            transactions,
            rewards,
            block_time,
            block_height,
        }
    }
}
165
166impl From<StoredConfirmedBlock> for ConfirmedBlock {
167    fn from(confirmed_block: StoredConfirmedBlock) -> Self {
168        let StoredConfirmedBlock {
169            previous_blockhash,
170            blockhash,
171            parent_slot,
172            transactions,
173            rewards,
174            block_time,
175            block_height,
176        } = confirmed_block;
177
178        Self {
179            previous_blockhash,
180            blockhash,
181            parent_slot,
182            transactions: transactions.into_iter().map(|tx| tx.into()).collect(),
183            rewards: rewards.into_iter().map(|reward| reward.into()).collect(),
184            num_partitions: None,
185            block_time,
186            block_height,
187        }
188    }
189}
190
// Transaction entry of a `StoredConfirmedBlock`
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlockTransaction {
    transaction: VersionedTransaction,
    // `None` is converted to `TransactionWithStatusMeta::MissingMetadata`
    meta: Option<StoredConfirmedBlockTransactionStatusMeta>,
}
196
// Test-only conversion into the legacy stored transaction representation
#[cfg(test)]
impl From<TransactionWithStatusMeta> for StoredConfirmedBlockTransaction {
    fn from(value: TransactionWithStatusMeta) -> Self {
        let (transaction, meta) = match value {
            TransactionWithStatusMeta::Complete(VersionedTransactionWithStatusMeta {
                transaction,
                meta,
            }) => (transaction, Some(meta.into())),
            TransactionWithStatusMeta::MissingMetadata(legacy) => {
                (VersionedTransaction::from(legacy), None)
            }
        };
        StoredConfirmedBlockTransaction { transaction, meta }
    }
}
215
216impl From<StoredConfirmedBlockTransaction> for TransactionWithStatusMeta {
217    fn from(tx_with_meta: StoredConfirmedBlockTransaction) -> Self {
218        let StoredConfirmedBlockTransaction { transaction, meta } = tx_with_meta;
219        match meta {
220            None => Self::MissingMetadata(
221                transaction
222                    .into_legacy_transaction()
223                    .expect("versioned transactions always have meta"),
224            ),
225            Some(meta) => Self::Complete(VersionedTransactionWithStatusMeta {
226                transaction,
227                meta: meta.into(),
228            }),
229        }
230    }
231}
232
// Reduced form of `TransactionStatusMeta` kept in a `StoredConfirmedBlock`; only the fields
// below are retained
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlockTransactionStatusMeta {
    // `None` maps to an `Ok(())` status when converted back to `TransactionStatusMeta`
    err: Option<TransactionError>,
    fee: u64,
    pre_balances: Vec<u64>,
    post_balances: Vec<u64>,
}
240
241impl From<StoredConfirmedBlockTransactionStatusMeta> for TransactionStatusMeta {
242    fn from(value: StoredConfirmedBlockTransactionStatusMeta) -> Self {
243        let StoredConfirmedBlockTransactionStatusMeta {
244            err,
245            fee,
246            pre_balances,
247            post_balances,
248        } = value;
249        let status = match &err {
250            None => Ok(()),
251            Some(err) => Err(err.clone()),
252        };
253        Self {
254            status,
255            fee,
256            pre_balances,
257            post_balances,
258            inner_instructions: None,
259            log_messages: None,
260            pre_token_balances: None,
261            post_token_balances: None,
262            rewards: None,
263            loaded_addresses: LoadedAddresses::default(),
264            return_data: None,
265            compute_units_consumed: None,
266            cost_units: None,
267        }
268    }
269}
270
271impl From<TransactionStatusMeta> for StoredConfirmedBlockTransactionStatusMeta {
272    fn from(value: TransactionStatusMeta) -> Self {
273        let TransactionStatusMeta {
274            status,
275            fee,
276            pre_balances,
277            post_balances,
278            ..
279        } = value;
280        Self {
281            err: status.err(),
282            fee,
283            pre_balances,
284            post_balances,
285        }
286    }
287}
288
// Reward list of a `StoredConfirmedBlock`
type StoredConfirmedBlockRewards = Vec<StoredConfirmedBlockReward>;
290
// Reduced form of `Reward` kept in a `StoredConfirmedBlock`; only the pubkey and lamports
// are retained
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlockReward {
    pubkey: String,
    lamports: i64,
}
296
297impl From<StoredConfirmedBlockReward> for Reward {
298    fn from(value: StoredConfirmedBlockReward) -> Self {
299        let StoredConfirmedBlockReward { pubkey, lamports } = value;
300        Self {
301            pubkey,
302            lamports,
303            post_balance: 0,
304            reward_type: None,
305            commission: None,
306        }
307    }
308}
309
310impl From<Reward> for StoredConfirmedBlockReward {
311    fn from(value: Reward) -> Self {
312        let Reward {
313            pubkey, lamports, ..
314        } = value;
315        Self { pubkey, lamports }
316    }
317}
318
// A serialized `TransactionInfo` is stored in the `tx` table, keyed by the transaction
// signature rendered as a string
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
struct TransactionInfo {
    slot: Slot, // The slot that contains the block with this transaction in it
    index: u32, // Where the transaction is located in the block
    err: Option<TransactionError>, // None if the transaction executed successfully
    memo: Option<String>, // Transaction memo
}
327
// Part of a serialized `TransactionInfo` which is stored in the `tx` table.
// Carries the same fields as `TransactionInfo` except for the memo.
#[derive(PartialEq, Eq, Debug)]
struct UploadedTransaction {
    slot: Slot, // The slot that contains the block with this transaction in it
    index: u32, // Where the transaction is located in the block
    err: Option<TransactionError>, // None if the transaction executed successfully
}
335
336impl From<TransactionInfo> for UploadedTransaction {
337    fn from(transaction_info: TransactionInfo) -> Self {
338        Self {
339            slot: transaction_info.slot,
340            index: transaction_info.index,
341            err: transaction_info.err,
342        }
343    }
344}
345
346impl From<TransactionInfo> for TransactionStatus {
347    fn from(transaction_info: TransactionInfo) -> Self {
348        let TransactionInfo { slot, err, .. } = transaction_info;
349        let status = match &err {
350            None => Ok(()),
351            Some(err) => Err(err.clone()),
352        };
353        Self {
354            slot,
355            confirmations: None,
356            status,
357            err,
358            confirmation_status: Some(TransactionConfirmationStatus::Finalized),
359        }
360    }
361}
362
// Bincode form of a `tx-by-addr` record. It has no block time; converting it to
// `TransactionByAddrInfo` sets `block_time` to `None`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct LegacyTransactionByAddrInfo {
    pub signature: Signature,          // The transaction signature
    pub err: Option<TransactionError>, // None if the transaction executed successfully
    pub index: u32,                    // Where the transaction is located in the block
    pub memo: Option<String>,          // Transaction memo
}
370
371impl From<LegacyTransactionByAddrInfo> for TransactionByAddrInfo {
372    fn from(legacy: LegacyTransactionByAddrInfo) -> Self {
373        let LegacyTransactionByAddrInfo {
374            signature,
375            err,
376            index,
377            memo,
378        } = legacy;
379
380        Self {
381            signature,
382            err,
383            index,
384            memo,
385            block_time: None,
386        }
387    }
388}
389
/// Instance name used by [`LedgerStorageConfig::default`].
pub const DEFAULT_INSTANCE_NAME: &str = "solana-ledger";
/// App profile id used by [`LedgerStorageConfig::default`].
pub const DEFAULT_APP_PROFILE_ID: &str = "default";
/// Maximum message size used by [`LedgerStorageConfig::default`].
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64MB
393
/// How the credential is supplied when creating a [`LedgerStorage`].
#[derive(Debug)]
pub enum CredentialType {
    /// Optional path to a credential file.
    // NOTE(review): how `None` is resolved is decided in the `bigtable` module, not visible here.
    Filepath(Option<String>),
    /// The credential itself, passed as a string.
    Stringified(String),
}
399
/// Settings for [`LedgerStorage::new_with_config`]. Every field is forwarded unchanged to
/// `bigtable::BigTableConnection::new`.
#[derive(Debug)]
pub struct LedgerStorageConfig {
    pub read_only: bool,
    pub timeout: Option<std::time::Duration>,
    pub credential_type: CredentialType,
    pub instance_name: String,
    pub app_profile_id: String,
    pub max_message_size: usize,
}
409
410impl Default for LedgerStorageConfig {
411    fn default() -> Self {
412        Self {
413            read_only: true,
414            timeout: None,
415            credential_type: CredentialType::Filepath(None),
416            instance_name: DEFAULT_INSTANCE_NAME.to_string(),
417            app_profile_id: DEFAULT_APP_PROFILE_ID.to_string(),
418            max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
419        }
420    }
421}
422
// Interval, in milliseconds, passed to `AtomicInterval::should_update` to throttle the
// query-count metric report
const METRICS_REPORT_INTERVAL_MS: u64 = 10_000;
424
// Query statistics shared between clones of a `LedgerStorage`
#[derive(Default)]
struct LedgerStorageStats {
    // Queries counted since the counter was last swapped back to zero by a report
    num_queries: AtomicUsize,
    // Gate deciding when the next report is due
    last_report: AtomicInterval,
}
430
431impl LedgerStorageStats {
432    fn increment_num_queries(&self) {
433        self.num_queries.fetch_add(1, Ordering::Relaxed);
434        self.maybe_report();
435    }
436
437    fn maybe_report(&self) {
438        if self.last_report.should_update(METRICS_REPORT_INTERVAL_MS) {
439            datapoint_debug!(
440                "storage-bigtable-query",
441                (
442                    "num_queries",
443                    self.num_queries.swap(0, Ordering::Relaxed) as i64,
444                    i64
445                )
446            );
447        }
448    }
449}
450
/// Handle to the ledger data stored in BigTable.
///
/// Cloning yields a handle that shares the same query statistics (held in an `Arc`).
#[derive(Clone)]
pub struct LedgerStorage {
    connection: bigtable::BigTableConnection,
    stats: Arc<LedgerStorageStats>,
}
456
457impl LedgerStorage {
458    pub async fn new(
459        read_only: bool,
460        timeout: Option<std::time::Duration>,
461        credential_path: Option<String>,
462    ) -> Result<Self> {
463        Self::new_with_config(LedgerStorageConfig {
464            read_only,
465            timeout,
466            credential_type: CredentialType::Filepath(credential_path),
467            ..LedgerStorageConfig::default()
468        })
469        .await
470    }
471
472    pub fn new_for_emulator(
473        instance_name: &str,
474        app_profile_id: &str,
475        endpoint: &str,
476        timeout: Option<Duration>,
477    ) -> Result<Self> {
478        let stats = Arc::new(LedgerStorageStats::default());
479        Ok(Self {
480            connection: bigtable::BigTableConnection::new_for_emulator(
481                instance_name,
482                app_profile_id,
483                endpoint,
484                timeout,
485                LedgerStorageConfig::default().max_message_size,
486            )?,
487            stats,
488        })
489    }
490
491    pub async fn new_with_config(config: LedgerStorageConfig) -> Result<Self> {
492        let stats = Arc::new(LedgerStorageStats::default());
493        let LedgerStorageConfig {
494            read_only,
495            timeout,
496            instance_name,
497            app_profile_id,
498            credential_type,
499            max_message_size,
500        } = config;
501        let connection = bigtable::BigTableConnection::new(
502            instance_name.as_str(),
503            app_profile_id.as_str(),
504            read_only,
505            timeout,
506            credential_type,
507            max_message_size,
508        )
509        .await?;
510        Ok(Self { stats, connection })
511    }
512
513    pub async fn new_with_stringified_credential(credential: String) -> Result<Self> {
514        Self::new_with_config(LedgerStorageConfig {
515            credential_type: CredentialType::Stringified(credential),
516            ..LedgerStorageConfig::default()
517        })
518        .await
519    }
520
521    /// Return the available slot that contains a block
522    pub async fn get_first_available_block(&self) -> Result<Option<Slot>> {
523        trace!("LedgerStorage::get_first_available_block request received");
524        self.stats.increment_num_queries();
525        let mut bigtable = self.connection.client();
526        let blocks = bigtable.get_row_keys("blocks", None, None, 1).await?;
527        if blocks.is_empty() {
528            return Ok(None);
529        }
530        Ok(key_to_slot(&blocks[0]))
531    }
532
533    /// Fetch the next slots after the provided slot that contains a block
534    ///
535    /// start_slot: slot to start the search from (inclusive)
536    /// limit: stop after this many slots have been found
537    pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result<Vec<Slot>> {
538        trace!("LedgerStorage::get_confirmed_blocks request received: {start_slot:?} {limit:?}");
539        self.stats.increment_num_queries();
540        let mut bigtable = self.connection.client();
541        let blocks = bigtable
542            .get_row_keys(
543                "blocks",
544                Some(slot_to_blocks_key(start_slot)),
545                None,
546                limit as i64,
547            )
548            .await?;
549        Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect())
550    }
551
552    // Fetches and gets a vector of confirmed blocks via a multirow fetch
553    pub async fn get_confirmed_blocks_with_data<'a>(
554        &self,
555        slots: &'a [Slot],
556    ) -> Result<impl Iterator<Item = (Slot, ConfirmedBlock)> + 'a> {
557        trace!("LedgerStorage::get_confirmed_blocks_with_data request received: {slots:?}");
558        self.stats.increment_num_queries();
559        let mut bigtable = self.connection.client();
560        let row_keys = slots.iter().copied().map(slot_to_blocks_key);
561        let data = bigtable
562            .get_protobuf_or_bincode_cells("blocks", row_keys)
563            .await?
564            .filter_map(
565                |(row_key, block_cell_data): (
566                    RowKey,
567                    bigtable::CellData<StoredConfirmedBlock, generated::ConfirmedBlock>,
568                )| {
569                    let block = match block_cell_data {
570                        bigtable::CellData::Bincode(block) => block.into(),
571                        bigtable::CellData::Protobuf(block) => block.try_into().ok()?,
572                    };
573                    Some((key_to_slot(&row_key).unwrap(), block))
574                },
575            );
576        Ok(data)
577    }
578
579    /// Fetch the confirmed block from the desired slot
580    pub async fn get_confirmed_block(&self, slot: Slot) -> Result<ConfirmedBlock> {
581        trace!("LedgerStorage::get_confirmed_block request received: {slot:?}");
582        self.stats.increment_num_queries();
583        let mut bigtable = self.connection.client();
584        let block_cell_data = bigtable
585            .get_protobuf_or_bincode_cell::<StoredConfirmedBlock, generated::ConfirmedBlock>(
586                "blocks",
587                slot_to_blocks_key(slot),
588            )
589            .await
590            .map_err(|err| match err {
591                bigtable::Error::RowNotFound => Error::BlockNotFound(slot),
592                _ => err.into(),
593            })?;
594        Ok(match block_cell_data {
595            bigtable::CellData::Bincode(block) => block.into(),
596            bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| {
597                bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_blocks_key(slot)))
598            })?,
599        })
600    }
601
602    /// Does the confirmed block exist in the Bigtable
603    pub async fn confirmed_block_exists(&self, slot: Slot) -> Result<bool> {
604        trace!("LedgerStorage::confirmed_block_exists request received: {slot:?}");
605        self.stats.increment_num_queries();
606        let mut bigtable = self.connection.client();
607
608        let block_exists = bigtable
609            .row_key_exists("blocks", slot_to_blocks_key(slot))
610            .await?;
611
612        Ok(block_exists)
613    }
614
615    /// Fetches a vector of block entries via a multirow fetch
616    pub async fn get_entries(&self, slot: Slot) -> Result<impl Iterator<Item = EntrySummary>> {
617        trace!("LedgerStorage::get_block_entries request received: {slot:?}");
618        self.stats.increment_num_queries();
619        let mut bigtable = self.connection.client();
620        let entry_cell_data = bigtable
621            .get_protobuf_cell::<entries::Entries>("entries", slot_to_entries_key(slot))
622            .await
623            .map_err(|err| match err {
624                bigtable::Error::RowNotFound => Error::BlockNotFound(slot),
625                _ => err.into(),
626            })?;
627        let entries = entry_cell_data.entries.into_iter().map(Into::into);
628        Ok(entries)
629    }
630
631    pub async fn get_signature_status(&self, signature: &Signature) -> Result<TransactionStatus> {
632        trace!("LedgerStorage::get_signature_status request received: {signature:?}");
633        self.stats.increment_num_queries();
634        let mut bigtable = self.connection.client();
635        let transaction_info = bigtable
636            .get_bincode_cell::<TransactionInfo>("tx", signature.to_string())
637            .await
638            .map_err(|err| match err {
639                bigtable::Error::RowNotFound => Error::SignatureNotFound,
640                _ => err.into(),
641            })?;
642        Ok(transaction_info.into())
643    }
644
645    // Fetches and gets a vector of confirmed transactions via a multirow fetch
646    pub async fn get_confirmed_transactions(
647        &self,
648        signatures: &[Signature],
649    ) -> Result<Vec<ConfirmedTransactionWithStatusMeta>> {
650        trace!("LedgerStorage::get_confirmed_transactions request received: {signatures:?}");
651        self.stats.increment_num_queries();
652        let mut bigtable = self.connection.client();
653
654        // Fetch transactions info
655        let keys = signatures.iter().map(|s| s.to_string()).collect::<Vec<_>>();
656        let cells = bigtable
657            .get_bincode_cells::<TransactionInfo>("tx", &keys)
658            .await?;
659
660        // Collect by slot
661        let mut order: Vec<(Slot, u32, String)> = Vec::new();
662        let mut slots: HashSet<Slot> = HashSet::new();
663        for cell in cells {
664            if let (signature, Ok(TransactionInfo { slot, index, .. })) = cell {
665                order.push((slot, index, signature));
666                slots.insert(slot);
667            }
668        }
669
670        // Fetch blocks
671        let blocks = self
672            .get_confirmed_blocks_with_data(&slots.into_iter().collect::<Vec<_>>())
673            .await?
674            .collect::<HashMap<_, _>>();
675
676        // Extract transactions
677        Ok(order
678            .into_iter()
679            .filter_map(|(slot, index, signature)| {
680                blocks.get(&slot).and_then(|block| {
681                    block
682                        .transactions
683                        .get(index as usize)
684                        .and_then(|tx_with_meta| {
685                            if tx_with_meta.transaction_signature().to_string() != *signature {
686                                warn!(
687                                    "Transaction info or confirmed block for {signature} is \
688                                     corrupt"
689                                );
690                                None
691                            } else {
692                                Some(ConfirmedTransactionWithStatusMeta {
693                                    slot,
694                                    tx_with_meta: tx_with_meta.clone(),
695                                    block_time: block.block_time,
696                                })
697                            }
698                        })
699                })
700            })
701            .collect::<Vec<_>>())
702    }
703
704    /// Fetch a confirmed transaction
705    pub async fn get_confirmed_transaction(
706        &self,
707        signature: &Signature,
708    ) -> Result<Option<ConfirmedTransactionWithStatusMeta>> {
709        trace!("LedgerStorage::get_confirmed_transaction request received: {signature:?}");
710        self.stats.increment_num_queries();
711        let mut bigtable = self.connection.client();
712
713        // Figure out which block the transaction is located in
714        let TransactionInfo { slot, index, .. } = bigtable
715            .get_bincode_cell("tx", signature.to_string())
716            .await
717            .map_err(|err| match err {
718                bigtable::Error::RowNotFound => Error::SignatureNotFound,
719                _ => err.into(),
720            })?;
721
722        // Load the block and return the transaction
723        let block = self.get_confirmed_block(slot).await?;
724        match block.transactions.into_iter().nth(index as usize) {
725            None => {
726                // report this somewhere actionable?
727                warn!("Transaction info for {signature} is corrupt");
728                Ok(None)
729            }
730            Some(tx_with_meta) => {
731                if tx_with_meta.transaction_signature() != signature {
732                    warn!("Transaction info or confirmed block for {signature} is corrupt");
733                    Ok(None)
734                } else {
735                    Ok(Some(ConfirmedTransactionWithStatusMeta {
736                        slot,
737                        tx_with_meta,
738                        block_time: block.block_time,
739                    }))
740                }
741            }
742        }
743    }
744
    /// Get confirmed signatures for the provided address, in descending ledger order
    ///
    /// address: address to search for
    /// before_signature: start with the first signature older than this one
    /// until_signature: end with the last signature more recent than this one
    /// limit: stop after this many signatures; if limit==0, all records in the table will be read
    ///
    /// Returns `Error::SignatureNotFound` when `before_signature` or `until_signature` is given
    /// but has no row in the `tx` table.
    ///
    /// NOTE(review): the loop below stops as soon as `infos.len() >= limit`, which for
    /// `limit == 0` is true after the first record is pushed. That does not match the
    /// "all records" wording above — confirm the intended behaviour for `limit == 0`.
    pub async fn get_confirmed_signatures_for_address(
        &self,
        address: &Pubkey,
        before_signature: Option<&Signature>,
        until_signature: Option<&Signature>,
        limit: usize,
    ) -> Result<
        Vec<(
            ConfirmedTransactionStatusWithSignature,
            u32, /*slot index*/
        )>,
    > {
        trace!("LedgerStorage::get_confirmed_signatures_for_address request received: {address:?}");
        self.stats.increment_num_queries();
        let mut bigtable = self.connection.client();
        // `tx-by-addr` row keys are "<address>/<encoded complemented slot>"
        let address_prefix = format!("{address}/");

        // Figure out where to start listing from based on `before_signature`
        let (first_slot, before_transaction_index) = match before_signature {
            None => (Slot::MAX, 0),
            Some(before_signature) => {
                let TransactionInfo { slot, index, .. } = bigtable
                    .get_bincode_cell("tx", before_signature.to_string())
                    .await
                    .map_err(|err| match err {
                        bigtable::Error::RowNotFound => Error::SignatureNotFound,
                        _ => err.into(),
                    })?;

                (slot, index)
            }
        };

        // Figure out where to end listing from based on `until_signature`
        let (last_slot, until_transaction_index) = match until_signature {
            None => (0, u32::MAX),
            Some(until_signature) => {
                let TransactionInfo { slot, index, .. } = bigtable
                    .get_bincode_cell("tx", until_signature.to_string())
                    .await
                    .map_err(|err| match err {
                        bigtable::Error::RowNotFound => Error::SignatureNotFound,
                        _ => err.into(),
                    })?;

                (slot, index)
            }
        };

        let mut infos = vec![];

        // Number of records stored for the address in the starting slot. Any failure to read
        // that row (including the row not existing) is treated as zero records.
        let starting_slot_tx_len = bigtable
            .get_protobuf_or_bincode_cell::<Vec<LegacyTransactionByAddrInfo>, tx_by_addr::TransactionByAddr>(
                "tx-by-addr",
                format!("{}{}", address_prefix, slot_to_tx_by_addr_key(first_slot)),
            )
            .await
            .map(|cell_data| {
                match cell_data {
                    bigtable::CellData::Bincode(tx_by_addr) => tx_by_addr.len(),
                    bigtable::CellData::Protobuf(tx_by_addr) => tx_by_addr.tx_by_addrs.len(),
                }
            })
            .unwrap_or(0);

        // Return the next tx-by-addr data of amount `limit` plus extra to account for the largest
        // number that might be filtered out
        let tx_by_addr_data = bigtable
            .get_row_data(
                "tx-by-addr",
                Some(format!(
                    "{}{}",
                    address_prefix,
                    slot_to_tx_by_addr_key(first_slot),
                )),
                Some(format!(
                    "{}{}",
                    address_prefix,
                    slot_to_tx_by_addr_key(last_slot),
                )),
                limit as i64 + starting_slot_tx_len as i64,
            )
            .await?;

        'outer: for (row_key, data) in tx_by_addr_data {
            // The key holds the bitwise complement of the slot (see `slot_to_tx_by_addr_key`),
            // so complementing the parsed value recovers the slot.
            let slot = !key_to_slot(&row_key[address_prefix.len()..]).ok_or_else(|| {
                bigtable::Error::ObjectCorrupt(format!(
                    "Failed to convert key to slot: tx-by-addr/{row_key}"
                ))
            })?;

            let deserialized_cell_data = bigtable::deserialize_protobuf_or_bincode_cell_data::<
                Vec<LegacyTransactionByAddrInfo>,
                tx_by_addr::TransactionByAddr,
            >(&data, "tx-by-addr", row_key.clone())?;

            // Normalize both storage formats to `TransactionByAddrInfo`
            let mut cell_data: Vec<TransactionByAddrInfo> = match deserialized_cell_data {
                bigtable::CellData::Bincode(tx_by_addr) => {
                    tx_by_addr.into_iter().map(|legacy| legacy.into()).collect()
                }
                bigtable::CellData::Protobuf(tx_by_addr) => {
                    tx_by_addr.try_into().map_err(|error| {
                        bigtable::Error::ObjectCorrupt(format!(
                            "Failed to deserialize: {}: tx-by-addr/{}",
                            error,
                            row_key.clone()
                        ))
                    })?
                }
            };

            // Walk the slot's records in the reverse of their stored order
            // NOTE(review): presumably they are stored in ascending index order, so that this
            // yields the documented descending order — confirm against the upload path.
            cell_data.reverse();
            for tx_by_addr_info in cell_data.into_iter() {
                // Filter out records before `before_transaction_index`
                if slot == first_slot && tx_by_addr_info.index >= before_transaction_index {
                    continue;
                }
                // Filter out records after `until_transaction_index`
                if slot == last_slot && tx_by_addr_info.index <= until_transaction_index {
                    continue;
                }
                infos.push((
                    ConfirmedTransactionStatusWithSignature {
                        signature: tx_by_addr_info.signature,
                        slot,
                        err: tx_by_addr_info.err,
                        memo: tx_by_addr_info.memo,
                        block_time: tx_by_addr_info.block_time,
                    },
                    tx_by_addr_info.index,
                ));
                // Respect limit
                if infos.len() >= limit {
                    break 'outer;
                }
            }
        }
        Ok(infos)
    }
890
891    /// Upload a new confirmed block and associated meta data.
892    pub async fn upload_confirmed_block(
893        &self,
894        slot: Slot,
895        confirmed_block: VersionedConfirmedBlock,
896    ) -> Result<()> {
897        trace!("LedgerStorage::upload_confirmed_block request received: {slot:?}");
898        self.upload_confirmed_block_with_entries(
899            slot,
900            VersionedConfirmedBlockWithEntries {
901                block: confirmed_block,
902                entries: vec![],
903            },
904        )
905        .await
906    }
907
908    pub async fn upload_confirmed_block_with_entries(
909        &self,
910        slot: Slot,
911        confirmed_block: VersionedConfirmedBlockWithEntries,
912    ) -> Result<()> {
913        trace!("LedgerStorage::upload_confirmed_block_with_entries request received: {slot:?}");
914        let mut by_addr: HashMap<&Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();
915        let VersionedConfirmedBlockWithEntries {
916            block: confirmed_block,
917            entries,
918        } = confirmed_block;
919
920        let reserved_account_keys = ReservedAccountKeys::new_all_activated();
921        let mut tx_cells = Vec::with_capacity(confirmed_block.transactions.len());
922        for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
923            let VersionedTransactionWithStatusMeta { meta, transaction } = transaction_with_meta;
924            let err = meta.status.clone().err();
925            let index = index as u32;
926            let signature = transaction.signatures[0];
927            let memo = extract_and_fmt_memos(transaction_with_meta);
928
929            for address in transaction_with_meta.account_keys().iter() {
930                // Historical note that previously only a set of sysvar ids were
931                // skipped from being uploaded. Now we skip uploaded for the set
932                // of all reserved account keys which will continue to grow in
933                // the future.
934                if !reserved_account_keys.is_reserved(address) {
935                    by_addr
936                        .entry(address)
937                        .or_default()
938                        .push(TransactionByAddrInfo {
939                            signature,
940                            err: err.clone(),
941                            index,
942                            memo: memo.clone(),
943                            block_time: confirmed_block.block_time,
944                        });
945                }
946            }
947
948            tx_cells.push((
949                signature.to_string(),
950                TransactionInfo {
951                    slot,
952                    index,
953                    err,
954                    memo,
955                },
956            ));
957        }
958
959        let tx_by_addr_cells: Vec<_> = by_addr
960            .into_iter()
961            .map(|(address, transaction_info_by_addr)| {
962                (
963                    format!("{}/{}", address, slot_to_tx_by_addr_key(slot)),
964                    tx_by_addr::TransactionByAddr {
965                        tx_by_addrs: transaction_info_by_addr
966                            .into_iter()
967                            .map(|by_addr| by_addr.into())
968                            .collect(),
969                    },
970                )
971            })
972            .collect();
973
974        let num_entries = entries.len();
975        let entry_cell = (
976            slot_to_entries_key(slot),
977            entries::Entries {
978                entries: entries.into_iter().enumerate().map(Into::into).collect(),
979            },
980        );
981
982        let mut tasks = vec![];
983
984        if !tx_cells.is_empty() {
985            let conn = self.connection.clone();
986            tasks.push(tokio::spawn(async move {
987                conn.put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
988                    .await
989            }));
990        }
991
992        if !tx_by_addr_cells.is_empty() {
993            let conn = self.connection.clone();
994            tasks.push(tokio::spawn(async move {
995                conn.put_protobuf_cells_with_retry::<tx_by_addr::TransactionByAddr>(
996                    "tx-by-addr",
997                    &tx_by_addr_cells,
998                )
999                .await
1000            }));
1001        }
1002
1003        if num_entries > 0 {
1004            let conn = self.connection.clone();
1005            tasks.push(tokio::spawn(async move {
1006                conn.put_protobuf_cells_with_retry::<entries::Entries>("entries", &[entry_cell])
1007                    .await
1008            }));
1009        }
1010
1011        let mut bytes_written = 0;
1012        let mut maybe_first_err: Option<Error> = None;
1013
1014        let results = futures::future::join_all(tasks).await;
1015        for result in results {
1016            match result {
1017                Err(err) => {
1018                    if maybe_first_err.is_none() {
1019                        maybe_first_err = Some(Error::TokioJoinError(err));
1020                    }
1021                }
1022                Ok(Err(err)) => {
1023                    if maybe_first_err.is_none() {
1024                        maybe_first_err = Some(Error::BigTableError(err));
1025                    }
1026                }
1027                Ok(Ok(bytes)) => {
1028                    bytes_written += bytes;
1029                }
1030            }
1031        }
1032
1033        if let Some(err) = maybe_first_err {
1034            return Err(err);
1035        }
1036
1037        let num_transactions = confirmed_block.transactions.len();
1038
1039        // Store the block itself last, after all other metadata about the block has been
1040        // successfully stored.  This avoids partial uploaded blocks from becoming visible to
1041        // `get_confirmed_block()` and `get_confirmed_blocks()`
1042        let blocks_cells = [(slot_to_blocks_key(slot), confirmed_block.into())];
1043        bytes_written += self
1044            .connection
1045            .put_protobuf_cells_with_retry::<generated::ConfirmedBlock>("blocks", &blocks_cells)
1046            .await?;
1047        datapoint_info!(
1048            "storage-bigtable-upload-block",
1049            ("slot", slot, i64),
1050            ("transactions", num_transactions, i64),
1051            ("entries", num_entries, i64),
1052            ("bytes", bytes_written, i64),
1053        );
1054        Ok(())
1055    }
1056
1057    // Delete a confirmed block and associated meta data.
1058    pub async fn delete_confirmed_block(&self, slot: Slot, dry_run: bool) -> Result<()> {
1059        let mut addresses: HashSet<&Pubkey> = HashSet::new();
1060        let mut expected_tx_infos: HashMap<String, UploadedTransaction> = HashMap::new();
1061        let confirmed_block = self.get_confirmed_block(slot).await?;
1062        for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
1063            match transaction_with_meta {
1064                TransactionWithStatusMeta::MissingMetadata(transaction) => {
1065                    let signature = transaction.signatures[0];
1066                    let index = index as u32;
1067                    let err = None;
1068
1069                    for address in transaction.message.account_keys.iter() {
1070                        // We could skip deleting addresses that are known
1071                        // reserved keys but it's hard to be sure whether we
1072                        // previously uploaded rows for reserved keys or not. So
1073                        // to ensure everything is deleted properly, we attempt
1074                        // to delete rows for all addresses even if they might
1075                        // not have been uploaded.
1076                        addresses.insert(address);
1077                    }
1078
1079                    expected_tx_infos.insert(
1080                        signature.to_string(),
1081                        UploadedTransaction { slot, index, err },
1082                    );
1083                }
1084                TransactionWithStatusMeta::Complete(tx_with_meta) => {
1085                    let VersionedTransactionWithStatusMeta { transaction, meta } = tx_with_meta;
1086                    let signature = transaction.signatures[0];
1087                    let index = index as u32;
1088                    let err = meta.status.clone().err();
1089
1090                    for address in tx_with_meta.account_keys().iter() {
1091                        // We could skip deleting addresses that are known
1092                        // reserved keys but it's hard to be sure whether we
1093                        // previously uploaded rows for reserved keys or not. So
1094                        // to ensure everything is deleted properly, we attempt
1095                        // to delete rows for all addresses even if they might
1096                        // not have been uploaded.
1097                        addresses.insert(address);
1098                    }
1099
1100                    expected_tx_infos.insert(
1101                        signature.to_string(),
1102                        UploadedTransaction { slot, index, err },
1103                    );
1104                }
1105            }
1106        }
1107
1108        let address_slot_rows: Vec<_> = addresses
1109            .into_iter()
1110            .map(|address| format!("{}/{}", address, slot_to_tx_by_addr_key(slot)))
1111            .collect();
1112
1113        let tx_deletion_rows = if !expected_tx_infos.is_empty() {
1114            let signatures = expected_tx_infos.keys().cloned().collect::<Vec<_>>();
1115            let fetched_tx_infos: HashMap<String, std::result::Result<UploadedTransaction, _>> =
1116                self.connection
1117                    .get_bincode_cells_with_retry::<TransactionInfo>("tx", &signatures)
1118                    .await?
1119                    .into_iter()
1120                    .map(|(signature, tx_info_res)| (signature, tx_info_res.map(Into::into)))
1121                    .collect::<HashMap<_, _>>();
1122
1123            let mut deletion_rows = Vec::with_capacity(expected_tx_infos.len());
1124            for (signature, expected_tx_info) in expected_tx_infos {
1125                match fetched_tx_infos.get(&signature) {
1126                    Some(Ok(fetched_tx_info)) if fetched_tx_info == &expected_tx_info => {
1127                        deletion_rows.push(signature);
1128                    }
1129                    Some(Ok(fetched_tx_info)) => {
1130                        warn!(
1131                            "skipped tx row {} because the bigtable entry ({:?}) did not match to \
1132                             {:?}",
1133                            signature, fetched_tx_info, &expected_tx_info,
1134                        );
1135                    }
1136                    Some(Err(err)) => {
1137                        warn!(
1138                            "skipped tx row {signature} because the bigtable entry was corrupted: \
1139                             {err:?}"
1140                        );
1141                    }
1142                    None => {
1143                        warn!("skipped tx row {signature} because it was not found");
1144                    }
1145                }
1146            }
1147            deletion_rows
1148        } else {
1149            vec![]
1150        };
1151
1152        let entries_exist = self
1153            .connection
1154            .client()
1155            .row_key_exists("entries", slot_to_entries_key(slot))
1156            .await
1157            .is_ok_and(|x| x);
1158
1159        if !dry_run {
1160            if !address_slot_rows.is_empty() {
1161                self.connection
1162                    .delete_rows_with_retry("tx-by-addr", &address_slot_rows)
1163                    .await?;
1164            }
1165
1166            if !tx_deletion_rows.is_empty() {
1167                self.connection
1168                    .delete_rows_with_retry("tx", &tx_deletion_rows)
1169                    .await?;
1170            }
1171
1172            if entries_exist {
1173                self.connection
1174                    .delete_rows_with_retry("entries", &[slot_to_entries_key(slot)])
1175                    .await?;
1176            }
1177
1178            self.connection
1179                .delete_rows_with_retry("blocks", &[slot_to_blocks_key(slot)])
1180                .await?;
1181        }
1182
1183        info!(
1184            "{}deleted ledger data for slot {}: {} transaction rows, {} address slot rows, {} \
1185             entry row",
1186            if dry_run { "[dry run] " } else { "" },
1187            slot,
1188            tx_deletion_rows.len(),
1189            address_slot_rows.len(),
1190            if entries_exist { "with" } else { "WITHOUT" }
1191        );
1192
1193        Ok(())
1194    }
1195}
1196
#[cfg(test)]
mod test {
    use super::*;

    /// The smallest and largest slot values must map to the expected 16-hex-digit keys.
    #[test]
    fn test_slot_to_key() {
        let cases = [
            (0, "0000000000000000"),
            (u64::MAX, "ffffffffffffffff"),
        ];
        for (slot, expected_key) in cases {
            assert_eq!(slot_to_key(slot), expected_key);
        }
    }
}