solana_trader_client_rust/provider/grpc/
stream.rs

1use anyhow::Result;
2use solana_trader_proto::api;
3use tonic::Request;
4use tonic::Streaming;
5
6use super::GrpcClient;
7
8impl GrpcClient {
9    pub async fn get_pump_fun_new_amm_pool_stream(
10        &mut self,
11    ) -> Result<Streaming<api::GetPumpFunNewAmmPoolStreamResponse>> {
12        let request = Request::new(api::GetPumpFunNewAmmPoolStreamRequest {});
13
14        let response = self
15            .client
16            .get_pump_fun_new_amm_pool_stream(request)
17            .await
18            .map_err(|e| anyhow::anyhow!("GetPumpFunNewAmmPoolStream error: {}", e))?;
19
20        Ok(response.into_inner())
21    }
22    
23    pub async fn get_prices_stream(
24        &mut self,
25        projects: Vec<api::Project>,
26        tokens: Vec<String>,
27    ) -> Result<Streaming<api::GetPricesStreamResponse>> {
28        let request = Request::new(api::GetPricesStreamRequest {
29            projects: projects.iter().map(|&p| p as i32).collect(),
30            tokens,
31        });
32
33        let response = self
34            .client
35            .get_prices_stream(request)
36            .await
37            .map_err(|e| anyhow::anyhow!("GetPricesStream error: {}", e))?;
38
39        Ok(response.into_inner())
40    }
41
42    pub async fn get_block_stream(&mut self) -> Result<Streaming<api::GetBlockStreamResponse>> {
43        let request = Request::new(api::GetBlockStreamRequest {});
44
45        let response = self
46            .client
47            .get_block_stream(request)
48            .await
49            .map_err(|e| anyhow::anyhow!("GetBlockStream error: {}", e))?;
50
51        Ok(response.into_inner())
52    }
53
54    pub async fn get_orderbook_stream(
55        &mut self,
56        markets: Vec<String>,
57        limit: u32,
58        project: api::Project,
59    ) -> Result<Streaming<api::GetOrderbooksStreamResponse>> {
60        let request = Request::new(api::GetOrderbooksRequest {
61            markets,
62            limit,
63            project: project as i32,
64        });
65
66        let response = self
67            .client
68            .get_orderbooks_stream(request)
69            .await
70            .map_err(|e| anyhow::anyhow!("GetOrderbooksStream error: {}", e))?;
71
72        Ok(response.into_inner())
73    }
74
75    pub async fn get_market_depths_stream(
76        &mut self,
77        markets: Vec<String>,
78        limit: u32,
79        project: api::Project,
80    ) -> Result<Streaming<api::GetMarketDepthsStreamResponse>> {
81        let request = Request::new(api::GetMarketDepthsRequest {
82            markets,
83            limit,
84            project: project as i32,
85        });
86
87        let response = self
88            .client
89            .get_market_depths_stream(request)
90            .await
91            .map_err(|e| anyhow::anyhow!("GetMarketDepthsStream error: {}", e))?;
92
93        Ok(response.into_inner())
94    }
95
96    pub async fn get_ticker_stream(
97        &mut self,
98        markets: Vec<String>,
99        project: api::Project,
100    ) -> Result<Streaming<api::GetTickersStreamResponse>> {
101        let request = Request::new(api::GetTickersStreamRequest {
102            markets,
103            project: project as i32,
104        });
105
106        let response = self
107            .client
108            .get_tickers_stream(request)
109            .await
110            .map_err(|e| anyhow::anyhow!("GetTickersStream error: {}", e))?;
111
112        Ok(response.into_inner())
113    }
114
115    pub async fn get_trades_stream(
116        &mut self,
117        market: String,
118        limit: u32,
119        project: api::Project,
120    ) -> Result<Streaming<api::GetTradesStreamResponse>> {
121        let request = Request::new(api::GetTradesRequest {
122            market,
123            limit,
124            project: project as i32,
125        });
126
127        let response = self
128            .client
129            .get_trades_stream(request)
130            .await
131            .map_err(|e| anyhow::anyhow!("GetTradesStream error: {}", e))?;
132
133        Ok(response.into_inner())
134    }
135
136    pub async fn get_swaps_stream(
137        &mut self,
138        projects: Vec<api::Project>,
139        pools: Vec<String>,
140        include_failed: bool,
141    ) -> Result<Streaming<api::GetSwapsStreamResponse>> {
142        let request = Request::new(api::GetSwapsStreamRequest {
143            projects: projects.iter().map(|&p| p as i32).collect(),
144            pools,
145            include_failed,
146        });
147
148        let response = self
149            .client
150            .get_swaps_stream(request)
151            .await
152            .map_err(|e| anyhow::anyhow!("GetSwapsStream error: {}", e))?;
153
154        Ok(response.into_inner())
155    }
156
157    pub async fn get_new_raydium_pools_stream(
158        &mut self,
159        include_cpmm: bool,
160    ) -> Result<Streaming<api::GetNewRaydiumPoolsResponse>> {
161        let request = Request::new(api::GetNewRaydiumPoolsRequest {
162            include_cpmm: Some(include_cpmm),
163        });
164
165        let response = self
166            .client
167            .get_new_raydium_pools_stream(request)
168            .await
169            .map_err(|e| anyhow::anyhow!("GetNewRaydiumPoolsStream error: {}", e))?;
170
171        Ok(response.into_inner())
172    }
173
174    pub async fn get_quotes_stream(
175        &mut self,
176        projects: Vec<i32>,
177        token_pairs: Vec<api::TokenPair>,
178    ) -> Result<Streaming<api::GetQuotesStreamResponse>> {
179        let request = Request::new(api::GetQuotesStreamRequest {
180            projects,
181            token_pairs,
182        });
183        
184        let response = self
185            .client
186            .get_quotes_stream(request)
187            .await
188            .map_err(|e| anyhow::anyhow!("GetQuotesStream error: {}", e))?;
189        
190        Ok(response.into_inner())
191    }
192
193    pub async fn get_new_raydium_pools_by_transaction_stream(
194        &mut self,
195    ) -> Result<Streaming<api::GetNewRaydiumPoolsByTransactionResponse>> {
196        let request = Request::new(api::GetNewRaydiumPoolsByTransactionRequest {});
197
198        let response = self
199            .client
200            .get_new_raydium_pools_by_transaction_stream(request)
201            .await
202            .map_err(|e| anyhow::anyhow!("GetNewRaydiumPoolsByTransactionStream error: {}", e))?;
203
204        Ok(response.into_inner())
205    }
206
207    pub async fn get_recent_block_hash_stream(
208        &mut self,
209    ) -> Result<Streaming<api::GetRecentBlockHashResponse>> {
210        let request = Request::new(api::GetRecentBlockHashRequest {});
211
212        let response = self
213            .client
214            .get_recent_block_hash_stream(request)
215            .await
216            .map_err(|e| anyhow::anyhow!("GetRecentBlockHashStream error: {}", e))?;
217
218        Ok(response.into_inner())
219    }
220
221    pub async fn get_pool_reserves_stream(
222        &mut self,
223        projects: Vec<api::Project>,
224        pools: Vec<String>,
225    ) -> Result<Streaming<api::GetPoolReservesStreamResponse>> {
226        let request = Request::new(api::GetPoolReservesStreamRequest {
227            projects: projects.iter().map(|&p| p as i32).collect(),
228            pools,
229        });
230
231        let response = self
232            .client
233            .get_pool_reserves_stream(request)
234            .await
235            .map_err(|e| anyhow::anyhow!("GetPoolReservesStream error: {}", e))?;
236
237        Ok(response.into_inner())
238    }
239
240    pub async fn get_priority_fee_stream(
241        &mut self,
242        project: api::Project,
243        percentile: Option<f64>,
244    ) -> Result<Streaming<api::GetPriorityFeeResponse>> {
245        let request = Request::new(api::GetPriorityFeeRequest {
246            project: project as i32,
247            percentile,
248        });
249
250        let response = self
251            .client
252            .get_priority_fee_stream(request)
253            .await
254            .map_err(|e| anyhow::anyhow!("GetPriorityFeeStream error: {}", e))?;
255
256        Ok(response.into_inner())
257    }
258
259    pub async fn get_bundle_tip_stream(&mut self) -> Result<Streaming<api::GetBundleTipResponse>> {
260        let request = Request::new(api::GetBundleTipRequest {});
261
262        let response = self
263            .client
264            .get_bundle_tip_stream(request)
265            .await
266            .map_err(|e| anyhow::anyhow!("GetBundleTipStream error: {}", e))?;
267
268        Ok(response.into_inner())
269    }
270
271    pub async fn get_pump_fun_new_tokens_stream(
272        &mut self,
273    ) -> Result<Streaming<api::GetPumpFunNewTokensStreamResponse>> {
274        let request = Request::new(api::GetPumpFunNewTokensStreamRequest {});
275
276        let response = self
277            .client
278            .get_pump_fun_new_tokens_stream(request)
279            .await
280            .map_err(|e| anyhow::anyhow!("GetPumpFunNewTokensStream error: {}", e))?;
281
282        Ok(response.into_inner())
283    }
284
285    pub async fn get_pump_fun_swaps_stream(
286        &mut self,
287        tokens: Vec<String>,
288    ) -> Result<Streaming<api::GetPumpFunSwapsStreamResponse>> {
289        let request = Request::new(api::GetPumpFunSwapsStreamRequest { tokens });
290
291        let response = self
292            .client
293            .get_pump_fun_swaps_stream(request)
294            .await
295            .map_err(|e| anyhow::anyhow!("GetPumpFunSwapsStream error: {}", e))?;
296
297        Ok(response.into_inner())
298    }
299
300    pub async fn get_pump_fun_amm_swap_stream(
301        &mut self,
302        pools: Vec<String>,
303    ) -> Result<Streaming<api::GetPumpFunAmmSwapStreamResponse>> {
304        let request = Request::new(api::GetPumpFunAmmSwapStreamRequest { pools });
305
306        let response = self
307            .client
308            .get_pump_fun_amm_swap_stream(request)
309            .await
310            .map_err(|e| anyhow::anyhow!("GetPumpFunAmmSwapStream error: {}", e))?;
311
312        Ok(response.into_inner())
313    }
314
315    pub async fn get_priority_fee_by_program_stream(
316        &mut self,
317        projects: Vec<String>,
318    ) -> Result<Streaming<api::GetPriorityFeeByProgramResponse>> {
319        let request = Request::new(api::GetPriorityFeeByProgramRequest { programs: projects });
320
321        let response = self
322            .client
323            .get_priority_fee_by_program_stream(request)
324            .await
325            .map_err(|e| anyhow::anyhow!("GetPriorityFeeByProjectStream error: {}", e))?;
326
327        Ok(response.into_inner())
328    }
329}