1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use {
log::*,
gemachain_sdk::clock::Slot,
std::{net::SocketAddr, sync::Arc},
tokio::runtime::Runtime,
tonic::{self, transport::Endpoint, Request},
};
// Pulls the code tonic generated for the `accountsdb_repl` protobuf package into this module.
// NOTE(review): the `accounts_db_repl_client` module and the `Replica*` message types used
// below are presumably defined by this generated code — confirm against the .proto file.
tonic::include_proto!("accountsdb_repl");
/// Asynchronous client for the AccountsDb replication service.
///
/// Wraps an `accounts_db_repl_client::AccountsDbReplClient` that talks over a
/// `tonic::transport::Channel`.
pub struct AccountsDbReplClient {
/// Underlying gRPC client used to issue the requests.
client: accounts_db_repl_client::AccountsDbReplClient<tonic::transport::Channel>,
}
/// Errors reported by the AccountsDb replication client.
///
/// Each variant carries the stringified description of the underlying failure.
/// The type implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// printed with `{}` and propagated through `Box<dyn Error>`-style error handling.
#[derive(Debug)]
pub enum ReplicaRpcError {
    /// The server URL could not be turned into a transport endpoint.
    InvalidUrl(String),
    /// A transport-level failure (e.g. while establishing the connection).
    ConnectionError(String),
    /// The `get_confirmed_slots` request returned an error status.
    GetSlotsError(String),
    /// The `get_slot_accounts` request returned an error status.
    GetAccountsError(String),
}

impl std::fmt::Display for ReplicaRpcError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pair a fixed, human-readable prefix with the carried detail message.
        let (what, detail) = match self {
            ReplicaRpcError::InvalidUrl(detail) => ("invalid replica server URL", detail),
            ReplicaRpcError::ConnectionError(detail) => ("connection error", detail),
            ReplicaRpcError::GetSlotsError(detail) => ("failed to get confirmed slots", detail),
            ReplicaRpcError::GetAccountsError(detail) => ("failed to get slot accounts", detail),
        };
        write!(f, "{}: {}", what, detail)
    }
}

// The underlying causes are already flattened into strings, so there is no `source()`.
impl std::error::Error for ReplicaRpcError {}
/// Converts a tonic transport error into [`ReplicaRpcError::ConnectionError`],
/// keeping only the error's textual description.
impl From<tonic::transport::Error> for ReplicaRpcError {
    fn from(err: tonic::transport::Error) -> Self {
        let message = err.to_string();
        Self::ConnectionError(message)
    }
}
impl AccountsDbReplClient {
    /// Connects to the replication server listening at `rpc_peer`.
    ///
    /// The address is formatted into an `http://` URL before the endpoint is built.
    ///
    /// # Errors
    ///
    /// * [`ReplicaRpcError::InvalidUrl`] if no endpoint can be built from the URL.
    /// * [`ReplicaRpcError::ConnectionError`] if establishing the connection fails.
    pub async fn connect(rpc_peer: &SocketAddr) -> Result<Self, ReplicaRpcError> {
        let url = format!("http://{}", rpc_peer);
        // The URL is still needed for the log line below, so hand the endpoint a copy.
        let endpoint = Endpoint::from_shared(url.clone())
            .map_err(|e| ReplicaRpcError::InvalidUrl(e.to_string()))?;
        // Transport errors are converted through `From<tonic::transport::Error>`.
        let client = accounts_db_repl_client::AccountsDbReplClient::connect(endpoint).await?;
        info!(
            "Successfully connected to the AccountsDb Replication server: {:?}",
            url
        );
        Ok(Self { client })
    }

