1use std::collections::HashMap;
2use std::sync::Arc;
3
4use arch_program::{hash::Hash, pubkey::Pubkey};
5use arch_sdk::{AccountInfo, ProcessedTransaction};
6use async_trait::async_trait;
7use tokio_util::sync::CancellationToken;
8
9use crate::{error::IndexerResult, metrics::MetricsCollection};
10
11#[async_trait]
20pub trait Datasource: Send + Sync {
21 async fn consume(
27 &self,
28 id: DatasourceId,
29 sender: tokio::sync::mpsc::Sender<(Updates, DatasourceId)>,
30 cancellation_token: CancellationToken,
31 metrics: Arc<MetricsCollection>,
32 ) -> IndexerResult<()>;
33
34 fn update_types(&self) -> Vec<UpdateType>;
39}
40
41#[derive(Debug, Clone, PartialEq, Eq, Hash)]
42pub struct DatasourceId(String);
43
44impl DatasourceId {
45 pub fn new_unique() -> Self {
46 Self(uuid::Uuid::new_v4().to_string())
47 }
48
49 pub fn new_named(name: &str) -> Self {
50 Self(name.to_string())
51 }
52}
53
54#[derive(Debug, Clone)]
55pub enum Update {
56 Account(AccountUpdate),
57 Transaction(Box<TransactionUpdate>),
58 AccountDeletion(AccountDeletion),
59 BlockDetails(BlockDetails),
60 BitcoinBlock(BitcoinBlock),
61 RolledbackTransactions(RolledbackTransactionsEvent),
62 ReappliedTransactions(ReappliedTransactionsEvent),
63}
64
65#[derive(Debug)]
66pub enum Updates {
67 Accounts(Vec<AccountUpdate>),
68 Transactions(Vec<TransactionUpdate>),
69 AccountDeletions(Vec<AccountDeletion>),
70 BlockDetails(Vec<BlockDetails>),
71 BitcoinBlocks(Vec<BitcoinBlock>),
72 RolledbackTransactions(Vec<RolledbackTransactionsEvent>),
73 ReappliedTransactions(Vec<ReappliedTransactionsEvent>),
74}
75
76impl Updates {
77 pub fn len(&self) -> usize {
78 match self {
79 Updates::Accounts(updates) => updates.len(),
80 Updates::Transactions(updates) => updates.len(),
81 Updates::AccountDeletions(updates) => updates.len(),
82 Updates::BlockDetails(updates) => updates.len(),
83 Updates::BitcoinBlocks(updates) => updates.len(),
84 Updates::RolledbackTransactions(updates) => updates.len(),
85 Updates::ReappliedTransactions(updates) => updates.len(),
86 }
87 }
88
89 pub fn is_empty(&self) -> bool {
90 self.len() == 0
91 }
92
93 pub fn push(&mut self, update: Update) {
94 match (self, update) {
95 (Updates::Accounts(updates), Update::Account(account_update)) => {
96 updates.push(account_update)
97 }
98 (Updates::Transactions(updates), Update::Transaction(tx_update)) => {
99 updates.push(*tx_update)
100 }
101 (Updates::AccountDeletions(updates), Update::AccountDeletion(account_deletion)) => {
102 updates.push(account_deletion)
103 }
104 (Updates::BlockDetails(updates), Update::BlockDetails(block_details)) => {
105 updates.push(block_details)
106 }
107 (Updates::BitcoinBlocks(updates), Update::BitcoinBlock(block)) => updates.push(block),
108 (Updates::RolledbackTransactions(updates), Update::RolledbackTransactions(event)) => {
109 updates.push(event)
110 }
111 (Updates::ReappliedTransactions(updates), Update::ReappliedTransactions(event)) => {
112 updates.push(event)
113 }
114 _ => {}
116 }
117 }
118}
119
120impl Clone for Updates {
121 fn clone(&self) -> Self {
122 match self {
123 Updates::Accounts(updates) => Updates::Accounts(updates.clone()),
124 Updates::Transactions(updates) => Updates::Transactions(updates.clone()),
125 Updates::AccountDeletions(updates) => Updates::AccountDeletions(updates.clone()),
126 Updates::BlockDetails(updates) => Updates::BlockDetails(updates.clone()),
127 Updates::BitcoinBlocks(updates) => Updates::BitcoinBlocks(updates.clone()),
128 Updates::RolledbackTransactions(updates) => {
129 Updates::RolledbackTransactions(updates.clone())
130 }
131 Updates::ReappliedTransactions(updates) => {
132 Updates::ReappliedTransactions(updates.clone())
133 }
134 }
135 }
136}
137
138#[derive(Debug, Clone)]
139pub enum UpdateType {
140 AccountUpdate,
141 Transaction,
142 AccountDeletion,
143 BlockDetails,
144 BitcoinBlock,
145 RolledbackTransactions,
146 ReappliedTransactions,
147}
148
149#[derive(Debug, Clone)]
150pub struct AccountUpdate {
151 pub pubkey: Pubkey,
152 pub account: AccountInfo,
153 pub height: u64,
154}
155
156#[derive(Debug, Clone)]
157pub struct BlockDetails {
158 pub height: u64,
159 pub block_hash: Option<Hash>,
160 pub previous_block_hash: Option<Hash>,
161 pub block_time: Option<i64>,
162 pub block_height: Option<u64>,
163}
164
165#[derive(Debug, Clone)]
166pub struct AccountDeletion {
167 pub pubkey: Pubkey,
168 pub height: u64,
169}
170
171#[derive(Debug, Clone)]
172pub struct TransactionUpdate {
173 pub transaction: ProcessedTransaction,
174 pub height: u64,
175}
176
177#[derive(Debug, Clone)]
178pub struct BitcoinBlock {
179 pub block_height: u64,
180 pub block_hash: String,
181}
182
183#[derive(Debug, Clone)]
185pub struct RolledbackTransactionsEvent {
186 pub height: u64,
188 pub transaction_hashes: Vec<String>,
190}
191
192#[derive(Debug, Clone)]
194pub struct ReappliedTransactionsEvent {
195 pub height: u64,
197 pub transaction_hashes: Vec<String>,
199}
200
201#[async_trait]
206pub trait BitcoinDatasource: Send + Sync {
207 async fn get_transactions(
210 &self,
211 txids: &[Hash],
212 ) -> IndexerResult<HashMap<Hash, crate::bitcoin::Transaction>>;
213}
214
215#[async_trait]
217pub trait AccountDatasource: Send + Sync {
218 async fn get_multiple_accounts(
220 &self,
221 pubkeys: &[Pubkey],
222 ) -> IndexerResult<HashMap<Pubkey, Option<AccountInfo>>>;
223}