solana_storage_bigtable/
lib.rs

1#![allow(clippy::arithmetic_side_effects)]
2
3use {
4    crate::bigtable::RowKey,
5    agave_reserved_account_keys::ReservedAccountKeys,
6    log::*,
7    serde::{Deserialize, Serialize},
8    solana_clock::{Slot, UnixTimestamp},
9    solana_message::v0::LoadedAddresses,
10    solana_metrics::datapoint_info,
11    solana_pubkey::Pubkey,
12    solana_serde::default_on_eof,
13    solana_signature::Signature,
14    solana_storage_proto::convert::{entries, generated, tx_by_addr},
15    solana_time_utils::AtomicInterval,
16    solana_transaction::versioned::VersionedTransaction,
17    solana_transaction_error::TransactionError,
18    solana_transaction_status::{
19        extract_and_fmt_memos, ConfirmedBlock, ConfirmedTransactionStatusWithSignature,
20        ConfirmedTransactionWithStatusMeta, EntrySummary, Reward, TransactionByAddrInfo,
21        TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta,
22        TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedConfirmedBlockWithEntries,
23        VersionedTransactionWithStatusMeta,
24    },
25    std::{
26        collections::{HashMap, HashSet},
27        convert::TryInto,
28        sync::{
29            atomic::{AtomicUsize, Ordering},
30            Arc,
31        },
32        time::Duration,
33    },
34    thiserror::Error,
35    tokio::task::JoinError,
36};
37
38#[macro_use]
39extern crate solana_metrics;
40
41#[macro_use]
42extern crate serde_derive;
43
44mod access_token;
45mod bigtable;
46mod compression;
47mod root_ca_certificate;
48
49#[derive(Debug, Error)]
50pub enum Error {
51    #[error("BigTable: {0}")]
52    BigTableError(bigtable::Error),
53
54    #[error("I/O Error: {0}")]
55    IoError(std::io::Error),
56
57    #[error("Transaction encoded is not supported")]
58    UnsupportedTransactionEncoding,
59
60    #[error("Block not found: {0}")]
61    BlockNotFound(Slot),
62
63    #[error("Signature not found")]
64    SignatureNotFound,
65
66    #[error("tokio error")]
67    TokioJoinError(JoinError),
68}
69
70impl std::convert::From<bigtable::Error> for Error {
71    fn from(err: bigtable::Error) -> Self {
72        Self::BigTableError(err)
73    }
74}
75
76impl std::convert::From<std::io::Error> for Error {
77    fn from(err: std::io::Error) -> Self {
78        Self::IoError(err)
79    }
80}
81
82pub type Result<T> = std::result::Result<T, Error>;
83
84// Convert a slot to its bucket representation whereby lower slots are always lexically ordered
85// before higher slots
86fn slot_to_key(slot: Slot) -> String {
87    format!("{slot:016x}")
88}
89
90fn slot_to_blocks_key(slot: Slot) -> String {
91    slot_to_key(slot)
92}
93
94fn slot_to_entries_key(slot: Slot) -> String {
95    slot_to_key(slot)
96}
97
98fn slot_to_tx_by_addr_key(slot: Slot) -> String {
99    slot_to_key(!slot)
100}
101
102// Reverse of `slot_to_key`
103fn key_to_slot(key: &str) -> Option<Slot> {
104    match Slot::from_str_radix(key, 16) {
105        Ok(slot) => Some(slot),
106        Err(err) => {
107            // bucket data is probably corrupt
108            warn!("Failed to parse object key as a slot: {}: {}", key, err);
109            None
110        }
111    }
112}
113
114// A serialized `StoredConfirmedBlock` is stored in the `block` table
115//
116// StoredConfirmedBlock holds the same contents as ConfirmedBlock, but is slightly compressed and avoids
117// some serde JSON directives that cause issues with bincode
118//
119// Note: in order to continue to support old bincode-serialized bigtable entries, if new fields are
120// added to ConfirmedBlock, they must either be excluded or set to `default_on_eof` here
121//
122#[derive(Serialize, Deserialize)]
123struct StoredConfirmedBlock {
124    previous_blockhash: String,
125    blockhash: String,
126    parent_slot: Slot,
127    transactions: Vec<StoredConfirmedBlockTransaction>,
128    rewards: StoredConfirmedBlockRewards,
129    block_time: Option<UnixTimestamp>,
130    #[serde(deserialize_with = "default_on_eof")]
131    block_height: Option<u64>,
132}
133
// Test-only conversion into the compact stored form. `num_partitions` has no counterpart in
// `StoredConfirmedBlock` and is discarded.
#[cfg(test)]
impl From<ConfirmedBlock> for StoredConfirmedBlock {
    fn from(confirmed_block: ConfirmedBlock) -> Self {
        let ConfirmedBlock {
            previous_blockhash,
            blockhash,
            parent_slot,
            transactions: full_transactions,
            rewards: full_rewards,
            num_partitions: _num_partitions,
            block_time,
            block_height,
        } = confirmed_block;

        StoredConfirmedBlock {
            previous_blockhash,
            blockhash,
            parent_slot,
            transactions: full_transactions.into_iter().map(Into::into).collect(),
            rewards: full_rewards.into_iter().map(Into::into).collect(),
            block_time,
            block_height,
        }
    }
}
159
160impl From<StoredConfirmedBlock> for ConfirmedBlock {
161    fn from(confirmed_block: StoredConfirmedBlock) -> Self {
162        let StoredConfirmedBlock {
163            previous_blockhash,
164            blockhash,
165            parent_slot,
166            transactions,
167            rewards,
168            block_time,
169            block_height,
170        } = confirmed_block;
171
172        Self {
173            previous_blockhash,
174            blockhash,
175            parent_slot,
176            transactions: transactions.into_iter().map(|tx| tx.into()).collect(),
177            rewards: rewards.into_iter().map(|reward| reward.into()).collect(),
178            num_partitions: None,
179            block_time,
180            block_height,
181        }
182    }
183}
184
185#[derive(Serialize, Deserialize)]
186struct StoredConfirmedBlockTransaction {
187    transaction: VersionedTransaction,
188    meta: Option<StoredConfirmedBlockTransactionStatusMeta>,
189}
190
// Test-only conversion into the stored form. A transaction without metadata is wrapped as a
// `VersionedTransaction` with `meta: None`; a complete one keeps its transaction and converts
// its metadata.
#[cfg(test)]
impl From<TransactionWithStatusMeta> for StoredConfirmedBlockTransaction {
    fn from(value: TransactionWithStatusMeta) -> Self {
        let (transaction, meta) = match value {
            TransactionWithStatusMeta::MissingMetadata(legacy_transaction) => {
                (VersionedTransaction::from(legacy_transaction), None)
            }
            TransactionWithStatusMeta::Complete(VersionedTransactionWithStatusMeta {
                transaction,
                meta,
            }) => (transaction, Some(meta.into())),
        };
        Self { transaction, meta }
    }
}
209
210impl From<StoredConfirmedBlockTransaction> for TransactionWithStatusMeta {
211    fn from(tx_with_meta: StoredConfirmedBlockTransaction) -> Self {
212        let StoredConfirmedBlockTransaction { transaction, meta } = tx_with_meta;
213        match meta {
214            None => Self::MissingMetadata(
215                transaction
216                    .into_legacy_transaction()
217                    .expect("versioned transactions always have meta"),
218            ),
219            Some(meta) => Self::Complete(VersionedTransactionWithStatusMeta {
220                transaction,
221                meta: meta.into(),
222            }),
223        }
224    }
225}
226
227#[derive(Serialize, Deserialize)]
228struct StoredConfirmedBlockTransactionStatusMeta {
229    err: Option<TransactionError>,
230    fee: u64,
231    pre_balances: Vec<u64>,
232    post_balances: Vec<u64>,
233}
234
235impl From<StoredConfirmedBlockTransactionStatusMeta> for TransactionStatusMeta {
236    fn from(value: StoredConfirmedBlockTransactionStatusMeta) -> Self {
237        let StoredConfirmedBlockTransactionStatusMeta {
238            err,
239            fee,
240            pre_balances,
241            post_balances,
242        } = value;
243        let status = match &err {
244            None => Ok(()),
245            Some(err) => Err(err.clone()),
246        };
247        Self {
248            status,
249            fee,
250            pre_balances,
251            post_balances,
252            inner_instructions: None,
253            log_messages: None,
254            pre_token_balances: None,
255            post_token_balances: None,
256            rewards: None,
257            loaded_addresses: LoadedAddresses::default(),
258            return_data: None,
259            compute_units_consumed: None,
260            cost_units: None,
261        }
262    }
263}
264
265impl From<TransactionStatusMeta> for StoredConfirmedBlockTransactionStatusMeta {
266    fn from(value: TransactionStatusMeta) -> Self {
267        let TransactionStatusMeta {
268            status,
269            fee,
270            pre_balances,
271            post_balances,
272            ..
273        } = value;
274        Self {
275            err: status.err(),
276            fee,
277            pre_balances,
278            post_balances,
279        }
280    }
281}
282
283type StoredConfirmedBlockRewards = Vec<StoredConfirmedBlockReward>;
284
285#[derive(Serialize, Deserialize)]
286struct StoredConfirmedBlockReward {
287    pubkey: String,
288    lamports: i64,
289}
290
291impl From<StoredConfirmedBlockReward> for Reward {
292    fn from(value: StoredConfirmedBlockReward) -> Self {
293        let StoredConfirmedBlockReward { pubkey, lamports } = value;
294        Self {
295            pubkey,
296            lamports,
297            post_balance: 0,
298            reward_type: None,
299            commission: None,
300        }
301    }
302}
303
304impl From<Reward> for StoredConfirmedBlockReward {
305    fn from(value: Reward) -> Self {
306        let Reward {
307            pubkey, lamports, ..
308        } = value;
309        Self { pubkey, lamports }
310    }
311}
312
313// A serialized `TransactionInfo` is stored in the `tx` table
314#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
315struct TransactionInfo {
316    slot: Slot, // The slot that contains the block with this transaction in it
317    index: u32, // Where the transaction is located in the block
318    err: Option<TransactionError>, // None if the transaction executed successfully
319    memo: Option<String>, // Transaction memo
320}
321
322// Part of a serialized `TransactionInfo` which is stored in the `tx` table
323#[derive(PartialEq, Eq, Debug)]
324struct UploadedTransaction {
325    slot: Slot, // The slot that contains the block with this transaction in it
326    index: u32, // Where the transaction is located in the block
327    err: Option<TransactionError>, // None if the transaction executed successfully
328}
329
330impl From<TransactionInfo> for UploadedTransaction {
331    fn from(transaction_info: TransactionInfo) -> Self {
332        Self {
333            slot: transaction_info.slot,
334            index: transaction_info.index,
335            err: transaction_info.err,
336        }
337    }
338}
339
340impl From<TransactionInfo> for TransactionStatus {
341    fn from(transaction_info: TransactionInfo) -> Self {
342        let TransactionInfo { slot, err, .. } = transaction_info;
343        let status = match &err {
344            None => Ok(()),
345            Some(err) => Err(err.clone()),
346        };
347        Self {
348            slot,
349            confirmations: None,
350            status,
351            err,
352            confirmation_status: Some(TransactionConfirmationStatus::Finalized),
353        }
354    }
355}
356
357#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
358struct LegacyTransactionByAddrInfo {
359    pub signature: Signature,          // The transaction signature
360    pub err: Option<TransactionError>, // None if the transaction executed successfully
361    pub index: u32,                    // Where the transaction is located in the block
362    pub memo: Option<String>,          // Transaction memo
363}
364
365impl From<LegacyTransactionByAddrInfo> for TransactionByAddrInfo {
366    fn from(legacy: LegacyTransactionByAddrInfo) -> Self {
367        let LegacyTransactionByAddrInfo {
368            signature,
369            err,
370            index,
371            memo,
372        } = legacy;
373
374        Self {
375            signature,
376            err,
377            index,
378            memo,
379            block_time: None,
380        }
381    }
382}
383
/// Instance name used by [`LedgerStorageConfig::default`].
pub const DEFAULT_INSTANCE_NAME: &str = "solana-ledger";
/// App profile id used by [`LedgerStorageConfig::default`].
pub const DEFAULT_APP_PROFILE_ID: &str = "default";
/// Maximum message size used by [`LedgerStorageConfig::default`]: 64 MiB.
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
387
/// How the credential for the BigTable connection is supplied.
#[derive(Debug)]
pub enum CredentialType {
    /// An optional path to a credential file.
    // NOTE(review): how `None` is resolved is decided by the connection code, not here
    Filepath(Option<String>),
    /// The credential passed directly as a string.
    Stringified(String),
}
393
394#[derive(Debug)]
395pub struct LedgerStorageConfig {
396    pub read_only: bool,
397    pub timeout: Option<std::time::Duration>,
398    pub credential_type: CredentialType,
399    pub instance_name: String,
400    pub app_profile_id: String,
401    pub max_message_size: usize,
402}
403
404impl Default for LedgerStorageConfig {
405    fn default() -> Self {
406        Self {
407            read_only: true,
408            timeout: None,
409            credential_type: CredentialType::Filepath(None),
410            instance_name: DEFAULT_INSTANCE_NAME.to_string(),
411            app_profile_id: DEFAULT_APP_PROFILE_ID.to_string(),
412            max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
413        }
414    }
415}
416
// Interval, in milliseconds, passed to `AtomicInterval::should_update` when deciding whether
// the query-count datapoint is due
const METRICS_REPORT_INTERVAL_MS: u64 = 10_000;
418
419#[derive(Default)]
420struct LedgerStorageStats {
421    num_queries: AtomicUsize,
422    last_report: AtomicInterval,
423}
424
425impl LedgerStorageStats {
426    fn increment_num_queries(&self) {
427        self.num_queries.fetch_add(1, Ordering::Relaxed);
428        self.maybe_report();
429    }
430
431    fn maybe_report(&self) {
432        if self.last_report.should_update(METRICS_REPORT_INTERVAL_MS) {
433            datapoint_debug!(
434                "storage-bigtable-query",
435                (
436                    "num_queries",
437                    self.num_queries.swap(0, Ordering::Relaxed) as i64,
438                    i64
439                )
440            );
441        }
442    }
443}
444
445#[derive(Clone)]
446pub struct LedgerStorage {
447    connection: bigtable::BigTableConnection,
448    stats: Arc<LedgerStorageStats>,
449}
450
451impl LedgerStorage {
452    pub async fn new(
453        read_only: bool,
454        timeout: Option<std::time::Duration>,
455        credential_path: Option<String>,
456    ) -> Result<Self> {
457        Self::new_with_config(LedgerStorageConfig {
458            read_only,
459            timeout,
460            credential_type: CredentialType::Filepath(credential_path),
461            ..LedgerStorageConfig::default()
462        })
463        .await
464    }
465
466    pub fn new_for_emulator(
467        instance_name: &str,
468        app_profile_id: &str,
469        endpoint: &str,
470        timeout: Option<Duration>,
471    ) -> Result<Self> {
472        let stats = Arc::new(LedgerStorageStats::default());
473        Ok(Self {
474            connection: bigtable::BigTableConnection::new_for_emulator(
475                instance_name,
476                app_profile_id,
477                endpoint,
478                timeout,
479                LedgerStorageConfig::default().max_message_size,
480            )?,
481            stats,
482        })
483    }
484
485    pub async fn new_with_config(config: LedgerStorageConfig) -> Result<Self> {
486        let stats = Arc::new(LedgerStorageStats::default());
487        let LedgerStorageConfig {
488            read_only,
489            timeout,
490            instance_name,
491            app_profile_id,
492            credential_type,
493            max_message_size,
494        } = config;
495        let connection = bigtable::BigTableConnection::new(
496            instance_name.as_str(),
497            app_profile_id.as_str(),
498            read_only,
499            timeout,
500            credential_type,
501            max_message_size,
502        )
503        .await?;
504        Ok(Self { stats, connection })
505    }
506
507    pub async fn new_with_stringified_credential(credential: String) -> Result<Self> {
508        Self::new_with_config(LedgerStorageConfig {
509            credential_type: CredentialType::Stringified(credential),
510            ..LedgerStorageConfig::default()
511        })
512        .await
513    }
514
515    /// Return the available slot that contains a block
516    pub async fn get_first_available_block(&self) -> Result<Option<Slot>> {
517        trace!("LedgerStorage::get_first_available_block request received");
518        self.stats.increment_num_queries();
519        let mut bigtable = self.connection.client();
520        let blocks = bigtable.get_row_keys("blocks", None, None, 1).await?;
521        if blocks.is_empty() {
522            return Ok(None);
523        }
524        Ok(key_to_slot(&blocks[0]))
525    }
526
527    /// Fetch the next slots after the provided slot that contains a block
528    ///
529    /// start_slot: slot to start the search from (inclusive)
530    /// limit: stop after this many slots have been found
531    pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result<Vec<Slot>> {
532        trace!(
533            "LedgerStorage::get_confirmed_blocks request received: {:?} {:?}",
534            start_slot,
535            limit
536        );
537        self.stats.increment_num_queries();
538        let mut bigtable = self.connection.client();
539        let blocks = bigtable
540            .get_row_keys(
541                "blocks",
542                Some(slot_to_blocks_key(start_slot)),
543                None,
544                limit as i64,
545            )
546            .await?;
547        Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect())
548    }
549
550    // Fetches and gets a vector of confirmed blocks via a multirow fetch
551    pub async fn get_confirmed_blocks_with_data<'a>(
552        &self,
553        slots: &'a [Slot],
554    ) -> Result<impl Iterator<Item = (Slot, ConfirmedBlock)> + 'a> {
555        trace!(
556            "LedgerStorage::get_confirmed_blocks_with_data request received: {:?}",
557            slots
558        );
559        self.stats.increment_num_queries();
560        let mut bigtable = self.connection.client();
561        let row_keys = slots.iter().copied().map(slot_to_blocks_key);
562        let data = bigtable
563            .get_protobuf_or_bincode_cells("blocks", row_keys)
564            .await?
565            .filter_map(
566                |(row_key, block_cell_data): (
567                    RowKey,
568                    bigtable::CellData<StoredConfirmedBlock, generated::ConfirmedBlock>,
569                )| {
570                    let block = match block_cell_data {
571                        bigtable::CellData::Bincode(block) => block.into(),
572                        bigtable::CellData::Protobuf(block) => block.try_into().ok()?,
573                    };
574                    Some((key_to_slot(&row_key).unwrap(), block))
575                },
576            );
577        Ok(data)
578    }
579
580    /// Fetch the confirmed block from the desired slot
581    pub async fn get_confirmed_block(&self, slot: Slot) -> Result<ConfirmedBlock> {
582        trace!(
583            "LedgerStorage::get_confirmed_block request received: {:?}",
584            slot
585        );
586        self.stats.increment_num_queries();
587        let mut bigtable = self.connection.client();
588        let block_cell_data = bigtable
589            .get_protobuf_or_bincode_cell::<StoredConfirmedBlock, generated::ConfirmedBlock>(
590                "blocks",
591                slot_to_blocks_key(slot),
592            )
593            .await
594            .map_err(|err| match err {
595                bigtable::Error::RowNotFound => Error::BlockNotFound(slot),
596                _ => err.into(),
597            })?;
598        Ok(match block_cell_data {
599            bigtable::CellData::Bincode(block) => block.into(),
600            bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| {
601                bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_blocks_key(slot)))
602            })?,
603        })
604    }
605
606    /// Does the confirmed block exist in the Bigtable
607    pub async fn confirmed_block_exists(&self, slot: Slot) -> Result<bool> {
608        trace!(
609            "LedgerStorage::confirmed_block_exists request received: {:?}",
610            slot
611        );
612        self.stats.increment_num_queries();
613        let mut bigtable = self.connection.client();
614
615        let block_exists = bigtable
616            .row_key_exists("blocks", slot_to_blocks_key(slot))
617            .await?;
618
619        Ok(block_exists)
620    }
621
622    /// Fetches a vector of block entries via a multirow fetch
623    pub async fn get_entries(&self, slot: Slot) -> Result<impl Iterator<Item = EntrySummary>> {
624        trace!(
625            "LedgerStorage::get_block_entries request received: {:?}",
626            slot
627        );
628        self.stats.increment_num_queries();
629        let mut bigtable = self.connection.client();
630        let entry_cell_data = bigtable
631            .get_protobuf_cell::<entries::Entries>("entries", slot_to_entries_key(slot))
632            .await
633            .map_err(|err| match err {
634                bigtable::Error::RowNotFound => Error::BlockNotFound(slot),
635                _ => err.into(),
636            })?;
637        let entries = entry_cell_data.entries.into_iter().map(Into::into);
638        Ok(entries)
639    }
640
641    pub async fn get_signature_status(&self, signature: &Signature) -> Result<TransactionStatus> {
642        trace!(
643            "LedgerStorage::get_signature_status request received: {:?}",
644            signature
645        );
646        self.stats.increment_num_queries();
647        let mut bigtable = self.connection.client();
648        let transaction_info = bigtable
649            .get_bincode_cell::<TransactionInfo>("tx", signature.to_string())
650            .await
651            .map_err(|err| match err {
652                bigtable::Error::RowNotFound => Error::SignatureNotFound,
653                _ => err.into(),
654            })?;
655        Ok(transaction_info.into())
656    }
657
658    // Fetches and gets a vector of confirmed transactions via a multirow fetch
659    pub async fn get_confirmed_transactions(
660        &self,
661        signatures: &[Signature],
662    ) -> Result<Vec<ConfirmedTransactionWithStatusMeta>> {
663        trace!(
664            "LedgerStorage::get_confirmed_transactions request received: {:?}",
665            signatures
666        );
667        self.stats.increment_num_queries();
668        let mut bigtable = self.connection.client();
669
670        // Fetch transactions info
671        let keys = signatures.iter().map(|s| s.to_string()).collect::<Vec<_>>();
672        let cells = bigtable
673            .get_bincode_cells::<TransactionInfo>("tx", &keys)
674            .await?;
675
676        // Collect by slot
677        let mut order: Vec<(Slot, u32, String)> = Vec::new();
678        let mut slots: HashSet<Slot> = HashSet::new();
679        for cell in cells {
680            if let (signature, Ok(TransactionInfo { slot, index, .. })) = cell {
681                order.push((slot, index, signature));
682                slots.insert(slot);
683            }
684        }
685
686        // Fetch blocks
687        let blocks = self
688            .get_confirmed_blocks_with_data(&slots.into_iter().collect::<Vec<_>>())
689            .await?
690            .collect::<HashMap<_, _>>();
691
692        // Extract transactions
693        Ok(order
694            .into_iter()
695            .filter_map(|(slot, index, signature)| {
696                blocks.get(&slot).and_then(|block| {
697                    block
698                        .transactions
699                        .get(index as usize)
700                        .and_then(|tx_with_meta| {
701                            if tx_with_meta.transaction_signature().to_string() != *signature {
702                                warn!(
703                                    "Transaction info or confirmed block for {} is corrupt",
704                                    signature
705                                );
706                                None
707                            } else {
708                                Some(ConfirmedTransactionWithStatusMeta {
709                                    slot,
710                                    tx_with_meta: tx_with_meta.clone(),
711                                    block_time: block.block_time,
712                                })
713                            }
714                        })
715                })
716            })
717            .collect::<Vec<_>>())
718    }
719
720    /// Fetch a confirmed transaction
721    pub async fn get_confirmed_transaction(
722        &self,
723        signature: &Signature,
724    ) -> Result<Option<ConfirmedTransactionWithStatusMeta>> {
725        trace!(
726            "LedgerStorage::get_confirmed_transaction request received: {:?}",
727            signature
728        );
729        self.stats.increment_num_queries();
730        let mut bigtable = self.connection.client();
731
732        // Figure out which block the transaction is located in
733        let TransactionInfo { slot, index, .. } = bigtable
734            .get_bincode_cell("tx", signature.to_string())
735            .await
736            .map_err(|err| match err {
737                bigtable::Error::RowNotFound => Error::SignatureNotFound,
738                _ => err.into(),
739            })?;
740
741        // Load the block and return the transaction
742        let block = self.get_confirmed_block(slot).await?;
743        match block.transactions.into_iter().nth(index as usize) {
744            None => {
745                // report this somewhere actionable?
746                warn!("Transaction info for {} is corrupt", signature);
747                Ok(None)
748            }
749            Some(tx_with_meta) => {
750                if tx_with_meta.transaction_signature() != signature {
751                    warn!(
752                        "Transaction info or confirmed block for {} is corrupt",
753                        signature
754                    );
755                    Ok(None)
756                } else {
757                    Ok(Some(ConfirmedTransactionWithStatusMeta {
758                        slot,
759                        tx_with_meta,
760                        block_time: block.block_time,
761                    }))
762                }
763            }
764        }
765    }
766
767    /// Get confirmed signatures for the provided address, in descending ledger order
768    ///
769    /// address: address to search for
770    /// before_signature: start with the first signature older than this one
771    /// until_signature: end with the last signature more recent than this one
772    /// limit: stop after this many signatures; if limit==0, all records in the table will be read
773    pub async fn get_confirmed_signatures_for_address(
774        &self,
775        address: &Pubkey,
776        before_signature: Option<&Signature>,
777        until_signature: Option<&Signature>,
778        limit: usize,
779    ) -> Result<
780        Vec<(
781            ConfirmedTransactionStatusWithSignature,
782            u32, /*slot index*/
783        )>,
784    > {
785        trace!(
786            "LedgerStorage::get_confirmed_signatures_for_address request received: {:?}",
787            address
788        );
789        self.stats.increment_num_queries();
790        let mut bigtable = self.connection.client();
791        let address_prefix = format!("{address}/");
792
793        // Figure out where to start listing from based on `before_signature`
794        let (first_slot, before_transaction_index) = match before_signature {
795            None => (Slot::MAX, 0),
796            Some(before_signature) => {
797                let TransactionInfo { slot, index, .. } = bigtable
798                    .get_bincode_cell("tx", before_signature.to_string())
799                    .await
800                    .map_err(|err| match err {
801                        bigtable::Error::RowNotFound => Error::SignatureNotFound,
802                        _ => err.into(),
803                    })?;
804
805                (slot, index)
806            }
807        };
808
809        // Figure out where to end listing from based on `until_signature`
810        let (last_slot, until_transaction_index) = match until_signature {
811            None => (0, u32::MAX),
812            Some(until_signature) => {
813                let TransactionInfo { slot, index, .. } = bigtable
814                    .get_bincode_cell("tx", until_signature.to_string())
815                    .await
816                    .map_err(|err| match err {
817                        bigtable::Error::RowNotFound => Error::SignatureNotFound,
818                        _ => err.into(),
819                    })?;
820
821                (slot, index)
822            }
823        };
824
825        let mut infos = vec![];
826
827        let starting_slot_tx_len = bigtable
828            .get_protobuf_or_bincode_cell::<Vec<LegacyTransactionByAddrInfo>, tx_by_addr::TransactionByAddr>(
829                "tx-by-addr",
830                format!("{}{}", address_prefix, slot_to_tx_by_addr_key(first_slot)),
831            )
832            .await
833            .map(|cell_data| {
834                match cell_data {
835                    bigtable::CellData::Bincode(tx_by_addr) => tx_by_addr.len(),
836                    bigtable::CellData::Protobuf(tx_by_addr) => tx_by_addr.tx_by_addrs.len(),
837                }
838            })
839            .unwrap_or(0);
840
841        // Return the next tx-by-addr data of amount `limit` plus extra to account for the largest
842        // number that might be filtered out
843        let tx_by_addr_data = bigtable
844            .get_row_data(
845                "tx-by-addr",
846                Some(format!(
847                    "{}{}",
848                    address_prefix,
849                    slot_to_tx_by_addr_key(first_slot),
850                )),
851                Some(format!(
852                    "{}{}",
853                    address_prefix,
854                    slot_to_tx_by_addr_key(last_slot),
855                )),
856                limit as i64 + starting_slot_tx_len as i64,
857            )
858            .await?;
859
860        'outer: for (row_key, data) in tx_by_addr_data {
861            let slot = !key_to_slot(&row_key[address_prefix.len()..]).ok_or_else(|| {
862                bigtable::Error::ObjectCorrupt(format!(
863                    "Failed to convert key to slot: tx-by-addr/{row_key}"
864                ))
865            })?;
866
867            let deserialized_cell_data = bigtable::deserialize_protobuf_or_bincode_cell_data::<
868                Vec<LegacyTransactionByAddrInfo>,
869                tx_by_addr::TransactionByAddr,
870            >(&data, "tx-by-addr", row_key.clone())?;
871
872            let mut cell_data: Vec<TransactionByAddrInfo> = match deserialized_cell_data {
873                bigtable::CellData::Bincode(tx_by_addr) => {
874                    tx_by_addr.into_iter().map(|legacy| legacy.into()).collect()
875                }
876                bigtable::CellData::Protobuf(tx_by_addr) => {
877                    tx_by_addr.try_into().map_err(|error| {
878                        bigtable::Error::ObjectCorrupt(format!(
879                            "Failed to deserialize: {}: tx-by-addr/{}",
880                            error,
881                            row_key.clone()
882                        ))
883                    })?
884                }
885            };
886
887            cell_data.reverse();
888            for tx_by_addr_info in cell_data.into_iter() {
889                // Filter out records before `before_transaction_index`
890                if slot == first_slot && tx_by_addr_info.index >= before_transaction_index {
891                    continue;
892                }
893                // Filter out records after `until_transaction_index`
894                if slot == last_slot && tx_by_addr_info.index <= until_transaction_index {
895                    continue;
896                }
897                infos.push((
898                    ConfirmedTransactionStatusWithSignature {
899                        signature: tx_by_addr_info.signature,
900                        slot,
901                        err: tx_by_addr_info.err,
902                        memo: tx_by_addr_info.memo,
903                        block_time: tx_by_addr_info.block_time,
904                    },
905                    tx_by_addr_info.index,
906                ));
907                // Respect limit
908                if infos.len() >= limit {
909                    break 'outer;
910                }
911            }
912        }
913        Ok(infos)
914    }
915
916    /// Upload a new confirmed block and associated meta data.
917    pub async fn upload_confirmed_block(
918        &self,
919        slot: Slot,
920        confirmed_block: VersionedConfirmedBlock,
921    ) -> Result<()> {
922        trace!(
923            "LedgerStorage::upload_confirmed_block request received: {:?}",
924            slot
925        );
926        self.upload_confirmed_block_with_entries(
927            slot,
928            VersionedConfirmedBlockWithEntries {
929                block: confirmed_block,
930                entries: vec![],
931            },
932        )
933        .await
934    }
935
936    pub async fn upload_confirmed_block_with_entries(
937        &self,
938        slot: Slot,
939        confirmed_block: VersionedConfirmedBlockWithEntries,
940    ) -> Result<()> {
941        trace!(
942            "LedgerStorage::upload_confirmed_block_with_entries request received: {:?}",
943            slot
944        );
945        let mut by_addr: HashMap<&Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();
946        let VersionedConfirmedBlockWithEntries {
947            block: confirmed_block,
948            entries,
949        } = confirmed_block;
950
951        let reserved_account_keys = ReservedAccountKeys::new_all_activated();
952        let mut tx_cells = Vec::with_capacity(confirmed_block.transactions.len());
953        for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
954            let VersionedTransactionWithStatusMeta { meta, transaction } = transaction_with_meta;
955            let err = meta.status.clone().err();
956            let index = index as u32;
957            let signature = transaction.signatures[0];
958            let memo = extract_and_fmt_memos(transaction_with_meta);
959
960            for address in transaction_with_meta.account_keys().iter() {
961                // Historical note that previously only a set of sysvar ids were
962                // skipped from being uploaded. Now we skip uploaded for the set
963                // of all reserved account keys which will continue to grow in
964                // the future.
965                if !reserved_account_keys.is_reserved(address) {
966                    by_addr
967                        .entry(address)
968                        .or_default()
969                        .push(TransactionByAddrInfo {
970                            signature,
971                            err: err.clone(),
972                            index,
973                            memo: memo.clone(),
974                            block_time: confirmed_block.block_time,
975                        });
976                }
977            }
978
979            tx_cells.push((
980                signature.to_string(),
981                TransactionInfo {
982                    slot,
983                    index,
984                    err,
985                    memo,
986                },
987            ));
988        }
989
990        let tx_by_addr_cells: Vec<_> = by_addr
991            .into_iter()
992            .map(|(address, transaction_info_by_addr)| {
993                (
994                    format!("{}/{}", address, slot_to_tx_by_addr_key(slot)),
995                    tx_by_addr::TransactionByAddr {
996                        tx_by_addrs: transaction_info_by_addr
997                            .into_iter()
998                            .map(|by_addr| by_addr.into())
999                            .collect(),
1000                    },
1001                )
1002            })
1003            .collect();
1004
1005        let num_entries = entries.len();
1006        let entry_cell = (
1007            slot_to_entries_key(slot),
1008            entries::Entries {
1009                entries: entries.into_iter().enumerate().map(Into::into).collect(),
1010            },
1011        );
1012
1013        let mut tasks = vec![];
1014
1015        if !tx_cells.is_empty() {
1016            let conn = self.connection.clone();
1017            tasks.push(tokio::spawn(async move {
1018                conn.put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
1019                    .await
1020            }));
1021        }
1022
1023        if !tx_by_addr_cells.is_empty() {
1024            let conn = self.connection.clone();
1025            tasks.push(tokio::spawn(async move {
1026                conn.put_protobuf_cells_with_retry::<tx_by_addr::TransactionByAddr>(
1027                    "tx-by-addr",
1028                    &tx_by_addr_cells,
1029                )
1030                .await
1031            }));
1032        }
1033
1034        if num_entries > 0 {
1035            let conn = self.connection.clone();
1036            tasks.push(tokio::spawn(async move {
1037                conn.put_protobuf_cells_with_retry::<entries::Entries>("entries", &[entry_cell])
1038                    .await
1039            }));
1040        }
1041
1042        let mut bytes_written = 0;
1043        let mut maybe_first_err: Option<Error> = None;
1044
1045        let results = futures::future::join_all(tasks).await;
1046        for result in results {
1047            match result {
1048                Err(err) => {
1049                    if maybe_first_err.is_none() {
1050                        maybe_first_err = Some(Error::TokioJoinError(err));
1051                    }
1052                }
1053                Ok(Err(err)) => {
1054                    if maybe_first_err.is_none() {
1055                        maybe_first_err = Some(Error::BigTableError(err));
1056                    }
1057                }
1058                Ok(Ok(bytes)) => {
1059                    bytes_written += bytes;
1060                }
1061            }
1062        }
1063
1064        if let Some(err) = maybe_first_err {
1065            return Err(err);
1066        }
1067
1068        let num_transactions = confirmed_block.transactions.len();
1069
1070        // Store the block itself last, after all other metadata about the block has been
1071        // successfully stored.  This avoids partial uploaded blocks from becoming visible to
1072        // `get_confirmed_block()` and `get_confirmed_blocks()`
1073        let blocks_cells = [(slot_to_blocks_key(slot), confirmed_block.into())];
1074        bytes_written += self
1075            .connection
1076            .put_protobuf_cells_with_retry::<generated::ConfirmedBlock>("blocks", &blocks_cells)
1077            .await?;
1078        datapoint_info!(
1079            "storage-bigtable-upload-block",
1080            ("slot", slot, i64),
1081            ("transactions", num_transactions, i64),
1082            ("entries", num_entries, i64),
1083            ("bytes", bytes_written, i64),
1084        );
1085        Ok(())
1086    }
1087
1088    // Delete a confirmed block and associated meta data.
1089    pub async fn delete_confirmed_block(&self, slot: Slot, dry_run: bool) -> Result<()> {
1090        let mut addresses: HashSet<&Pubkey> = HashSet::new();
1091        let mut expected_tx_infos: HashMap<String, UploadedTransaction> = HashMap::new();
1092        let confirmed_block = self.get_confirmed_block(slot).await?;
1093        for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
1094            match transaction_with_meta {
1095                TransactionWithStatusMeta::MissingMetadata(transaction) => {
1096                    let signature = transaction.signatures[0];
1097                    let index = index as u32;
1098                    let err = None;
1099
1100                    for address in transaction.message.account_keys.iter() {
1101                        // We could skip deleting addresses that are known
1102                        // reserved keys but it's hard to be sure whether we
1103                        // previously uploaded rows for reserved keys or not. So
1104                        // to ensure everything is deleted properly, we attempt
1105                        // to delete rows for all addresses even if they might
1106                        // not have been uploaded.
1107                        addresses.insert(address);
1108                    }
1109
1110                    expected_tx_infos.insert(
1111                        signature.to_string(),
1112                        UploadedTransaction { slot, index, err },
1113                    );
1114                }
1115                TransactionWithStatusMeta::Complete(tx_with_meta) => {
1116                    let VersionedTransactionWithStatusMeta { transaction, meta } = tx_with_meta;
1117                    let signature = transaction.signatures[0];
1118                    let index = index as u32;
1119                    let err = meta.status.clone().err();
1120
1121                    for address in tx_with_meta.account_keys().iter() {
1122                        // We could skip deleting addresses that are known
1123                        // reserved keys but it's hard to be sure whether we
1124                        // previously uploaded rows for reserved keys or not. So
1125                        // to ensure everything is deleted properly, we attempt
1126                        // to delete rows for all addresses even if they might
1127                        // not have been uploaded.
1128                        addresses.insert(address);
1129                    }
1130
1131                    expected_tx_infos.insert(
1132                        signature.to_string(),
1133                        UploadedTransaction { slot, index, err },
1134                    );
1135                }
1136            }
1137        }
1138
1139        let address_slot_rows: Vec<_> = addresses
1140            .into_iter()
1141            .map(|address| format!("{}/{}", address, slot_to_tx_by_addr_key(slot)))
1142            .collect();
1143
1144        let tx_deletion_rows = if !expected_tx_infos.is_empty() {
1145            let signatures = expected_tx_infos.keys().cloned().collect::<Vec<_>>();
1146            let fetched_tx_infos: HashMap<String, std::result::Result<UploadedTransaction, _>> =
1147                self.connection
1148                    .get_bincode_cells_with_retry::<TransactionInfo>("tx", &signatures)
1149                    .await?
1150                    .into_iter()
1151                    .map(|(signature, tx_info_res)| (signature, tx_info_res.map(Into::into)))
1152                    .collect::<HashMap<_, _>>();
1153
1154            let mut deletion_rows = Vec::with_capacity(expected_tx_infos.len());
1155            for (signature, expected_tx_info) in expected_tx_infos {
1156                match fetched_tx_infos.get(&signature) {
1157                    Some(Ok(fetched_tx_info)) if fetched_tx_info == &expected_tx_info => {
1158                        deletion_rows.push(signature);
1159                    }
1160                    Some(Ok(fetched_tx_info)) => {
1161                        warn!(
1162                            "skipped tx row {} because the bigtable entry ({:?}) did not match to {:?}",
1163                            signature,
1164                            fetched_tx_info,
1165                            &expected_tx_info,
1166                        );
1167                    }
1168                    Some(Err(err)) => {
1169                        warn!(
1170                            "skipped tx row {} because the bigtable entry was corrupted: {:?}",
1171                            signature, err
1172                        );
1173                    }
1174                    None => {
1175                        warn!("skipped tx row {} because it was not found", signature);
1176                    }
1177                }
1178            }
1179            deletion_rows
1180        } else {
1181            vec![]
1182        };
1183
1184        let entries_exist = self
1185            .connection
1186            .client()
1187            .row_key_exists("entries", slot_to_entries_key(slot))
1188            .await
1189            .is_ok_and(|x| x);
1190
1191        if !dry_run {
1192            if !address_slot_rows.is_empty() {
1193                self.connection
1194                    .delete_rows_with_retry("tx-by-addr", &address_slot_rows)
1195                    .await?;
1196            }
1197
1198            if !tx_deletion_rows.is_empty() {
1199                self.connection
1200                    .delete_rows_with_retry("tx", &tx_deletion_rows)
1201                    .await?;
1202            }
1203
1204            if entries_exist {
1205                self.connection
1206                    .delete_rows_with_retry("entries", &[slot_to_entries_key(slot)])
1207                    .await?;
1208            }
1209
1210            self.connection
1211                .delete_rows_with_retry("blocks", &[slot_to_blocks_key(slot)])
1212                .await?;
1213        }
1214
1215        info!(
1216            "{}deleted ledger data for slot {}: {} transaction rows, {} address slot rows, {} entry row",
1217            if dry_run { "[dry run] " } else { "" },
1218            slot,
1219            tx_deletion_rows.len(),
1220            address_slot_rows.len(),
1221            if entries_exist { "with" } else {"WITHOUT"}
1222        );
1223
1224        Ok(())
1225    }
1226}
1227
#[cfg(test)]
mod test {
    use super::*;

    /// Checks `slot_to_key` for the smallest and the largest possible slot value.
    #[test]
    fn test_slot_to_key() {
        let cases = [
            (0, "0000000000000000"),
            (Slot::MAX, "ffffffffffffffff"),
        ];
        for (slot, expected_key) in cases {
            assert_eq!(slot_to_key(slot), expected_key);
        }
    }
}
1237}