    /// Sends a `ReplicaSlotConfirmationRequest` whose `last_replicated_slot` is `last_slot`
    /// and returns the `updated_slots` field of the reply.
    ///
    /// # Errors
    ///
    /// Returns [`ReplicaRpcError::GetSlotsError`] carrying the stringified gRPC status
    /// when the call fails.
    pub async fn get_confirmed_slots(
        &mut self,
        last_slot: Slot,
    ) -> Result<Vec<Slot>, ReplicaRpcError> {
        let request = Request::new(ReplicaSlotConfirmationRequest {
            last_replicated_slot: last_slot,
        });
        self.client
            .get_confirmed_slots(request)
            .await
            .map(|reply| reply.into_inner().updated_slots)
            .map_err(|status| ReplicaRpcError::GetSlotsError(status.to_string()))
    }

    /// Sends a `ReplicaAccountsRequest` for `slot` and returns the `accounts` field
    /// of the reply.
    ///
    /// # Errors
    ///
    /// Returns [`ReplicaRpcError::GetAccountsError`] carrying the stringified gRPC status
    /// when the call fails.
    pub async fn get_slot_accounts(
        &mut self,
        slot: Slot,
    ) -> Result<Vec<ReplicaAccountInfo>, ReplicaRpcError> {
        let request = Request::new(ReplicaAccountsRequest { slot });
        self.client
            .get_slot_accounts(request)
            .await
            .map(|reply| reply.into_inner().accounts)
            .map_err(|status| ReplicaRpcError::GetAccountsError(status.to_string()))
    }
}
/// Settings used to construct an `AccountsDbReplClientService`.
///
/// Derives `Debug` in addition to `Clone` so the configuration can be logged
/// and embedded in other `Debug` types.
#[derive(Clone, Debug)]
pub struct AccountsDbReplClientServiceConfig {
    /// Number of worker threads given to the service's Tokio runtime.
    pub worker_threads: usize,
    /// Socket address of the replication server to connect to.
    pub replica_server_addr: SocketAddr,
}
/// Pairs an [`AccountsDbReplClient`] with the Tokio runtime used to drive its
/// asynchronous calls.
pub struct AccountsDbReplClientService {
/// Shared handle to the Tokio runtime on which the client's futures are run.
runtime: Arc<Runtime>,
/// The connected asynchronous replication client.
accountsdb_repl_client: AccountsDbReplClient,
}
impl AccountsDbReplClientService {
    /// Builds a multi-threaded Tokio runtime with `config.worker_threads` workers and
    /// blocks on it until a client is connected to `config.replica_server_addr`.
    ///
    /// # Errors
    ///
    /// Propagates any [`ReplicaRpcError`] returned by [`AccountsDbReplClient::connect`].
    ///
    /// # Panics
    ///
    /// Panics if the Tokio runtime cannot be built.
    pub fn new(config: AccountsDbReplClientServiceConfig) -> Result<Self, ReplicaRpcError> {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder
            .worker_threads(config.worker_threads)
            .thread_name("sol-accountsdb-repl-wrk")
            .enable_all();
        let runtime = Arc::new(builder.build().expect("Runtime"));

        // Drive the async connect to completion on the freshly built runtime.
        let connecting = AccountsDbReplClient::connect(&config.replica_server_addr);
        let accountsdb_repl_client = runtime.block_on(connecting)?;

        Ok(Self {
            runtime,
            accountsdb_repl_client,
        })
    }

    /// Blocking counterpart of [`AccountsDbReplClient::get_confirmed_slots`]: runs the
    /// request on the service's runtime and waits for its result.
    pub fn get_confirmed_slots(&mut self, last_slot: Slot) -> Result<Vec<Slot>, ReplicaRpcError> {
        let pending = self.accountsdb_repl_client.get_confirmed_slots(last_slot);
        self.runtime.block_on(pending)
    }

    /// Blocking counterpart of [`AccountsDbReplClient::get_slot_accounts`]: runs the
    /// request on the service's runtime and waits for its result.
    pub fn get_slot_accounts(
        &mut self,
        slot: Slot,
    ) -> Result<Vec<ReplicaAccountInfo>, ReplicaRpcError> {
        let pending = self.accountsdb_repl_client.get_slot_accounts(slot);
        self.runtime.block_on(pending)
    }
}