solana_trader_client_rust/provider/grpc/
stream.rs1use anyhow::Result;
2use solana_trader_proto::api;
3use tonic::Request;
4use tonic::Streaming;
5
6use super::GrpcClient;
7
8impl GrpcClient {
9 pub async fn get_prices_stream(
10 &mut self,
11 projects: Vec<api::Project>,
12 tokens: Vec<String>,
13 ) -> Result<Streaming<api::GetPricesStreamResponse>> {
14 let request = Request::new(api::GetPricesStreamRequest {
15 projects: projects.iter().map(|&p| p as i32).collect(),
16 tokens,
17 });
18
19 let response = self
20 .client
21 .get_prices_stream(request)
22 .await
23 .map_err(|e| anyhow::anyhow!("GetPricesStream error: {}", e))?;
24
25 Ok(response.into_inner())
26 }
27
28 pub async fn get_block_stream(&mut self) -> Result<Streaming<api::GetBlockStreamResponse>> {
29 let request = Request::new(api::GetBlockStreamRequest {});
30
31 let response = self
32 .client
33 .get_block_stream(request)
34 .await
35 .map_err(|e| anyhow::anyhow!("GetBlockStream error: {}", e))?;
36
37 Ok(response.into_inner())
38 }
39
40 pub async fn get_orderbook_stream(
41 &mut self,
42 markets: Vec<String>,
43 limit: u32,
44 project: api::Project,
45 ) -> Result<Streaming<api::GetOrderbooksStreamResponse>> {
46 let request = Request::new(api::GetOrderbooksRequest {
47 markets,
48 limit,
49 project: project as i32,
50 });
51
52 let response = self
53 .client
54 .get_orderbooks_stream(request)
55 .await
56 .map_err(|e| anyhow::anyhow!("GetOrderbooksStream error: {}", e))?;
57
58 Ok(response.into_inner())
59 }
60
61 pub async fn get_market_depths_stream(
62 &mut self,
63 markets: Vec<String>,
64 limit: u32,
65 project: api::Project,
66 ) -> Result<Streaming<api::GetMarketDepthsStreamResponse>> {
67 let request = Request::new(api::GetMarketDepthsRequest {
68 markets,
69 limit,
70 project: project as i32,
71 });
72
73 let response = self
74 .client
75 .get_market_depths_stream(request)
76 .await
77 .map_err(|e| anyhow::anyhow!("GetMarketDepthsStream error: {}", e))?;
78
79 Ok(response.into_inner())
80 }
81
82 pub async fn get_ticker_stream(
83 &mut self,
84 markets: Vec<String>,
85 project: api::Project,
86 ) -> Result<Streaming<api::GetTickersStreamResponse>> {
87 let request = Request::new(api::GetTickersStreamRequest {
88 markets,
89 project: project as i32,
90 });
91
92 let response = self
93 .client
94 .get_tickers_stream(request)
95 .await
96 .map_err(|e| anyhow::anyhow!("GetTickersStream error: {}", e))?;
97
98 Ok(response.into_inner())
99 }
100
101 pub async fn get_trades_stream(
102 &mut self,
103 market: String,
104 limit: u32,
105 project: api::Project,
106 ) -> Result<Streaming<api::GetTradesStreamResponse>> {
107 let request = Request::new(api::GetTradesRequest {
108 market,
109 limit,
110 project: project as i32,
111 });
112
113 let response = self
114 .client
115 .get_trades_stream(request)
116 .await
117 .map_err(|e| anyhow::anyhow!("GetTradesStream error: {}", e))?;
118
119 Ok(response.into_inner())
120 }
121
122 pub async fn get_swaps_stream(
123 &mut self,
124 projects: Vec<api::Project>,
125 pools: Vec<String>,
126 include_failed: bool,
127 ) -> Result<Streaming<api::GetSwapsStreamResponse>> {
128 let request = Request::new(api::GetSwapsStreamRequest {
129 projects: projects.iter().map(|&p| p as i32).collect(),
130 pools,
131 include_failed,
132 });
133
134 let response = self
135 .client
136 .get_swaps_stream(request)
137 .await
138 .map_err(|e| anyhow::anyhow!("GetSwapsStream error: {}", e))?;
139
140 Ok(response.into_inner())
141 }
142
143 pub async fn get_new_raydium_pools_stream(
144 &mut self,
145 include_cpmm: bool,
146 ) -> Result<Streaming<api::GetNewRaydiumPoolsResponse>> {
147 let request = Request::new(api::GetNewRaydiumPoolsRequest {
148 include_cpmm: Some(include_cpmm),
149 });
150
151 let response = self
152 .client
153 .get_new_raydium_pools_stream(request)
154 .await
155 .map_err(|e| anyhow::anyhow!("GetNewRaydiumPoolsStream error: {}", e))?;
156
157 Ok(response.into_inner())
158 }
159
160 pub async fn get_new_raydium_pools_by_transaction_stream(
161 &mut self,
162 ) -> Result<Streaming<api::GetNewRaydiumPoolsByTransactionResponse>> {
163 let request = Request::new(api::GetNewRaydiumPoolsByTransactionRequest {});
164
165 let response = self
166 .client
167 .get_new_raydium_pools_by_transaction_stream(request)
168 .await
169 .map_err(|e| anyhow::anyhow!("GetNewRaydiumPoolsByTransactionStream error: {}", e))?;
170
171 Ok(response.into_inner())
172 }
173
174 pub async fn get_recent_block_hash_stream(
175 &mut self,
176 ) -> Result<Streaming<api::GetRecentBlockHashResponse>> {
177 let request = Request::new(api::GetRecentBlockHashRequest {});
178
179 let response = self
180 .client
181 .get_recent_block_hash_stream(request)
182 .await
183 .map_err(|e| anyhow::anyhow!("GetRecentBlockHashStream error: {}", e))?;
184
185 Ok(response.into_inner())
186 }
187
188 pub async fn get_pool_reserves_stream(
189 &mut self,
190 projects: Vec<api::Project>,
191 pools: Vec<String>,
192 ) -> Result<Streaming<api::GetPoolReservesStreamResponse>> {
193 let request = Request::new(api::GetPoolReservesStreamRequest {
194 projects: projects.iter().map(|&p| p as i32).collect(),
195 pools,
196 });
197
198 let response = self
199 .client
200 .get_pool_reserves_stream(request)
201 .await
202 .map_err(|e| anyhow::anyhow!("GetPoolReservesStream error: {}", e))?;
203
204 Ok(response.into_inner())
205 }
206
207 pub async fn get_priority_fee_stream(
208 &mut self,
209 project: api::Project,
210 percentile: Option<f64>,
211 ) -> Result<Streaming<api::GetPriorityFeeResponse>> {
212 let request = Request::new(api::GetPriorityFeeRequest {
213 project: project as i32,
214 percentile,
215 });
216
217 let response = self
218 .client
219 .get_priority_fee_stream(request)
220 .await
221 .map_err(|e| anyhow::anyhow!("GetPriorityFeeStream error: {}", e))?;
222
223 Ok(response.into_inner())
224 }
225
226 pub async fn get_bundle_tip_stream(&mut self) -> Result<Streaming<api::GetBundleTipResponse>> {
227 let request = Request::new(api::GetBundleTipRequest {});
228
229 let response = self
230 .client
231 .get_bundle_tip_stream(request)
232 .await
233 .map_err(|e| anyhow::anyhow!("GetBundleTipStream error: {}", e))?;
234
235 Ok(response.into_inner())
236 }
237
238 pub async fn get_pump_fun_new_tokens_stream(
239 &mut self,
240 ) -> Result<Streaming<api::GetPumpFunNewTokensStreamResponse>> {
241 let request = Request::new(api::GetPumpFunNewTokensStreamRequest {});
242
243 let response = self
244 .client
245 .get_pump_fun_new_tokens_stream(request)
246 .await
247 .map_err(|e| anyhow::anyhow!("GetPumpFunNewTokensStream error: {}", e))?;
248
249 Ok(response.into_inner())
250 }
251
252 pub async fn get_pump_fun_swaps_stream(
253 &mut self,
254 tokens: Vec<String>,
255 ) -> Result<Streaming<api::GetPumpFunSwapsStreamResponse>> {
256 let request = Request::new(api::GetPumpFunSwapsStreamRequest { tokens });
257
258 let response = self
259 .client
260 .get_pump_fun_swaps_stream(request)
261 .await
262 .map_err(|e| anyhow::anyhow!("GetPumpFunSwapsStream error: {}", e))?;
263
264 Ok(response.into_inner())
265 }
266
267 pub async fn get_priority_fee_by_program_stream(
268 &mut self,
269 projects: Vec<String>,
270 ) -> Result<Streaming<api::GetPriorityFeeByProgramResponse>> {
271 let request = Request::new(api::GetPriorityFeeByProgramRequest { programs: projects });
272
273 let response = self
274 .client
275 .get_priority_fee_by_program_stream(request)
276 .await
277 .map_err(|e| anyhow::anyhow!("GetPriorityFeeByProjectStream error: {}", e))?;
278
279 Ok(response.into_inner())
280 }
281}