solana_trader_client_rust/provider/grpc/
stream.rs1use anyhow::Result;
2use solana_trader_proto::api;
3use tonic::Request;
4use tonic::Streaming;
5
6use super::GrpcClient;
7
8impl GrpcClient {
9 pub async fn get_pump_fun_new_amm_pool_stream(
10 &mut self,
11 ) -> Result<Streaming<api::GetPumpFunNewAmmPoolStreamResponse>> {
12 let request = Request::new(api::GetPumpFunNewAmmPoolStreamRequest {});
13
14 let response = self
15 .client
16 .get_pump_fun_new_amm_pool_stream(request)
17 .await
18 .map_err(|e| anyhow::anyhow!("GetPumpFunNewAmmPoolStream error: {}", e))?;
19
20 Ok(response.into_inner())
21 }
22
23 pub async fn get_prices_stream(
24 &mut self,
25 projects: Vec<api::Project>,
26 tokens: Vec<String>,
27 ) -> Result<Streaming<api::GetPricesStreamResponse>> {
28 let request = Request::new(api::GetPricesStreamRequest {
29 projects: projects.iter().map(|&p| p as i32).collect(),
30 tokens,
31 });
32
33 let response = self
34 .client
35 .get_prices_stream(request)
36 .await
37 .map_err(|e| anyhow::anyhow!("GetPricesStream error: {}", e))?;
38
39 Ok(response.into_inner())
40 }
41
42 pub async fn get_block_stream(&mut self) -> Result<Streaming<api::GetBlockStreamResponse>> {
43 let request = Request::new(api::GetBlockStreamRequest {});
44
45 let response = self
46 .client
47 .get_block_stream(request)
48 .await
49 .map_err(|e| anyhow::anyhow!("GetBlockStream error: {}", e))?;
50
51 Ok(response.into_inner())
52 }
53
54 pub async fn get_orderbook_stream(
55 &mut self,
56 markets: Vec<String>,
57 limit: u32,
58 project: api::Project,
59 ) -> Result<Streaming<api::GetOrderbooksStreamResponse>> {
60 let request = Request::new(api::GetOrderbooksRequest {
61 markets,
62 limit,
63 project: project as i32,
64 });
65
66 let response = self
67 .client
68 .get_orderbooks_stream(request)
69 .await
70 .map_err(|e| anyhow::anyhow!("GetOrderbooksStream error: {}", e))?;
71
72 Ok(response.into_inner())
73 }
74
75 pub async fn get_market_depths_stream(
76 &mut self,
77 markets: Vec<String>,
78 limit: u32,
79 project: api::Project,
80 ) -> Result<Streaming<api::GetMarketDepthsStreamResponse>> {
81 let request = Request::new(api::GetMarketDepthsRequest {
82 markets,
83 limit,
84 project: project as i32,
85 });
86
87 let response = self
88 .client
89 .get_market_depths_stream(request)
90 .await
91 .map_err(|e| anyhow::anyhow!("GetMarketDepthsStream error: {}", e))?;
92
93 Ok(response.into_inner())
94 }
95
96 pub async fn get_ticker_stream(
97 &mut self,
98 markets: Vec<String>,
99 project: api::Project,
100 ) -> Result<Streaming<api::GetTickersStreamResponse>> {
101 let request = Request::new(api::GetTickersStreamRequest {
102 markets,
103 project: project as i32,
104 });
105
106 let response = self
107 .client
108 .get_tickers_stream(request)
109 .await
110 .map_err(|e| anyhow::anyhow!("GetTickersStream error: {}", e))?;
111
112 Ok(response.into_inner())
113 }
114
115 pub async fn get_trades_stream(
116 &mut self,
117 market: String,
118 limit: u32,
119 project: api::Project,
120 ) -> Result<Streaming<api::GetTradesStreamResponse>> {
121 let request = Request::new(api::GetTradesRequest {
122 market,
123 limit,
124 project: project as i32,
125 });
126
127 let response = self
128 .client
129 .get_trades_stream(request)
130 .await
131 .map_err(|e| anyhow::anyhow!("GetTradesStream error: {}", e))?;
132
133 Ok(response.into_inner())
134 }
135
136 pub async fn get_swaps_stream(
137 &mut self,
138 projects: Vec<api::Project>,
139 pools: Vec<String>,
140 include_failed: bool,
141 ) -> Result<Streaming<api::GetSwapsStreamResponse>> {
142 let request = Request::new(api::GetSwapsStreamRequest {
143 projects: projects.iter().map(|&p| p as i32).collect(),
144 pools,
145 include_failed,
146 });
147
148 let response = self
149 .client
150 .get_swaps_stream(request)
151 .await
152 .map_err(|e| anyhow::anyhow!("GetSwapsStream error: {}", e))?;
153
154 Ok(response.into_inner())
155 }
156
157 pub async fn get_new_raydium_pools_stream(
158 &mut self,
159 include_cpmm: bool,
160 ) -> Result<Streaming<api::GetNewRaydiumPoolsResponse>> {
161 let request = Request::new(api::GetNewRaydiumPoolsRequest {
162 include_cpmm: Some(include_cpmm),
163 });
164
165 let response = self
166 .client
167 .get_new_raydium_pools_stream(request)
168 .await
169 .map_err(|e| anyhow::anyhow!("GetNewRaydiumPoolsStream error: {}", e))?;
170
171 Ok(response.into_inner())
172 }
173
174 pub async fn get_quotes_stream(
175 &mut self,
176 projects: Vec<i32>,
177 token_pairs: Vec<api::TokenPair>,
178 ) -> Result<Streaming<api::GetQuotesStreamResponse>> {
179 let request = Request::new(api::GetQuotesStreamRequest {
180 projects,
181 token_pairs,
182 });
183
184 let response = self
185 .client
186 .get_quotes_stream(request)
187 .await
188 .map_err(|e| anyhow::anyhow!("GetQuotesStream error: {}", e))?;
189
190 Ok(response.into_inner())
191 }
192
193 pub async fn get_new_raydium_pools_by_transaction_stream(
194 &mut self,
195 ) -> Result<Streaming<api::GetNewRaydiumPoolsByTransactionResponse>> {
196 let request = Request::new(api::GetNewRaydiumPoolsByTransactionRequest {});
197
198 let response = self
199 .client
200 .get_new_raydium_pools_by_transaction_stream(request)
201 .await
202 .map_err(|e| anyhow::anyhow!("GetNewRaydiumPoolsByTransactionStream error: {}", e))?;
203
204 Ok(response.into_inner())
205 }
206
207 pub async fn get_recent_block_hash_stream(
208 &mut self,
209 ) -> Result<Streaming<api::GetRecentBlockHashResponse>> {
210 let request = Request::new(api::GetRecentBlockHashRequest {});
211
212 let response = self
213 .client
214 .get_recent_block_hash_stream(request)
215 .await
216 .map_err(|e| anyhow::anyhow!("GetRecentBlockHashStream error: {}", e))?;
217
218 Ok(response.into_inner())
219 }
220
221 pub async fn get_pool_reserves_stream(
222 &mut self,
223 projects: Vec<api::Project>,
224 pools: Vec<String>,
225 ) -> Result<Streaming<api::GetPoolReservesStreamResponse>> {
226 let request = Request::new(api::GetPoolReservesStreamRequest {
227 projects: projects.iter().map(|&p| p as i32).collect(),
228 pools,
229 });
230
231 let response = self
232 .client
233 .get_pool_reserves_stream(request)
234 .await
235 .map_err(|e| anyhow::anyhow!("GetPoolReservesStream error: {}", e))?;
236
237 Ok(response.into_inner())
238 }
239
240 pub async fn get_priority_fee_stream(
241 &mut self,
242 project: api::Project,
243 percentile: Option<f64>,
244 ) -> Result<Streaming<api::GetPriorityFeeResponse>> {
245 let request = Request::new(api::GetPriorityFeeRequest {
246 project: project as i32,
247 percentile,
248 });
249
250 let response = self
251 .client
252 .get_priority_fee_stream(request)
253 .await
254 .map_err(|e| anyhow::anyhow!("GetPriorityFeeStream error: {}", e))?;
255
256 Ok(response.into_inner())
257 }
258
259 pub async fn get_bundle_tip_stream(&mut self) -> Result<Streaming<api::GetBundleTipResponse>> {
260 let request = Request::new(api::GetBundleTipRequest {});
261
262 let response = self
263 .client
264 .get_bundle_tip_stream(request)
265 .await
266 .map_err(|e| anyhow::anyhow!("GetBundleTipStream error: {}", e))?;
267
268 Ok(response.into_inner())
269 }
270
271 pub async fn get_pump_fun_new_tokens_stream(
272 &mut self,
273 ) -> Result<Streaming<api::GetPumpFunNewTokensStreamResponse>> {
274 let request = Request::new(api::GetPumpFunNewTokensStreamRequest {});
275
276 let response = self
277 .client
278 .get_pump_fun_new_tokens_stream(request)
279 .await
280 .map_err(|e| anyhow::anyhow!("GetPumpFunNewTokensStream error: {}", e))?;
281
282 Ok(response.into_inner())
283 }
284
285 pub async fn get_pump_fun_swaps_stream(
286 &mut self,
287 tokens: Vec<String>,
288 ) -> Result<Streaming<api::GetPumpFunSwapsStreamResponse>> {
289 let request = Request::new(api::GetPumpFunSwapsStreamRequest { tokens });
290
291 let response = self
292 .client
293 .get_pump_fun_swaps_stream(request)
294 .await
295 .map_err(|e| anyhow::anyhow!("GetPumpFunSwapsStream error: {}", e))?;
296
297 Ok(response.into_inner())
298 }
299
300 pub async fn get_pump_fun_amm_swap_stream(
301 &mut self,
302 pools: Vec<String>,
303 ) -> Result<Streaming<api::GetPumpFunAmmSwapStreamResponse>> {
304 let request = Request::new(api::GetPumpFunAmmSwapStreamRequest { pools });
305
306 let response = self
307 .client
308 .get_pump_fun_amm_swap_stream(request)
309 .await
310 .map_err(|e| anyhow::anyhow!("GetPumpFunAmmSwapStream error: {}", e))?;
311
312 Ok(response.into_inner())
313 }
314
315 pub async fn get_priority_fee_by_program_stream(
316 &mut self,
317 projects: Vec<String>,
318 ) -> Result<Streaming<api::GetPriorityFeeByProgramResponse>> {
319 let request = Request::new(api::GetPriorityFeeByProgramRequest { programs: projects });
320
321 let response = self
322 .client
323 .get_priority_fee_by_program_stream(request)
324 .await
325 .map_err(|e| anyhow::anyhow!("GetPriorityFeeByProjectStream error: {}", e))?;
326
327 Ok(response.into_inner())
328 }
329}