solana_trader_client_rust/provider/grpc/
stream.rs

1use anyhow::Result;
2use solana_trader_proto::api;
3use tonic::Request;
4use tonic::Streaming;
5
6use super::GrpcClient;
7
8impl GrpcClient {
9    pub async fn get_prices_stream(
10        &mut self,
11        projects: Vec<api::Project>,
12        tokens: Vec<String>,
13    ) -> Result<Streaming<api::GetPricesStreamResponse>> {
14        let request = Request::new(api::GetPricesStreamRequest {
15            projects: projects.iter().map(|&p| p as i32).collect(),
16            tokens,
17        });
18
19        let response = self
20            .client
21            .get_prices_stream(request)
22            .await
23            .map_err(|e| anyhow::anyhow!("GetPricesStream error: {}", e))?;
24
25        Ok(response.into_inner())
26    }
27
28    pub async fn get_block_stream(&mut self) -> Result<Streaming<api::GetBlockStreamResponse>> {
29        let request = Request::new(api::GetBlockStreamRequest {});
30
31        let response = self
32            .client
33            .get_block_stream(request)
34            .await
35            .map_err(|e| anyhow::anyhow!("GetBlockStream error: {}", e))?;
36
37        Ok(response.into_inner())
38    }
39
40    pub async fn get_orderbook_stream(
41        &mut self,
42        markets: Vec<String>,
43        limit: u32,
44        project: api::Project,
45    ) -> Result<Streaming<api::GetOrderbooksStreamResponse>> {
46        let request = Request::new(api::GetOrderbooksRequest {
47            markets,
48            limit,
49            project: project as i32,
50        });
51
52        let response = self
53            .client
54            .get_orderbooks_stream(request)
55            .await
56            .map_err(|e| anyhow::anyhow!("GetOrderbooksStream error: {}", e))?;
57
58        Ok(response.into_inner())
59    }
60
61    pub async fn get_market_depths_stream(
62        &mut self,
63        markets: Vec<String>,
64        limit: u32,
65        project: api::Project,
66    ) -> Result<Streaming<api::GetMarketDepthsStreamResponse>> {
67        let request = Request::new(api::GetMarketDepthsRequest {
68            markets,
69            limit,
70            project: project as i32,
71        });
72
73        let response = self
74            .client
75            .get_market_depths_stream(request)
76            .await
77            .map_err(|e| anyhow::anyhow!("GetMarketDepthsStream error: {}", e))?;
78
79        Ok(response.into_inner())
80    }
81
82    pub async fn get_ticker_stream(
83        &mut self,
84        markets: Vec<String>,
85        project: api::Project,
86    ) -> Result<Streaming<api::GetTickersStreamResponse>> {
87        let request = Request::new(api::GetTickersStreamRequest {
88            markets,
89            project: project as i32,
90        });
91
92        let response = self
93            .client
94            .get_tickers_stream(request)
95            .await
96            .map_err(|e| anyhow::anyhow!("GetTickersStream error: {}", e))?;
97
98        Ok(response.into_inner())
99    }
100
101    pub async fn get_trades_stream(
102        &mut self,
103        market: String,
104        limit: u32,
105        project: api::Project,
106    ) -> Result<Streaming<api::GetTradesStreamResponse>> {
107        let request = Request::new(api::GetTradesRequest {
108            market,
109            limit,
110            project: project as i32,
111        });
112
113        let response = self
114            .client
115            .get_trades_stream(request)
116            .await
117            .map_err(|e| anyhow::anyhow!("GetTradesStream error: {}", e))?;
118
119        Ok(response.into_inner())
120    }
121
122    pub async fn get_swaps_stream(
123        &mut self,
124        projects: Vec<api::Project>,
125        pools: Vec<String>,
126        include_failed: bool,
127    ) -> Result<Streaming<api::GetSwapsStreamResponse>> {
128        let request = Request::new(api::GetSwapsStreamRequest {
129            projects: projects.iter().map(|&p| p as i32).collect(),
130            pools,
131            include_failed,
132        });
133
134        let response = self
135            .client
136            .get_swaps_stream(request)
137            .await
138            .map_err(|e| anyhow::anyhow!("GetSwapsStream error: {}", e))?;
139
140        Ok(response.into_inner())
141    }
142
143    pub async fn get_new_raydium_pools_stream(
144        &mut self,
145        include_cpmm: bool,
146    ) -> Result<Streaming<api::GetNewRaydiumPoolsResponse>> {
147        let request = Request::new(api::GetNewRaydiumPoolsRequest {
148            include_cpmm: Some(include_cpmm),
149        });
150
151        let response = self
152            .client
153            .get_new_raydium_pools_stream(request)
154            .await
155            .map_err(|e| anyhow::anyhow!("GetNewRaydiumPoolsStream error: {}", e))?;
156
157        Ok(response.into_inner())
158    }
159
160    pub async fn get_new_raydium_pools_by_transaction_stream(
161        &mut self,
162    ) -> Result<Streaming<api::GetNewRaydiumPoolsByTransactionResponse>> {
163        let request = Request::new(api::GetNewRaydiumPoolsByTransactionRequest {});
164
165        let response = self
166            .client
167            .get_new_raydium_pools_by_transaction_stream(request)
168            .await
169            .map_err(|e| anyhow::anyhow!("GetNewRaydiumPoolsByTransactionStream error: {}", e))?;
170
171        Ok(response.into_inner())
172    }
173
174    pub async fn get_recent_block_hash_stream(
175        &mut self,
176    ) -> Result<Streaming<api::GetRecentBlockHashResponse>> {
177        let request = Request::new(api::GetRecentBlockHashRequest {});
178
179        let response = self
180            .client
181            .get_recent_block_hash_stream(request)
182            .await
183            .map_err(|e| anyhow::anyhow!("GetRecentBlockHashStream error: {}", e))?;
184
185        Ok(response.into_inner())
186    }
187
188    pub async fn get_pool_reserves_stream(
189        &mut self,
190        projects: Vec<api::Project>,
191        pools: Vec<String>,
192    ) -> Result<Streaming<api::GetPoolReservesStreamResponse>> {
193        let request = Request::new(api::GetPoolReservesStreamRequest {
194            projects: projects.iter().map(|&p| p as i32).collect(),
195            pools,
196        });
197
198        let response = self
199            .client
200            .get_pool_reserves_stream(request)
201            .await
202            .map_err(|e| anyhow::anyhow!("GetPoolReservesStream error: {}", e))?;
203
204        Ok(response.into_inner())
205    }
206
207    pub async fn get_priority_fee_stream(
208        &mut self,
209        project: api::Project,
210        percentile: Option<f64>,
211    ) -> Result<Streaming<api::GetPriorityFeeResponse>> {
212        let request = Request::new(api::GetPriorityFeeRequest {
213            project: project as i32,
214            percentile,
215        });
216
217        let response = self
218            .client
219            .get_priority_fee_stream(request)
220            .await
221            .map_err(|e| anyhow::anyhow!("GetPriorityFeeStream error: {}", e))?;
222
223        Ok(response.into_inner())
224    }
225
226    pub async fn get_bundle_tip_stream(&mut self) -> Result<Streaming<api::GetBundleTipResponse>> {
227        let request = Request::new(api::GetBundleTipRequest {});
228
229        let response = self
230            .client
231            .get_bundle_tip_stream(request)
232            .await
233            .map_err(|e| anyhow::anyhow!("GetBundleTipStream error: {}", e))?;
234
235        Ok(response.into_inner())
236    }
237
238    pub async fn get_pump_fun_new_tokens_stream(
239        &mut self,
240    ) -> Result<Streaming<api::GetPumpFunNewTokensStreamResponse>> {
241        let request = Request::new(api::GetPumpFunNewTokensStreamRequest {});
242
243        let response = self
244            .client
245            .get_pump_fun_new_tokens_stream(request)
246            .await
247            .map_err(|e| anyhow::anyhow!("GetPumpFunNewTokensStream error: {}", e))?;
248
249        Ok(response.into_inner())
250    }
251
252    pub async fn get_pump_fun_swaps_stream(
253        &mut self,
254        tokens: Vec<String>,
255    ) -> Result<Streaming<api::GetPumpFunSwapsStreamResponse>> {
256        let request = Request::new(api::GetPumpFunSwapsStreamRequest { tokens });
257
258        let response = self
259            .client
260            .get_pump_fun_swaps_stream(request)
261            .await
262            .map_err(|e| anyhow::anyhow!("GetPumpFunSwapsStream error: {}", e))?;
263
264        Ok(response.into_inner())
265    }
266
267    pub async fn get_priority_fee_by_program_stream(
268        &mut self,
269        projects: Vec<String>,
270    ) -> Result<Streaming<api::GetPriorityFeeByProgramResponse>> {
271        let request = Request::new(api::GetPriorityFeeByProgramRequest { programs: projects });
272
273        let response = self
274            .client
275            .get_priority_fee_by_program_stream(request)
276            .await
277            .map_err(|e| anyhow::anyhow!("GetPriorityFeeByProjectStream error: {}", e))?;
278
279        Ok(response.into_inner())
280    }
281}