thorstreamer_grpc_client/
lib.rs

1// lib.rs
2use tonic::transport::{Channel, Endpoint};
3use tonic::{metadata::MetadataValue, Request, Status, Streaming};
4use std::time::Duration;
5
6// Generated protobuf modules
7pub mod proto {
8    pub mod publisher {
9        tonic::include_proto!("publisher");
10    }
11    pub mod thor_streamer {
12        pub mod types {
13            tonic::include_proto!("thor_streamer.types");
14        }
15    }
16    pub mod google {
17        pub mod protobuf {
18            tonic::include_proto!("google.protobuf");
19        }
20    }
21}
22
23// Import both Empty types with aliases
24use proto::google::protobuf::Empty as GoogleEmpty;
25use proto::thor_streamer::types::Empty as ThorEmpty;
26
27use proto::publisher::{
28    event_publisher_client::EventPublisherClient,
29    SubscribeAccountsRequest, SubscribeWalletRequest, StreamResponse,
30};
31use proto::thor_streamer::types::{
32    thor_streamer_client::ThorStreamerClient,
33    MessageWrapper,
34};
35
/// Configuration for the Thor gRPC client.
///
/// `Debug` is implemented by hand rather than derived so that the
/// authentication token is never written to logs or panic messages.
#[derive(Clone)]
pub struct ClientConfig {
    /// Address of the gRPC server, e.g. `http://localhost:50051`.
    pub server_addr: String,
    /// Authentication token for requests to the server. Treated as a secret.
    pub token: String,
    /// Timeout applied to the endpoint built from this configuration.
    pub timeout: Duration,
}

impl std::fmt::Debug for ClientConfig {
    /// Formats the configuration like a derived `Debug` would, except that the
    /// token value is replaced by a placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Still reveal whether a token was configured; that is useful when
        // debugging authentication problems and discloses nothing sensitive.
        let token = if self.token.is_empty() {
            "<empty>"
        } else {
            "<redacted>"
        };
        f.debug_struct("ClientConfig")
            .field("server_addr", &self.server_addr)
            .field("token", &token)
            .field("timeout", &self.timeout)
            .finish()
    }
}
43
44impl Default for ClientConfig {
45    fn default() -> Self {
46        Self {
47            server_addr: "http://localhost:50051".to_string(),
48            token: String::new(),
49            timeout: Duration::from_secs(30),
50        }
51    }
52}
53
54/// Main client for Thor gRPC services
55#[derive(Clone)]
56pub struct ThorClient {
57    event_client: EventPublisherClient<Channel>,
58    thor_client: ThorStreamerClient<Channel>,
59    token: String,
60}
61
62impl ThorClient {
63    /// Create a new Thor client
64    pub async fn new(config: ClientConfig) -> Result<Self, Box<dyn std::error::Error>> {
65        let endpoint = Endpoint::from_shared(config.server_addr)?
66            .timeout(config.timeout)
67            .tcp_keepalive(Some(Duration::from_secs(30)))
68            .http2_keep_alive_interval(Duration::from_secs(30))
69            .keep_alive_timeout(Duration::from_secs(10));
70
71        let channel = endpoint.connect().await?;
72
73        Ok(Self {
74            event_client: EventPublisherClient::new(channel.clone()),
75            thor_client: ThorStreamerClient::new(channel),
76            token: config.token,
77        })
78    }
79
80    /// Create a request with authentication
81    fn with_auth<T>(&self, req: T) -> Request<T> {
82        let mut request = Request::new(req);
83        if let Ok(token) = MetadataValue::try_from(&self.token) {
84            request.metadata_mut().insert("authorization", token);
85        }
86        request
87    }
88
89    /// Subscribe to transaction events (uses GoogleEmpty)
90    pub async fn subscribe_to_transactions(&mut self) -> Result<Streaming<StreamResponse>, Status> {
91        let request = self.with_auth(GoogleEmpty {});
92        let response = self.event_client.subscribe_to_transactions(request).await?;
93        Ok(response.into_inner())
94    }
95
96    /// Subscribe to slot status events (uses GoogleEmpty)
97    pub async fn subscribe_to_slot_status(&mut self) -> Result<Streaming<StreamResponse>, Status> {
98        let request = self.with_auth(GoogleEmpty {});
99        let response = self.event_client.subscribe_to_slot_status(request).await?;
100        Ok(response.into_inner())
101    }
102
103    /// Subscribe to wallet transaction events
104    pub async fn subscribe_to_wallet_transactions(
105        &mut self,
106        wallet_addresses: Vec<String>,
107    ) -> Result<Streaming<StreamResponse>, Status> {
108        let request = self.with_auth(SubscribeWalletRequest {
109            wallet_address: wallet_addresses,
110        });
111        let response = self
112            .event_client
113            .subscribe_to_wallet_transactions(request)
114            .await?;
115        Ok(response.into_inner())
116    }
117
118    /// Subscribe to account update events
119    pub async fn subscribe_to_account_updates(
120        &mut self,
121        account_addresses: Vec<String>,
122        owner_addresses: Vec<String>,
123    ) -> Result<Streaming<StreamResponse>, Status> {
124        let request = self.with_auth(SubscribeAccountsRequest {
125            account_address: account_addresses,
126            owner_address: owner_addresses,
127        });
128        let response = self
129            .event_client
130            .subscribe_to_account_updates(request)
131            .await?;
132        Ok(response.into_inner())
133    }
134
135    /// Subscribe to Thor update stream (uses ThorEmpty)
136    pub async fn subscribe_to_thor_updates(&mut self) -> Result<Streaming<MessageWrapper>, Status> {
137        let request = Request::new(ThorEmpty {});
138        let response = self.thor_client.stream_updates(request).await?;
139        Ok(response.into_inner())
140    }
141}
142
143/// Parse a StreamResponse into MessageWrapper
144pub fn parse_message(data: &[u8]) -> Result<MessageWrapper, prost::DecodeError> {
145    use prost::Message;
146    MessageWrapper::decode(data)
147}
148
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration points at the local server address and
    /// uses a 30-second timeout.
    #[test]
    fn test_config_default() {
        let ClientConfig {
            server_addr,
            timeout,
            ..
        } = ClientConfig::default();

        assert_eq!(server_addr, "http://localhost:50051");
        assert_eq!(timeout, Duration::from_secs(30));
    }
}