atlas_arch/
datasource.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use arch_program::{hash::Hash, pubkey::Pubkey};
5use arch_sdk::{AccountInfo, ProcessedTransaction};
6use async_trait::async_trait;
7use tokio_util::sync::CancellationToken;
8
9use crate::{error::IndexerResult, metrics::MetricsCollection};
10
11/// A source of update streams that the pipeline can consume from.
12///
13/// Implement this trait if you provide data into the indexing pipeline
14/// (e.g., blocks, transactions, accounts, deletions, or Bitcoin blocks).
15/// Implementations should:
16/// - Be cancel-safe: return promptly when `cancellation_token` is cancelled
17/// - Backpressure-aware: respect the `sender` capacity, awaiting sends
18/// - Metrics-friendly: update gauges/counters to aid observability
19#[async_trait]
20pub trait Datasource: Send + Sync {
21    /// Start producing updates and forward them to the provided `sender`.
22    ///
23    /// Implementations may batch updates and can send any of the `Updates`
24    /// variants. The method should exit when `cancellation_token` is
25    /// triggered or when a fatal error occurs.
26    async fn consume(
27        &self,
28        id: DatasourceId,
29        sender: tokio::sync::mpsc::Sender<(Updates, DatasourceId)>,
30        cancellation_token: CancellationToken,
31        metrics: Arc<MetricsCollection>,
32    ) -> IndexerResult<()>;
33
34    /// Advertise which update kinds this datasource will emit.
35    ///
36    /// This is used by the pipeline to route updates efficiently and to
37    /// activate only the relevant pipes/filters.
38    fn update_types(&self) -> Vec<UpdateType>;
39}
40
/// Opaque identifier paired with every batch a [`Datasource`] sends, naming
/// which datasource produced it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DatasourceId(String);
43
44impl DatasourceId {
45    pub fn new_unique() -> Self {
46        Self(uuid::Uuid::new_v4().to_string())
47    }
48
49    pub fn new_named(name: &str) -> Self {
50        Self(name.to_string())
51    }
52}
53
54#[derive(Debug, Clone)]
55pub enum Update {
56    Account(AccountUpdate),
57    Transaction(Box<TransactionUpdate>),
58    AccountDeletion(AccountDeletion),
59    BlockDetails(BlockDetails),
60    BitcoinBlock(BitcoinBlock),
61    RolledbackTransactions(RolledbackTransactionsEvent),
62    ReappliedTransactions(ReappliedTransactionsEvent),
63}
64
65#[derive(Debug)]
66pub enum Updates {
67    Accounts(Vec<AccountUpdate>),
68    Transactions(Vec<TransactionUpdate>),
69    AccountDeletions(Vec<AccountDeletion>),
70    BlockDetails(Vec<BlockDetails>),
71    BitcoinBlocks(Vec<BitcoinBlock>),
72    RolledbackTransactions(Vec<RolledbackTransactionsEvent>),
73    ReappliedTransactions(Vec<ReappliedTransactionsEvent>),
74}
75
76impl Updates {
77    pub fn len(&self) -> usize {
78        match self {
79            Updates::Accounts(updates) => updates.len(),
80            Updates::Transactions(updates) => updates.len(),
81            Updates::AccountDeletions(updates) => updates.len(),
82            Updates::BlockDetails(updates) => updates.len(),
83            Updates::BitcoinBlocks(updates) => updates.len(),
84            Updates::RolledbackTransactions(updates) => updates.len(),
85            Updates::ReappliedTransactions(updates) => updates.len(),
86        }
87    }
88
89    pub fn is_empty(&self) -> bool {
90        self.len() == 0
91    }
92
93    pub fn push(&mut self, update: Update) {
94        match (self, update) {
95            (Updates::Accounts(updates), Update::Account(account_update)) => {
96                updates.push(account_update)
97            }
98            (Updates::Transactions(updates), Update::Transaction(tx_update)) => {
99                updates.push(*tx_update)
100            }
101            (Updates::AccountDeletions(updates), Update::AccountDeletion(account_deletion)) => {
102                updates.push(account_deletion)
103            }
104            (Updates::BlockDetails(updates), Update::BlockDetails(block_details)) => {
105                updates.push(block_details)
106            }
107            (Updates::BitcoinBlocks(updates), Update::BitcoinBlock(block)) => updates.push(block),
108            (Updates::RolledbackTransactions(updates), Update::RolledbackTransactions(event)) => {
109                updates.push(event)
110            }
111            (Updates::ReappliedTransactions(updates), Update::ReappliedTransactions(event)) => {
112                updates.push(event)
113            }
114            // Mismatched variant: ignore
115            _ => {}
116        }
117    }
118}
119
120impl Clone for Updates {
121    fn clone(&self) -> Self {
122        match self {
123            Updates::Accounts(updates) => Updates::Accounts(updates.clone()),
124            Updates::Transactions(updates) => Updates::Transactions(updates.clone()),
125            Updates::AccountDeletions(updates) => Updates::AccountDeletions(updates.clone()),
126            Updates::BlockDetails(updates) => Updates::BlockDetails(updates.clone()),
127            Updates::BitcoinBlocks(updates) => Updates::BitcoinBlocks(updates.clone()),
128            Updates::RolledbackTransactions(updates) => {
129                Updates::RolledbackTransactions(updates.clone())
130            }
131            Updates::ReappliedTransactions(updates) => {
132                Updates::ReappliedTransactions(updates.clone())
133            }
134        }
135    }
136}
137
/// The kinds of update a datasource can advertise via `Datasource::update_types`.
///
/// Equality and hashing are derived so kinds can be compared and stored in
/// sets or used as map keys when routing updates.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum UpdateType {
    /// Account updates.
    AccountUpdate,
    /// Processed transactions.
    Transaction,
    /// Account deletions.
    AccountDeletion,
    /// Block metadata.
    BlockDetails,
    /// Bitcoin blocks.
    BitcoinBlock,
    /// Rolled-back transaction events.
    RolledbackTransactions,
    /// Reapplied transaction events.
    ReappliedTransactions,
}
148
149#[derive(Debug, Clone)]
150pub struct AccountUpdate {
151    pub pubkey: Pubkey,
152    pub account: AccountInfo,
153    pub height: u64,
154}
155
156#[derive(Debug, Clone)]
157pub struct BlockDetails {
158    pub height: u64,
159    pub block_hash: Option<Hash>,
160    pub previous_block_hash: Option<Hash>,
161    pub block_time: Option<i64>,
162    pub block_height: Option<u64>,
163}
164
165#[derive(Debug, Clone)]
166pub struct AccountDeletion {
167    pub pubkey: Pubkey,
168    pub height: u64,
169}
170
171#[derive(Debug, Clone)]
172pub struct TransactionUpdate {
173    pub transaction: ProcessedTransaction,
174    pub height: u64,
175}
176
/// A Bitcoin block observed by a datasource.
#[derive(Debug, Clone)]
pub struct BitcoinBlock {
    /// Height of the Bitcoin block.
    pub block_height: u64,
    /// Block hash as a string (encoding not fixed here; presumably hex, confirm).
    pub block_hash: String,
}
182
/// Event reporting a set of transactions that were rolled back.
#[derive(Debug, Clone)]
pub struct RolledbackTransactionsEvent {
    /// Height at which the rollback took place.
    pub height: u64,
    /// Hashes of the rolled-back transactions.
    pub transaction_hashes: Vec<String>,
}
191
/// Event reporting a set of transactions that were reapplied.
#[derive(Debug, Clone)]
pub struct ReappliedTransactionsEvent {
    /// Height at which the transactions were reapplied.
    pub height: u64,
    /// Hashes of the reapplied transactions.
    pub transaction_hashes: Vec<String>,
}
200
201/// Optional provider interface for fetching Bitcoin transactions by txid.
202///
203/// If supplied to the pipeline, this will be used to enrich transaction updates
204/// with the corresponding serialized Bitcoin transaction bytes when available.
205#[async_trait]
206pub trait BitcoinDatasource: Send + Sync {
207    /// Batch fetch Bitcoin transactions by their txids (big-endian hash).
208    /// Implementations should attempt to fetch all in one request if possible.
209    async fn get_transactions(
210        &self,
211        txids: &[Hash],
212    ) -> IndexerResult<HashMap<Hash, crate::bitcoin::Transaction>>;
213}
214
215/// Optional provider interface for fetching accounts on demand by pubkey.
216#[async_trait]
217pub trait AccountDatasource: Send + Sync {
218    /// Batch fetch accounts by pubkeys at the latest canonical height.
219    async fn get_multiple_accounts(
220        &self,
221        pubkeys: &[Pubkey],
222    ) -> IndexerResult<HashMap<Pubkey, Option<AccountInfo>>>;
223}