solana_trader_client_rust/provider/ws/
stream.rs1use super::WebSocketClient;
2use anyhow::Result;
3use solana_trader_proto::api;
4use tokio_stream::Stream;
5
6impl WebSocketClient {
7 pub async fn get_pump_fun_new_amm_pool_stream(
8 &self,
9 ) -> Result<impl Stream<Item = Result<api::GetPumpFunNewAmmPoolStreamResponse>>> {
10 let request = api::GetPumpFunNewAmmPoolStreamRequest {};
11
12 self.conn.stream_proto("GetPumpFunNewAmmPoolStream", &request).await
13 }
14
15 pub async fn get_prices_stream(
16 &self,
17 projects: Vec<api::Project>,
18 tokens: Vec<String>,
19 ) -> Result<impl Stream<Item = Result<api::GetPricesStreamResponse>>> {
20 let request = api::GetPricesStreamRequest {
21 projects: projects.iter().map(|&p| p as i32).collect(),
22 tokens,
23 };
24
25 self.conn.stream_proto("GetPricesStream", &request).await
26 }
27
28 pub async fn get_block_stream(
29 &self,
30 ) -> Result<impl Stream<Item = Result<api::GetBlockStreamResponse>>> {
31 let request = api::GetBlockStreamRequest {};
32
33 self.conn.stream_proto("GetBlockStream", &request).await
34 }
35
36 pub async fn get_orderbook_stream(
37 &self,
38 markets: Vec<String>,
39 limit: u32,
40 project: api::Project,
41 ) -> Result<impl Stream<Item = Result<api::GetOrderbooksStreamResponse>>> {
42 let request = api::GetOrderbooksRequest {
43 markets,
44 limit,
45 project: project as i32,
46 };
47
48 self.conn
49 .stream_proto("GetOrderbooksStream", &request)
50 .await
51 }
52
53 pub async fn get_market_depths_stream(
54 &self,
55 markets: Vec<String>,
56 limit: u32,
57 project: api::Project,
58 ) -> Result<impl Stream<Item = Result<api::GetMarketDepthsStreamResponse>>> {
59 let request = api::GetMarketDepthsRequest {
60 markets,
61 limit,
62 project: project as i32,
63 };
64
65 self.conn
66 .stream_proto("GetMarketDepthsStream", &request)
67 .await
68 }
69
70 pub async fn get_ticker_stream(
71 &self,
72 markets: Vec<String>,
73 project: api::Project,
74 ) -> Result<impl Stream<Item = Result<api::GetTickersStreamResponse>>> {
75 let request = api::GetTickersStreamRequest {
76 markets,
77 project: project as i32,
78 };
79
80 self.conn.stream_proto("GetTickersStream", &request).await
81 }
82
83 pub async fn get_trades_stream(
84 &self,
85 market: String,
86 limit: u32,
87 project: api::Project,
88 ) -> Result<impl Stream<Item = Result<api::GetTradesStreamResponse>>> {
89 let request = api::GetTradesRequest {
90 market,
91 limit,
92 project: project as i32,
93 };
94
95 self.conn.stream_proto("GetTradesStream", &request).await
96 }
97
98 pub async fn get_swaps_stream(
99 &self,
100 projects: Vec<api::Project>,
101 pools: Vec<String>,
102 include_failed: bool,
103 ) -> Result<impl Stream<Item = Result<api::GetSwapsStreamResponse>>> {
104 let request = api::GetSwapsStreamRequest {
105 projects: projects.iter().map(|&p| p as i32).collect(),
106 pools,
107 include_failed,
108 };
109
110 self.conn.stream_proto("GetSwapsStream", &request).await
111 }
112
113 pub async fn get_new_raydium_pools_stream(
114 &self,
115 include_cpmm: bool,
116 ) -> Result<impl Stream<Item = Result<api::GetNewRaydiumPoolsResponse>>> {
117 let request = api::GetNewRaydiumPoolsRequest {
118 include_cpmm: Some(include_cpmm),
119 };
120
121 self.conn
122 .stream_proto("GetNewRaydiumPoolsStream", &request)
123 .await
124 }
125
126 pub async fn get_new_raydium_pools_by_transaction_stream(
127 &self,
128 ) -> Result<impl Stream<Item = Result<api::GetNewRaydiumPoolsByTransactionResponse>>> {
129 let request = api::GetNewRaydiumPoolsByTransactionRequest {};
130
131 self.conn
132 .stream_proto("GetNewRaydiumPoolsByTransactionStream", &request)
133 .await
134 }
135
136 pub async fn get_recent_block_hash_stream(
137 &self,
138 ) -> Result<impl Stream<Item = Result<api::GetRecentBlockHashResponse>>> {
139 let request = api::GetRecentBlockHashRequest {};
140
141 self.conn
142 .stream_proto("GetRecentBlockHashStream", &request)
143 .await
144 }
145
146 pub async fn get_pool_reserves_stream(
147 &self,
148 projects: Vec<api::Project>,
149 pools: Vec<String>,
150 ) -> Result<impl Stream<Item = Result<api::GetPoolReservesStreamResponse>>> {
151 let request = api::GetPoolReservesStreamRequest {
152 projects: projects.iter().map(|&p| p as i32).collect(),
153 pools,
154 };
155
156 self.conn
157 .stream_proto("GetPoolReservesStream", &request)
158 .await
159 }
160
161 pub async fn get_priority_fee_stream(
162 &self,
163 project: api::Project,
164 percentile: Option<f64>,
165 ) -> Result<impl Stream<Item = Result<api::GetPriorityFeeResponse>>> {
166 let request = api::GetPriorityFeeRequest {
167 project: project as i32,
168 percentile,
169 };
170
171 self.conn
172 .stream_proto("GetPriorityFeeStream", &request)
173 .await
174 }
175
176 pub async fn get_priority_fee_by_program_stream(
177 &self,
178 programs: Vec<String>,
179 ) -> Result<impl Stream<Item = Result<api::GetPriorityFeeByProgramResponse>>> {
180 let request = api::GetPriorityFeeByProgramRequest { programs };
181
182 self.conn
183 .stream_proto("GetPriorityFeeByProgramStream", &request)
184 .await
185 }
186
187 pub async fn get_bundle_tip_stream(
188 &self,
189 ) -> Result<impl Stream<Item = Result<api::GetBundleTipResponse>>> {
190 let request = api::GetBundleTipRequest {};
191
192 self.conn.stream_proto("GetBundleTipStream", &request).await
193 }
194
195 pub async fn get_pump_fun_new_tokens_stream(
196 &self,
197 ) -> Result<impl Stream<Item = Result<api::GetPumpFunNewTokensStreamResponse>>> {
198 let request = api::GetPumpFunNewTokensStreamRequest {};
199
200 self.conn
201 .stream_proto("GetPumpFunNewTokensStream", &request)
202 .await
203 }
204
205 pub async fn get_pump_fun_swaps_stream(
206 &self,
207 tokens: Vec<String>,
208 ) -> Result<impl Stream<Item = Result<api::GetPumpFunSwapsStreamResponse>>> {
209 let request = api::GetPumpFunSwapsStreamRequest { tokens };
210
211 self.conn
212 .stream_proto("GetPumpFunSwapsStream", &request)
213 .await
214 }
215
216 pub async fn get_pump_fun_amm_swap_stream(
217 &self,
218 pools: Vec<String>,
219 ) -> Result<impl Stream<Item = Result<api::GetPumpFunAmmSwapStreamResponse>>> {
220 let request = api::GetPumpFunAmmSwapStreamRequest { pools };
221
222 self.conn
223 .stream_proto("GetPumpFunAMMSwapStream", &request)
224 .await
225 }
226
227
228 pub async fn get_quotes_stream(
229 &self,
230 projects: Vec<i32>,
231 token_pairs: Vec<api::TokenPair>
232 ) -> Result<impl Stream<Item = Result<api::GetPumpFunSwapsStreamResponse>>> {
233 let request = api::GetQuotesStreamRequest {
234 projects,
235 token_pairs,
236 };
237
238 self.conn
239 .stream_proto("GetQuotesStream", &request)
240 .await
241 }
242}