clone_solana_storage_bigtable/
lib.rs

1#![allow(clippy::arithmetic_side_effects)]
2
3use {
4    crate::bigtable::RowKey,
5    clone_agave_reserved_account_keys::ReservedAccountKeys,
6    clone_solana_clock::{Slot, UnixTimestamp},
7    clone_solana_message::v0::LoadedAddresses,
8    clone_solana_metrics::datapoint_info,
9    clone_solana_pubkey::Pubkey,
10    clone_solana_serde::default_on_eof,
11    clone_solana_signature::Signature,
12    clone_solana_storage_proto::convert::{entries, generated, tx_by_addr},
13    clone_solana_time_utils::AtomicInterval,
14    clone_solana_transaction::versioned::VersionedTransaction,
15    clone_solana_transaction_error::TransactionError,
16    clone_solana_transaction_status::{
17        extract_and_fmt_memos, ConfirmedBlock, ConfirmedTransactionStatusWithSignature,
18        ConfirmedTransactionWithStatusMeta, EntrySummary, Reward, TransactionByAddrInfo,
19        TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta,
20        TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedConfirmedBlockWithEntries,
21        VersionedTransactionWithStatusMeta,
22    },
23    log::*,
24    serde::{Deserialize, Serialize},
25    std::{
26        collections::{HashMap, HashSet},
27        convert::TryInto,
28        sync::{
29            atomic::{AtomicUsize, Ordering},
30            Arc,
31        },
32        time::Duration,
33    },
34    thiserror::Error,
35    tokio::task::JoinError,
36};
37
38#[macro_use]
39extern crate clone_solana_metrics;
40
41#[macro_use]
42extern crate serde_derive;
43
44mod access_token;
45mod bigtable;
46mod compression;
47mod root_ca_certificate;
48
49#[derive(Debug, Error)]
50pub enum Error {
51    #[error("BigTable: {0}")]
52    BigTableError(bigtable::Error),
53
54    #[error("I/O Error: {0}")]
55    IoError(std::io::Error),
56
57    #[error("Transaction encoded is not supported")]
58    UnsupportedTransactionEncoding,
59
60    #[error("Block not found: {0}")]
61    BlockNotFound(Slot),
62
63    #[error("Signature not found")]
64    SignatureNotFound,
65
66    #[error("tokio error")]
67    TokioJoinError(JoinError),
68}
69
70impl std::convert::From<bigtable::Error> for Error {
71    fn from(err: bigtable::Error) -> Self {
72        Self::BigTableError(err)
73    }
74}
75
76impl std::convert::From<std::io::Error> for Error {
77    fn from(err: std::io::Error) -> Self {
78        Self::IoError(err)
79    }
80}
81
/// Result type used throughout this crate; the error is always this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
83
84// Convert a slot to its bucket representation whereby lower slots are always lexically ordered
85// before higher slots
86fn slot_to_key(slot: Slot) -> String {
87    format!("{slot:016x}")
88}
89
90fn slot_to_blocks_key(slot: Slot) -> String {
91    slot_to_key(slot)
92}
93
94fn slot_to_entries_key(slot: Slot) -> String {
95    slot_to_key(slot)
96}
97
98fn slot_to_tx_by_addr_key(slot: Slot) -> String {
99    slot_to_key(!slot)
100}
101
102// Reverse of `slot_to_key`
103fn key_to_slot(key: &str) -> Option<Slot> {
104    match Slot::from_str_radix(key, 16) {
105        Ok(slot) => Some(slot),
106        Err(err) => {
107            // bucket data is probably corrupt
108            warn!("Failed to parse object key as a slot: {}: {}", key, err);
109            None
110        }
111    }
112}
113
// A serialized `StoredConfirmedBlock` is stored in the `blocks` table (bincode encoding). Rows in
// that table may instead hold a protobuf `generated::ConfirmedBlock`; the readers in this file
// accept either form.
//
// StoredConfirmedBlock holds the same contents as ConfirmedBlock, but is slightly compressed and avoids
// some serde JSON directives that cause issues with bincode
//
// Note: in order to continue to support old bincode-serialized bigtable entries, if new fields are
// added to ConfirmedBlock, they must either be excluded or set to `default_on_eof` here
//
// bincode encodes struct fields positionally. Existing fields must therefore keep their order, and
// any new field belongs at the end with `default_on_eof`, like `block_height`.
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlock {
    previous_blockhash: String,
    blockhash: String,
    parent_slot: Slot,
    transactions: Vec<StoredConfirmedBlockTransaction>,
    rewards: StoredConfirmedBlockRewards,
    block_time: Option<UnixTimestamp>,
    // Missing from older entries. `default_on_eof` presumably yields `None` when the encoded data
    // ends before this field; confirm in `clone_solana_serde`.
    #[serde(deserialize_with = "default_on_eof")]
    block_height: Option<u64>,
}
133
#[cfg(test)]
impl From<ConfirmedBlock> for StoredConfirmedBlock {
    /// Test-only conversion into the legacy bincode layout.
    ///
    /// `num_partitions` has no stored counterpart and is dropped.
    fn from(block: ConfirmedBlock) -> Self {
        // Exhaustive destructuring: a new `ConfirmedBlock` field becomes a compile error here
        // instead of being silently ignored.
        let ConfirmedBlock {
            previous_blockhash,
            blockhash,
            parent_slot,
            transactions,
            rewards,
            num_partitions: _,
            block_time,
            block_height,
        } = block;

        let stored_transactions = transactions
            .into_iter()
            .map(StoredConfirmedBlockTransaction::from)
            .collect();
        let stored_rewards = rewards
            .into_iter()
            .map(StoredConfirmedBlockReward::from)
            .collect();

        Self {
            previous_blockhash,
            blockhash,
            parent_slot,
            transactions: stored_transactions,
            rewards: stored_rewards,
            block_time,
            block_height,
        }
    }
}
159
160impl From<StoredConfirmedBlock> for ConfirmedBlock {
161    fn from(confirmed_block: StoredConfirmedBlock) -> Self {
162        let StoredConfirmedBlock {
163            previous_blockhash,
164            blockhash,
165            parent_slot,
166            transactions,
167            rewards,
168            block_time,
169            block_height,
170        } = confirmed_block;
171
172        Self {
173            previous_blockhash,
174            blockhash,
175            parent_slot,
176            transactions: transactions.into_iter().map(|tx| tx.into()).collect(),
177            rewards: rewards.into_iter().map(|reward| reward.into()).collect(),
178            num_partitions: None,
179            block_time,
180            block_height,
181        }
182    }
183}
184
// One transaction inside a legacy bincode `StoredConfirmedBlock`.
//
// `meta` is `None` for transactions stored without status metadata. When read back, such entries
// become `TransactionWithStatusMeta::MissingMetadata`. Field order is part of the bincode layout.
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlockTransaction {
    transaction: VersionedTransaction,
    meta: Option<StoredConfirmedBlockTransactionStatusMeta>,
}
190
#[cfg(test)]
impl From<TransactionWithStatusMeta> for StoredConfirmedBlockTransaction {
    /// Test-only conversion into the legacy stored layout.
    ///
    /// Legacy transactions without metadata are wrapped as versioned transactions with
    /// `meta: None`.
    fn from(value: TransactionWithStatusMeta) -> Self {
        let (transaction, meta) = match value {
            TransactionWithStatusMeta::MissingMetadata(legacy) => {
                (VersionedTransaction::from(legacy), None)
            }
            TransactionWithStatusMeta::Complete(VersionedTransactionWithStatusMeta {
                transaction,
                meta,
            }) => (
                transaction,
                Some(StoredConfirmedBlockTransactionStatusMeta::from(meta)),
            ),
        };
        Self { transaction, meta }
    }
}
209
210impl From<StoredConfirmedBlockTransaction> for TransactionWithStatusMeta {
211    fn from(tx_with_meta: StoredConfirmedBlockTransaction) -> Self {
212        let StoredConfirmedBlockTransaction { transaction, meta } = tx_with_meta;
213        match meta {
214            None => Self::MissingMetadata(
215                transaction
216                    .into_legacy_transaction()
217                    .expect("versioned transactions always have meta"),
218            ),
219            Some(meta) => Self::Complete(VersionedTransactionWithStatusMeta {
220                transaction,
221                meta: meta.into(),
222            }),
223        }
224    }
225}
226
// The subset of `TransactionStatusMeta` kept in the legacy bincode block format.
//
// Only these four fields are persisted. Converting back fills every other `TransactionStatusMeta`
// field with an empty value. Field order is part of the bincode layout.
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlockTransactionStatusMeta {
    // `None` if the transaction succeeded.
    err: Option<TransactionError>,
    fee: u64,
    pre_balances: Vec<u64>,
    post_balances: Vec<u64>,
}
234
235impl From<StoredConfirmedBlockTransactionStatusMeta> for TransactionStatusMeta {
236    fn from(value: StoredConfirmedBlockTransactionStatusMeta) -> Self {
237        let StoredConfirmedBlockTransactionStatusMeta {
238            err,
239            fee,
240            pre_balances,
241            post_balances,
242        } = value;
243        let status = match &err {
244            None => Ok(()),
245            Some(err) => Err(err.clone()),
246        };
247        Self {
248            status,
249            fee,
250            pre_balances,
251            post_balances,
252            inner_instructions: None,
253            log_messages: None,
254            pre_token_balances: None,
255            post_token_balances: None,
256            rewards: None,
257            loaded_addresses: LoadedAddresses::default(),
258            return_data: None,
259            compute_units_consumed: None,
260        }
261    }
262}
263
264impl From<TransactionStatusMeta> for StoredConfirmedBlockTransactionStatusMeta {
265    fn from(value: TransactionStatusMeta) -> Self {
266        let TransactionStatusMeta {
267            status,
268            fee,
269            pre_balances,
270            post_balances,
271            ..
272        } = value;
273        Self {
274            err: status.err(),
275            fee,
276            pre_balances,
277            post_balances,
278        }
279    }
280}
281
// The rewards list as stored in a legacy bincode `StoredConfirmedBlock`.
type StoredConfirmedBlockRewards = Vec<StoredConfirmedBlockReward>;
283
// A reward as stored in the legacy bincode block format.
//
// Only `pubkey` and `lamports` are persisted. When converted back into `Reward`, `post_balance` is
// 0 and `reward_type`/`commission` are `None`.
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlockReward {
    pubkey: String,
    lamports: i64,
}
289
290impl From<StoredConfirmedBlockReward> for Reward {
291    fn from(value: StoredConfirmedBlockReward) -> Self {
292        let StoredConfirmedBlockReward { pubkey, lamports } = value;
293        Self {
294            pubkey,
295            lamports,
296            post_balance: 0,
297            reward_type: None,
298            commission: None,
299        }
300    }
301}
302
303impl From<Reward> for StoredConfirmedBlockReward {
304    fn from(value: Reward) -> Self {
305        let Reward {
306            pubkey, lamports, ..
307        } = value;
308        Self { pubkey, lamports }
309    }
310}
311
// A serialized `TransactionInfo` is stored in the `tx` table. It is read back with
// `get_bincode_cell`, keyed by the transaction signature's string form.
//
// Read as bincode, so field order is part of the stored layout.
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
struct TransactionInfo {
    slot: Slot, // The slot that contains the block with this transaction in it
    index: u32, // Where the transaction is located in the block
    err: Option<TransactionError>, // None if the transaction executed successfully
    memo: Option<String>, // Transaction memo
}
320
// Part of a serialized `TransactionInfo` which is stored in the `tx` table: every field except
// `memo` (see the `From<TransactionInfo>` conversion).
#[derive(PartialEq, Eq, Debug)]
struct UploadedTransaction {
    slot: Slot, // The slot that contains the block with this transaction in it
    index: u32, // Where the transaction is located in the block
    err: Option<TransactionError>, // None if the transaction executed successfully
}
328
329impl From<TransactionInfo> for UploadedTransaction {
330    fn from(transaction_info: TransactionInfo) -> Self {
331        Self {
332            slot: transaction_info.slot,
333            index: transaction_info.index,
334            err: transaction_info.err,
335        }
336    }
337}
338
339impl From<TransactionInfo> for TransactionStatus {
340    fn from(transaction_info: TransactionInfo) -> Self {
341        let TransactionInfo { slot, err, .. } = transaction_info;
342        let status = match &err {
343            None => Ok(()),
344            Some(err) => Err(err.clone()),
345        };
346        Self {
347            slot,
348            confirmations: None,
349            status,
350            err,
351            confirmation_status: Some(TransactionConfirmationStatus::Finalized),
352        }
353    }
354}
355
// Bincode layout of one entry in a `tx-by-addr` row. A bincode row holds a `Vec` of these.
//
// It predates `block_time`, so converting to `TransactionByAddrInfo` sets `block_time: None`.
// Field order is part of the bincode layout.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct LegacyTransactionByAddrInfo {
    pub signature: Signature,          // The transaction signature
    pub err: Option<TransactionError>, // None if the transaction executed successfully
    pub index: u32,                    // Where the transaction is located in the block
    pub memo: Option<String>,          // Transaction memo
}
363
364impl From<LegacyTransactionByAddrInfo> for TransactionByAddrInfo {
365    fn from(legacy: LegacyTransactionByAddrInfo) -> Self {
366        let LegacyTransactionByAddrInfo {
367            signature,
368            err,
369            index,
370            memo,
371        } = legacy;
372
373        Self {
374            signature,
375            err,
376            index,
377            memo,
378            block_time: None,
379        }
380    }
381}
382
/// Bigtable instance name used by `LedgerStorageConfig::default()`.
pub const DEFAULT_INSTANCE_NAME: &str = "solana-ledger";
/// Bigtable app profile ID used by `LedgerStorageConfig::default()`.
pub const DEFAULT_APP_PROFILE_ID: &str = "default";
/// Default `max_message_size` handed to the Bigtable connection: 64 MiB.
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 << 20;
386
/// How the Bigtable credential is supplied to the connection.
#[derive(Debug)]
pub enum CredentialType {
    /// A credential file path. `LedgerStorageConfig::default()` uses `Filepath(None)`.
    /// NOTE(review): `None` presumably means "use a default location"; confirm in `bigtable`.
    Filepath(Option<String>),
    /// A credential passed inline as a string rather than as a file path.
    Stringified(String),
}
392
/// Settings used by `LedgerStorage::new_with_config` to open the Bigtable connection.
///
/// Every field is forwarded to `bigtable::BigTableConnection::new`. The `Default` impl gives
/// read-only access, no timeout, `CredentialType::Filepath(None)` and the `DEFAULT_*` constants.
#[derive(Debug)]
pub struct LedgerStorageConfig {
    /// Passed through as the connection's `read_only` flag.
    pub read_only: bool,
    /// Optional timeout passed to the connection (presumably applied per request; confirm in
    /// `bigtable`).
    pub timeout: Option<std::time::Duration>,
    /// Where the credential comes from.
    pub credential_type: CredentialType,
    /// Bigtable instance name.
    pub instance_name: String,
    /// Bigtable app profile ID.
    pub app_profile_id: String,
    /// Maximum message size, in bytes, passed to the connection.
    pub max_message_size: usize,
}
402
403impl Default for LedgerStorageConfig {
404    fn default() -> Self {
405        Self {
406            read_only: true,
407            timeout: None,
408            credential_type: CredentialType::Filepath(None),
409            instance_name: DEFAULT_INSTANCE_NAME.to_string(),
410            app_profile_id: DEFAULT_APP_PROFILE_ID.to_string(),
411            max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
412        }
413    }
414}
415
// Interval, in milliseconds, passed to `AtomicInterval::should_update` when deciding whether to
// report the accumulated query count.
const METRICS_REPORT_INTERVAL_MS: u64 = 10_000;
417
// Query counters shared (through an `Arc`) by all clones of a `LedgerStorage`.
#[derive(Default)]
struct LedgerStorageStats {
    // Queries counted since the counter was last reset by `maybe_report`.
    num_queries: AtomicUsize,
    // Tracks when the count was last reported.
    last_report: AtomicInterval,
}
423
424impl LedgerStorageStats {
425    fn increment_num_queries(&self) {
426        self.num_queries.fetch_add(1, Ordering::Relaxed);
427        self.maybe_report();
428    }
429
430    fn maybe_report(&self) {
431        if self.last_report.should_update(METRICS_REPORT_INTERVAL_MS) {
432            datapoint_debug!(
433                "storage-bigtable-query",
434                (
435                    "num_queries",
436                    self.num_queries.swap(0, Ordering::Relaxed) as i64,
437                    i64
438                )
439            );
440        }
441    }
442}
443
/// Client for the Bigtable-backed ledger store.
///
/// Clones share `stats` through an `Arc`, so every clone counts queries into the same counter.
/// The `connection` is cloned through its own `Clone` impl.
#[derive(Clone)]
pub struct LedgerStorage {
    connection: bigtable::BigTableConnection,
    stats: Arc<LedgerStorageStats>,
}
449
450impl LedgerStorage {
451    pub async fn new(
452        read_only: bool,
453        timeout: Option<std::time::Duration>,
454        credential_path: Option<String>,
455    ) -> Result<Self> {
456        Self::new_with_config(LedgerStorageConfig {
457            read_only,
458            timeout,
459            credential_type: CredentialType::Filepath(credential_path),
460            ..LedgerStorageConfig::default()
461        })
462        .await
463    }
464
465    pub fn new_for_emulator(
466        instance_name: &str,
467        app_profile_id: &str,
468        endpoint: &str,
469        timeout: Option<Duration>,
470    ) -> Result<Self> {
471        let stats = Arc::new(LedgerStorageStats::default());
472        Ok(Self {
473            connection: bigtable::BigTableConnection::new_for_emulator(
474                instance_name,
475                app_profile_id,
476                endpoint,
477                timeout,
478                LedgerStorageConfig::default().max_message_size,
479            )?,
480            stats,
481        })
482    }
483
484    pub async fn new_with_config(config: LedgerStorageConfig) -> Result<Self> {
485        let stats = Arc::new(LedgerStorageStats::default());
486        let LedgerStorageConfig {
487            read_only,
488            timeout,
489            instance_name,
490            app_profile_id,
491            credential_type,
492            max_message_size,
493        } = config;
494        let connection = bigtable::BigTableConnection::new(
495            instance_name.as_str(),
496            app_profile_id.as_str(),
497            read_only,
498            timeout,
499            credential_type,
500            max_message_size,
501        )
502        .await?;
503        Ok(Self { stats, connection })
504    }
505
506    pub async fn new_with_stringified_credential(credential: String) -> Result<Self> {
507        Self::new_with_config(LedgerStorageConfig {
508            credential_type: CredentialType::Stringified(credential),
509            ..LedgerStorageConfig::default()
510        })
511        .await
512    }
513
514    /// Return the available slot that contains a block
515    pub async fn get_first_available_block(&self) -> Result<Option<Slot>> {
516        trace!("LedgerStorage::get_first_available_block request received");
517        self.stats.increment_num_queries();
518        let mut bigtable = self.connection.client();
519        let blocks = bigtable.get_row_keys("blocks", None, None, 1).await?;
520        if blocks.is_empty() {
521            return Ok(None);
522        }
523        Ok(key_to_slot(&blocks[0]))
524    }
525
526    /// Fetch the next slots after the provided slot that contains a block
527    ///
528    /// start_slot: slot to start the search from (inclusive)
529    /// limit: stop after this many slots have been found
530    pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result<Vec<Slot>> {
531        trace!(
532            "LedgerStorage::get_confirmed_blocks request received: {:?} {:?}",
533            start_slot,
534            limit
535        );
536        self.stats.increment_num_queries();
537        let mut bigtable = self.connection.client();
538        let blocks = bigtable
539            .get_row_keys(
540                "blocks",
541                Some(slot_to_blocks_key(start_slot)),
542                None,
543                limit as i64,
544            )
545            .await?;
546        Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect())
547    }
548
549    // Fetches and gets a vector of confirmed blocks via a multirow fetch
550    pub async fn get_confirmed_blocks_with_data<'a>(
551        &self,
552        slots: &'a [Slot],
553    ) -> Result<impl Iterator<Item = (Slot, ConfirmedBlock)> + 'a> {
554        trace!(
555            "LedgerStorage::get_confirmed_blocks_with_data request received: {:?}",
556            slots
557        );
558        self.stats.increment_num_queries();
559        let mut bigtable = self.connection.client();
560        let row_keys = slots.iter().copied().map(slot_to_blocks_key);
561        let data = bigtable
562            .get_protobuf_or_bincode_cells("blocks", row_keys)
563            .await?
564            .filter_map(
565                |(row_key, block_cell_data): (
566                    RowKey,
567                    bigtable::CellData<StoredConfirmedBlock, generated::ConfirmedBlock>,
568                )| {
569                    let block = match block_cell_data {
570                        bigtable::CellData::Bincode(block) => block.into(),
571                        bigtable::CellData::Protobuf(block) => block.try_into().ok()?,
572                    };
573                    Some((key_to_slot(&row_key).unwrap(), block))
574                },
575            );
576        Ok(data)
577    }
578
579    /// Fetch the confirmed block from the desired slot
580    pub async fn get_confirmed_block(&self, slot: Slot) -> Result<ConfirmedBlock> {
581        trace!(
582            "LedgerStorage::get_confirmed_block request received: {:?}",
583            slot
584        );
585        self.stats.increment_num_queries();
586        let mut bigtable = self.connection.client();
587        let block_cell_data = bigtable
588            .get_protobuf_or_bincode_cell::<StoredConfirmedBlock, generated::ConfirmedBlock>(
589                "blocks",
590                slot_to_blocks_key(slot),
591            )
592            .await
593            .map_err(|err| match err {
594                bigtable::Error::RowNotFound => Error::BlockNotFound(slot),
595                _ => err.into(),
596            })?;
597        Ok(match block_cell_data {
598            bigtable::CellData::Bincode(block) => block.into(),
599            bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| {
600                bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_blocks_key(slot)))
601            })?,
602        })
603    }
604
605    /// Does the confirmed block exist in the Bigtable
606    pub async fn confirmed_block_exists(&self, slot: Slot) -> Result<bool> {
607        trace!(
608            "LedgerStorage::confirmed_block_exists request received: {:?}",
609            slot
610        );
611        self.stats.increment_num_queries();
612        let mut bigtable = self.connection.client();
613
614        let block_exists = bigtable
615            .row_key_exists("blocks", slot_to_blocks_key(slot))
616            .await?;
617
618        Ok(block_exists)
619    }
620
621    /// Fetches a vector of block entries via a multirow fetch
622    pub async fn get_entries(&self, slot: Slot) -> Result<impl Iterator<Item = EntrySummary>> {
623        trace!(
624            "LedgerStorage::get_block_entries request received: {:?}",
625            slot
626        );
627        self.stats.increment_num_queries();
628        let mut bigtable = self.connection.client();
629        let entry_cell_data = bigtable
630            .get_protobuf_cell::<entries::Entries>("entries", slot_to_entries_key(slot))
631            .await
632            .map_err(|err| match err {
633                bigtable::Error::RowNotFound => Error::BlockNotFound(slot),
634                _ => err.into(),
635            })?;
636        let entries = entry_cell_data.entries.into_iter().map(Into::into);
637        Ok(entries)
638    }
639
640    pub async fn get_signature_status(&self, signature: &Signature) -> Result<TransactionStatus> {
641        trace!(
642            "LedgerStorage::get_signature_status request received: {:?}",
643            signature
644        );
645        self.stats.increment_num_queries();
646        let mut bigtable = self.connection.client();
647        let transaction_info = bigtable
648            .get_bincode_cell::<TransactionInfo>("tx", signature.to_string())
649            .await
650            .map_err(|err| match err {
651                bigtable::Error::RowNotFound => Error::SignatureNotFound,
652                _ => err.into(),
653            })?;
654        Ok(transaction_info.into())
655    }
656
657    // Fetches and gets a vector of confirmed transactions via a multirow fetch
658    pub async fn get_confirmed_transactions(
659        &self,
660        signatures: &[Signature],
661    ) -> Result<Vec<ConfirmedTransactionWithStatusMeta>> {
662        trace!(
663            "LedgerStorage::get_confirmed_transactions request received: {:?}",
664            signatures
665        );
666        self.stats.increment_num_queries();
667        let mut bigtable = self.connection.client();
668
669        // Fetch transactions info
670        let keys = signatures.iter().map(|s| s.to_string()).collect::<Vec<_>>();
671        let cells = bigtable
672            .get_bincode_cells::<TransactionInfo>("tx", &keys)
673            .await?;
674
675        // Collect by slot
676        let mut order: Vec<(Slot, u32, String)> = Vec::new();
677        let mut slots: HashSet<Slot> = HashSet::new();
678        for cell in cells {
679            if let (signature, Ok(TransactionInfo { slot, index, .. })) = cell {
680                order.push((slot, index, signature));
681                slots.insert(slot);
682            }
683        }
684
685        // Fetch blocks
686        let blocks = self
687            .get_confirmed_blocks_with_data(&slots.into_iter().collect::<Vec<_>>())
688            .await?
689            .collect::<HashMap<_, _>>();
690
691        // Extract transactions
692        Ok(order
693            .into_iter()
694            .filter_map(|(slot, index, signature)| {
695                blocks.get(&slot).and_then(|block| {
696                    block
697                        .transactions
698                        .get(index as usize)
699                        .and_then(|tx_with_meta| {
700                            if tx_with_meta.transaction_signature().to_string() != *signature {
701                                warn!(
702                                    "Transaction info or confirmed block for {} is corrupt",
703                                    signature
704                                );
705                                None
706                            } else {
707                                Some(ConfirmedTransactionWithStatusMeta {
708                                    slot,
709                                    tx_with_meta: tx_with_meta.clone(),
710                                    block_time: block.block_time,
711                                })
712                            }
713                        })
714                })
715            })
716            .collect::<Vec<_>>())
717    }
718
719    /// Fetch a confirmed transaction
720    pub async fn get_confirmed_transaction(
721        &self,
722        signature: &Signature,
723    ) -> Result<Option<ConfirmedTransactionWithStatusMeta>> {
724        trace!(
725            "LedgerStorage::get_confirmed_transaction request received: {:?}",
726            signature
727        );
728        self.stats.increment_num_queries();
729        let mut bigtable = self.connection.client();
730
731        // Figure out which block the transaction is located in
732        let TransactionInfo { slot, index, .. } = bigtable
733            .get_bincode_cell("tx", signature.to_string())
734            .await
735            .map_err(|err| match err {
736                bigtable::Error::RowNotFound => Error::SignatureNotFound,
737                _ => err.into(),
738            })?;
739
740        // Load the block and return the transaction
741        let block = self.get_confirmed_block(slot).await?;
742        match block.transactions.into_iter().nth(index as usize) {
743            None => {
744                // report this somewhere actionable?
745                warn!("Transaction info for {} is corrupt", signature);
746                Ok(None)
747            }
748            Some(tx_with_meta) => {
749                if tx_with_meta.transaction_signature() != signature {
750                    warn!(
751                        "Transaction info or confirmed block for {} is corrupt",
752                        signature
753                    );
754                    Ok(None)
755                } else {
756                    Ok(Some(ConfirmedTransactionWithStatusMeta {
757                        slot,
758                        tx_with_meta,
759                        block_time: block.block_time,
760                    }))
761                }
762            }
763        }
764    }
765
766    /// Get confirmed signatures for the provided address, in descending ledger order
767    ///
768    /// address: address to search for
769    /// before_signature: start with the first signature older than this one
770    /// until_signature: end with the last signature more recent than this one
771    /// limit: stop after this many signatures; if limit==0, all records in the table will be read
772    pub async fn get_confirmed_signatures_for_address(
773        &self,
774        address: &Pubkey,
775        before_signature: Option<&Signature>,
776        until_signature: Option<&Signature>,
777        limit: usize,
778    ) -> Result<
779        Vec<(
780            ConfirmedTransactionStatusWithSignature,
781            u32, /*slot index*/
782        )>,
783    > {
784        trace!(
785            "LedgerStorage::get_confirmed_signatures_for_address request received: {:?}",
786            address
787        );
788        self.stats.increment_num_queries();
789        let mut bigtable = self.connection.client();
790        let address_prefix = format!("{address}/");
791
792        // Figure out where to start listing from based on `before_signature`
793        let (first_slot, before_transaction_index) = match before_signature {
794            None => (Slot::MAX, 0),
795            Some(before_signature) => {
796                let TransactionInfo { slot, index, .. } = bigtable
797                    .get_bincode_cell("tx", before_signature.to_string())
798                    .await
799                    .map_err(|err| match err {
800                        bigtable::Error::RowNotFound => Error::SignatureNotFound,
801                        _ => err.into(),
802                    })?;
803
804                (slot, index)
805            }
806        };
807
808        // Figure out where to end listing from based on `until_signature`
809        let (last_slot, until_transaction_index) = match until_signature {
810            None => (0, u32::MAX),
811            Some(until_signature) => {
812                let TransactionInfo { slot, index, .. } = bigtable
813                    .get_bincode_cell("tx", until_signature.to_string())
814                    .await
815                    .map_err(|err| match err {
816                        bigtable::Error::RowNotFound => Error::SignatureNotFound,
817                        _ => err.into(),
818                    })?;
819
820                (slot, index)
821            }
822        };
823
824        let mut infos = vec![];
825
826        let starting_slot_tx_len = bigtable
827            .get_protobuf_or_bincode_cell::<Vec<LegacyTransactionByAddrInfo>, tx_by_addr::TransactionByAddr>(
828                "tx-by-addr",
829                format!("{}{}", address_prefix, slot_to_tx_by_addr_key(first_slot)),
830            )
831            .await
832            .map(|cell_data| {
833                match cell_data {
834                    bigtable::CellData::Bincode(tx_by_addr) => tx_by_addr.len(),
835                    bigtable::CellData::Protobuf(tx_by_addr) => tx_by_addr.tx_by_addrs.len(),
836                }
837            })
838            .unwrap_or(0);
839
840        // Return the next tx-by-addr data of amount `limit` plus extra to account for the largest
841        // number that might be filtered out
842        let tx_by_addr_data = bigtable
843            .get_row_data(
844                "tx-by-addr",
845                Some(format!(
846                    "{}{}",
847                    address_prefix,
848                    slot_to_tx_by_addr_key(first_slot),
849                )),
850                Some(format!(
851                    "{}{}",
852                    address_prefix,
853                    slot_to_tx_by_addr_key(last_slot),
854                )),
855                limit as i64 + starting_slot_tx_len as i64,
856            )
857            .await?;
858
859        'outer: for (row_key, data) in tx_by_addr_data {
860            let slot = !key_to_slot(&row_key[address_prefix.len()..]).ok_or_else(|| {
861                bigtable::Error::ObjectCorrupt(format!(
862                    "Failed to convert key to slot: tx-by-addr/{row_key}"
863                ))
864            })?;
865
866            let deserialized_cell_data = bigtable::deserialize_protobuf_or_bincode_cell_data::<
867                Vec<LegacyTransactionByAddrInfo>,
868                tx_by_addr::TransactionByAddr,
869            >(&data, "tx-by-addr", row_key.clone())?;
870
871            let mut cell_data: Vec<TransactionByAddrInfo> = match deserialized_cell_data {
872                bigtable::CellData::Bincode(tx_by_addr) => {
873                    tx_by_addr.into_iter().map(|legacy| legacy.into()).collect()
874                }
875                bigtable::CellData::Protobuf(tx_by_addr) => {
876                    tx_by_addr.try_into().map_err(|error| {
877                        bigtable::Error::ObjectCorrupt(format!(
878                            "Failed to deserialize: {}: tx-by-addr/{}",
879                            error,
880                            row_key.clone()
881                        ))
882                    })?
883                }
884            };
885
886            cell_data.reverse();
887            for tx_by_addr_info in cell_data.into_iter() {
888                // Filter out records before `before_transaction_index`
889                if slot == first_slot && tx_by_addr_info.index >= before_transaction_index {
890                    continue;
891                }
892                // Filter out records after `until_transaction_index`
893                if slot == last_slot && tx_by_addr_info.index <= until_transaction_index {
894                    continue;
895                }
896                infos.push((
897                    ConfirmedTransactionStatusWithSignature {
898                        signature: tx_by_addr_info.signature,
899                        slot,
900                        err: tx_by_addr_info.err,
901                        memo: tx_by_addr_info.memo,
902                        block_time: tx_by_addr_info.block_time,
903                    },
904                    tx_by_addr_info.index,
905                ));
906                // Respect limit
907                if infos.len() >= limit {
908                    break 'outer;
909                }
910            }
911        }
912        Ok(infos)
913    }
914
915    /// Upload a new confirmed block and associated meta data.
916    pub async fn upload_confirmed_block(
917        &self,
918        slot: Slot,
919        confirmed_block: VersionedConfirmedBlock,
920    ) -> Result<()> {
921        trace!(
922            "LedgerStorage::upload_confirmed_block request received: {:?}",
923            slot
924        );
925        self.upload_confirmed_block_with_entries(
926            slot,
927            VersionedConfirmedBlockWithEntries {
928                block: confirmed_block,
929                entries: vec![],
930            },
931        )
932        .await
933    }
934
935    pub async fn upload_confirmed_block_with_entries(
936        &self,
937        slot: Slot,
938        confirmed_block: VersionedConfirmedBlockWithEntries,
939    ) -> Result<()> {
940        trace!(
941            "LedgerStorage::upload_confirmed_block_with_entries request received: {:?}",
942            slot
943        );
944        let mut by_addr: HashMap<&Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();
945        let VersionedConfirmedBlockWithEntries {
946            block: confirmed_block,
947            entries,
948        } = confirmed_block;
949
950        let reserved_account_keys = ReservedAccountKeys::new_all_activated();
951        let mut tx_cells = Vec::with_capacity(confirmed_block.transactions.len());
952        for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
953            let VersionedTransactionWithStatusMeta { meta, transaction } = transaction_with_meta;
954            let err = meta.status.clone().err();
955            let index = index as u32;
956            let signature = transaction.signatures[0];
957            let memo = extract_and_fmt_memos(transaction_with_meta);
958
959            for address in transaction_with_meta.account_keys().iter() {
960                // Historical note that previously only a set of sysvar ids were
961                // skipped from being uploaded. Now we skip uploaded for the set
962                // of all reserved account keys which will continue to grow in
963                // the future.
964                if !reserved_account_keys.is_reserved(address) {
965                    by_addr
966                        .entry(address)
967                        .or_default()
968                        .push(TransactionByAddrInfo {
969                            signature,
970                            err: err.clone(),
971                            index,
972                            memo: memo.clone(),
973                            block_time: confirmed_block.block_time,
974                        });
975                }
976            }
977
978            tx_cells.push((
979                signature.to_string(),
980                TransactionInfo {
981                    slot,
982                    index,
983                    err,
984                    memo,
985                },
986            ));
987        }
988
989        let tx_by_addr_cells: Vec<_> = by_addr
990            .into_iter()
991            .map(|(address, transaction_info_by_addr)| {
992                (
993                    format!("{}/{}", address, slot_to_tx_by_addr_key(slot)),
994                    tx_by_addr::TransactionByAddr {
995                        tx_by_addrs: transaction_info_by_addr
996                            .into_iter()
997                            .map(|by_addr| by_addr.into())
998                            .collect(),
999                    },
1000                )
1001            })
1002            .collect();
1003
1004        let num_entries = entries.len();
1005        let entry_cell = (
1006            slot_to_entries_key(slot),
1007            entries::Entries {
1008                entries: entries.into_iter().enumerate().map(Into::into).collect(),
1009            },
1010        );
1011
1012        let mut tasks = vec![];
1013
1014        if !tx_cells.is_empty() {
1015            let conn = self.connection.clone();
1016            tasks.push(tokio::spawn(async move {
1017                conn.put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
1018                    .await
1019            }));
1020        }
1021
1022        if !tx_by_addr_cells.is_empty() {
1023            let conn = self.connection.clone();
1024            tasks.push(tokio::spawn(async move {
1025                conn.put_protobuf_cells_with_retry::<tx_by_addr::TransactionByAddr>(
1026                    "tx-by-addr",
1027                    &tx_by_addr_cells,
1028                )
1029                .await
1030            }));
1031        }
1032
1033        if num_entries > 0 {
1034            let conn = self.connection.clone();
1035            tasks.push(tokio::spawn(async move {
1036                conn.put_protobuf_cells_with_retry::<entries::Entries>("entries", &[entry_cell])
1037                    .await
1038            }));
1039        }
1040
1041        let mut bytes_written = 0;
1042        let mut maybe_first_err: Option<Error> = None;
1043
1044        let results = futures::future::join_all(tasks).await;
1045        for result in results {
1046            match result {
1047                Err(err) => {
1048                    if maybe_first_err.is_none() {
1049                        maybe_first_err = Some(Error::TokioJoinError(err));
1050                    }
1051                }
1052                Ok(Err(err)) => {
1053                    if maybe_first_err.is_none() {
1054                        maybe_first_err = Some(Error::BigTableError(err));
1055                    }
1056                }
1057                Ok(Ok(bytes)) => {
1058                    bytes_written += bytes;
1059                }
1060            }
1061        }
1062
1063        if let Some(err) = maybe_first_err {
1064            return Err(err);
1065        }
1066
1067        let num_transactions = confirmed_block.transactions.len();
1068
1069        // Store the block itself last, after all other metadata about the block has been
1070        // successfully stored.  This avoids partial uploaded blocks from becoming visible to
1071        // `get_confirmed_block()` and `get_confirmed_blocks()`
1072        let blocks_cells = [(slot_to_blocks_key(slot), confirmed_block.into())];
1073        bytes_written += self
1074            .connection
1075            .put_protobuf_cells_with_retry::<generated::ConfirmedBlock>("blocks", &blocks_cells)
1076            .await?;
1077        datapoint_info!(
1078            "storage-bigtable-upload-block",
1079            ("slot", slot, i64),
1080            ("transactions", num_transactions, i64),
1081            ("entries", num_entries, i64),
1082            ("bytes", bytes_written, i64),
1083        );
1084        Ok(())
1085    }
1086
1087    // Delete a confirmed block and associated meta data.
1088    pub async fn delete_confirmed_block(&self, slot: Slot, dry_run: bool) -> Result<()> {
1089        let mut addresses: HashSet<&Pubkey> = HashSet::new();
1090        let mut expected_tx_infos: HashMap<String, UploadedTransaction> = HashMap::new();
1091        let confirmed_block = self.get_confirmed_block(slot).await?;
1092        for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
1093            match transaction_with_meta {
1094                TransactionWithStatusMeta::MissingMetadata(transaction) => {
1095                    let signature = transaction.signatures[0];
1096                    let index = index as u32;
1097                    let err = None;
1098
1099                    for address in transaction.message.account_keys.iter() {
1100                        // We could skip deleting addresses that are known
1101                        // reserved keys but it's hard to be sure whether we
1102                        // previously uploaded rows for reserved keys or not. So
1103                        // to ensure everything is deleted properly, we attempt
1104                        // to delete rows for all addresses even if they might
1105                        // not have been uploaded.
1106                        addresses.insert(address);
1107                    }
1108
1109                    expected_tx_infos.insert(
1110                        signature.to_string(),
1111                        UploadedTransaction { slot, index, err },
1112                    );
1113                }
1114                TransactionWithStatusMeta::Complete(tx_with_meta) => {
1115                    let VersionedTransactionWithStatusMeta { transaction, meta } = tx_with_meta;
1116                    let signature = transaction.signatures[0];
1117                    let index = index as u32;
1118                    let err = meta.status.clone().err();
1119
1120                    for address in tx_with_meta.account_keys().iter() {
1121                        // We could skip deleting addresses that are known
1122                        // reserved keys but it's hard to be sure whether we
1123                        // previously uploaded rows for reserved keys or not. So
1124                        // to ensure everything is deleted properly, we attempt
1125                        // to delete rows for all addresses even if they might
1126                        // not have been uploaded.
1127                        addresses.insert(address);
1128                    }
1129
1130                    expected_tx_infos.insert(
1131                        signature.to_string(),
1132                        UploadedTransaction { slot, index, err },
1133                    );
1134                }
1135            }
1136        }
1137
1138        let address_slot_rows: Vec<_> = addresses
1139            .into_iter()
1140            .map(|address| format!("{}/{}", address, slot_to_tx_by_addr_key(slot)))
1141            .collect();
1142
1143        let tx_deletion_rows = if !expected_tx_infos.is_empty() {
1144            let signatures = expected_tx_infos.keys().cloned().collect::<Vec<_>>();
1145            let fetched_tx_infos: HashMap<String, std::result::Result<UploadedTransaction, _>> =
1146                self.connection
1147                    .get_bincode_cells_with_retry::<TransactionInfo>("tx", &signatures)
1148                    .await?
1149                    .into_iter()
1150                    .map(|(signature, tx_info_res)| (signature, tx_info_res.map(Into::into)))
1151                    .collect::<HashMap<_, _>>();
1152
1153            let mut deletion_rows = Vec::with_capacity(expected_tx_infos.len());
1154            for (signature, expected_tx_info) in expected_tx_infos {
1155                match fetched_tx_infos.get(&signature) {
1156                    Some(Ok(fetched_tx_info)) if fetched_tx_info == &expected_tx_info => {
1157                        deletion_rows.push(signature);
1158                    }
1159                    Some(Ok(fetched_tx_info)) => {
1160                        warn!(
1161                            "skipped tx row {} because the bigtable entry ({:?}) did not match to {:?}",
1162                            signature,
1163                            fetched_tx_info,
1164                            &expected_tx_info,
1165                        );
1166                    }
1167                    Some(Err(err)) => {
1168                        warn!(
1169                            "skipped tx row {} because the bigtable entry was corrupted: {:?}",
1170                            signature, err
1171                        );
1172                    }
1173                    None => {
1174                        warn!("skipped tx row {} because it was not found", signature);
1175                    }
1176                }
1177            }
1178            deletion_rows
1179        } else {
1180            vec![]
1181        };
1182
1183        let entries_exist = self
1184            .connection
1185            .client()
1186            .row_key_exists("entries", slot_to_entries_key(slot))
1187            .await
1188            .is_ok_and(|x| x);
1189
1190        if !dry_run {
1191            if !address_slot_rows.is_empty() {
1192                self.connection
1193                    .delete_rows_with_retry("tx-by-addr", &address_slot_rows)
1194                    .await?;
1195            }
1196
1197            if !tx_deletion_rows.is_empty() {
1198                self.connection
1199                    .delete_rows_with_retry("tx", &tx_deletion_rows)
1200                    .await?;
1201            }
1202
1203            if entries_exist {
1204                self.connection
1205                    .delete_rows_with_retry("entries", &[slot_to_entries_key(slot)])
1206                    .await?;
1207            }
1208
1209            self.connection
1210                .delete_rows_with_retry("blocks", &[slot_to_blocks_key(slot)])
1211                .await?;
1212        }
1213
1214        info!(
1215            "{}deleted ledger data for slot {}: {} transaction rows, {} address slot rows, {} entry row",
1216            if dry_run { "[dry run] " } else { "" },
1217            slot,
1218            tx_deletion_rows.len(),
1219            address_slot_rows.len(),
1220            if entries_exist { "with" } else {"WITHOUT"}
1221        );
1222
1223        Ok(())
1224    }
1225}
1226
#[cfg(test)]
mod test {
    use super::*;

    /// `slot_to_key` must render slots as fixed-width, zero-padded lowercase hex at both
    /// ends of the `u64` range.
    #[test]
    fn test_slot_to_key() {
        let cases = [(0, "0000000000000000"), (u64::MAX, "ffffffffffffffff")];
        for (slot, expected_key) in cases {
            assert_eq!(slot_to_key(slot), expected_key);
        }
    }
}