thorstreamer_grpc_client/
lib.rs1use tonic::transport::{Channel, Endpoint};
3use tonic::{metadata::MetadataValue, Request, Status, Streaming};
4use std::time::Duration;
5
6pub mod proto {
8 pub mod publisher {
9 tonic::include_proto!("publisher");
10 }
11 pub mod thor_streamer {
12 pub mod types {
13 tonic::include_proto!("thor_streamer.types");
14 }
15 }
16 pub mod google {
17 pub mod protobuf {
18 tonic::include_proto!("google.protobuf");
19 }
20 }
21}
22
23use proto::google::protobuf::Empty as GoogleEmpty;
25use proto::thor_streamer::types::Empty as ThorEmpty;
26
27use proto::publisher::{
28 event_publisher_client::EventPublisherClient,
29 SubscribeAccountsRequest, SubscribeWalletRequest, StreamResponse,
30};
31use proto::thor_streamer::types::{
32 thor_streamer_client::ThorStreamerClient,
33 MessageWrapper,
34};
35
36#[derive(Clone, Debug)]
38pub struct ClientConfig {
39 pub server_addr: String,
40 pub token: String,
41 pub timeout: Duration,
42}
43
44impl Default for ClientConfig {
45 fn default() -> Self {
46 Self {
47 server_addr: "http://localhost:50051".to_string(),
48 token: String::new(),
49 timeout: Duration::from_secs(30),
50 }
51 }
52}
53
54#[derive(Clone)]
56pub struct ThorClient {
57 event_client: EventPublisherClient<Channel>,
58 thor_client: ThorStreamerClient<Channel>,
59 token: String,
60}
61
62impl ThorClient {
63 pub async fn new(config: ClientConfig) -> Result<Self, Box<dyn std::error::Error>> {
65 let endpoint = Endpoint::from_shared(config.server_addr)?
66 .timeout(config.timeout)
67 .tcp_keepalive(Some(Duration::from_secs(30)))
68 .http2_keep_alive_interval(Duration::from_secs(30))
69 .keep_alive_timeout(Duration::from_secs(10));
70
71 let channel = endpoint.connect().await?;
72
73 Ok(Self {
74 event_client: EventPublisherClient::new(channel.clone()),
75 thor_client: ThorStreamerClient::new(channel),
76 token: config.token,
77 })
78 }
79
80 fn with_auth<T>(&self, req: T) -> Request<T> {
82 let mut request = Request::new(req);
83 if let Ok(token) = MetadataValue::try_from(&self.token) {
84 request.metadata_mut().insert("authorization", token);
85 }
86 request
87 }
88
89 pub async fn subscribe_to_transactions(&mut self) -> Result<Streaming<StreamResponse>, Status> {
91 let request = self.with_auth(GoogleEmpty {});
92 let response = self.event_client.subscribe_to_transactions(request).await?;
93 Ok(response.into_inner())
94 }
95
96 pub async fn subscribe_to_slot_status(&mut self) -> Result<Streaming<StreamResponse>, Status> {
98 let request = self.with_auth(GoogleEmpty {});
99 let response = self.event_client.subscribe_to_slot_status(request).await?;
100 Ok(response.into_inner())
101 }
102
103 pub async fn subscribe_to_wallet_transactions(
105 &mut self,
106 wallet_addresses: Vec<String>,
107 ) -> Result<Streaming<StreamResponse>, Status> {
108 let request = self.with_auth(SubscribeWalletRequest {
109 wallet_address: wallet_addresses,
110 });
111 let response = self
112 .event_client
113 .subscribe_to_wallet_transactions(request)
114 .await?;
115 Ok(response.into_inner())
116 }
117
118 pub async fn subscribe_to_account_updates(
120 &mut self,
121 account_addresses: Vec<String>,
122 owner_addresses: Vec<String>,
123 ) -> Result<Streaming<StreamResponse>, Status> {
124 let request = self.with_auth(SubscribeAccountsRequest {
125 account_address: account_addresses,
126 owner_address: owner_addresses,
127 });
128 let response = self
129 .event_client
130 .subscribe_to_account_updates(request)
131 .await?;
132 Ok(response.into_inner())
133 }
134
135 pub async fn subscribe_to_thor_updates(&mut self) -> Result<Streaming<MessageWrapper>, Status> {
137 let request = Request::new(ThorEmpty {});
138 let response = self.thor_client.stream_updates(request).await?;
139 Ok(response.into_inner())
140 }
141}
142
143pub fn parse_message(data: &[u8]) -> Result<MessageWrapper, prost::DecodeError> {
145 use prost::Message;
146 MessageWrapper::decode(data)
147}
148
149#[cfg(test)]
150mod tests {
151 use super::*;
152
153 #[test]
154 fn test_config_default() {
155 let config = ClientConfig::default();
156 assert_eq!(config.server_addr, "http://localhost:50051");
157 assert_eq!(config.timeout, Duration::from_secs(30));
158 }
159}