solana_trader_client_rust/provider/ws/
stream.rs

1use super::WebSocketClient;
2use anyhow::Result;
3use solana_trader_proto::api;
4use tokio_stream::Stream;
5
6impl WebSocketClient {
7    pub async fn get_pump_fun_new_amm_pool_stream(
8        &self,
9    ) -> Result<impl Stream<Item = Result<api::GetPumpFunNewAmmPoolStreamResponse>>> {
10        let request = api::GetPumpFunNewAmmPoolStreamRequest {};
11
12        self.conn.stream_proto("GetPumpFunNewAmmPoolStream", &request).await
13    }
14    
15    pub async fn get_prices_stream(
16        &self,
17        projects: Vec<api::Project>,
18        tokens: Vec<String>,
19    ) -> Result<impl Stream<Item = Result<api::GetPricesStreamResponse>>> {
20        let request = api::GetPricesStreamRequest {
21            projects: projects.iter().map(|&p| p as i32).collect(),
22            tokens,
23        };
24
25        self.conn.stream_proto("GetPricesStream", &request).await
26    }
27
28    pub async fn get_block_stream(
29        &self,
30    ) -> Result<impl Stream<Item = Result<api::GetBlockStreamResponse>>> {
31        let request = api::GetBlockStreamRequest {};
32
33        self.conn.stream_proto("GetBlockStream", &request).await
34    }
35
36    pub async fn get_orderbook_stream(
37        &self,
38        markets: Vec<String>,
39        limit: u32,
40        project: api::Project,
41    ) -> Result<impl Stream<Item = Result<api::GetOrderbooksStreamResponse>>> {
42        let request = api::GetOrderbooksRequest {
43            markets,
44            limit,
45            project: project as i32,
46        };
47
48        self.conn
49            .stream_proto("GetOrderbooksStream", &request)
50            .await
51    }
52
53    pub async fn get_market_depths_stream(
54        &self,
55        markets: Vec<String>,
56        limit: u32,
57        project: api::Project,
58    ) -> Result<impl Stream<Item = Result<api::GetMarketDepthsStreamResponse>>> {
59        let request = api::GetMarketDepthsRequest {
60            markets,
61            limit,
62            project: project as i32,
63        };
64
65        self.conn
66            .stream_proto("GetMarketDepthsStream", &request)
67            .await
68    }
69
70    pub async fn get_ticker_stream(
71        &self,
72        markets: Vec<String>,
73        project: api::Project,
74    ) -> Result<impl Stream<Item = Result<api::GetTickersStreamResponse>>> {
75        let request = api::GetTickersStreamRequest {
76            markets,
77            project: project as i32,
78        };
79
80        self.conn.stream_proto("GetTickersStream", &request).await
81    }
82
83    pub async fn get_trades_stream(
84        &self,
85        market: String,
86        limit: u32,
87        project: api::Project,
88    ) -> Result<impl Stream<Item = Result<api::GetTradesStreamResponse>>> {
89        let request = api::GetTradesRequest {
90            market,
91            limit,
92            project: project as i32,
93        };
94
95        self.conn.stream_proto("GetTradesStream", &request).await
96    }
97
98    pub async fn get_swaps_stream(
99        &self,
100        projects: Vec<api::Project>,
101        pools: Vec<String>,
102        include_failed: bool,
103    ) -> Result<impl Stream<Item = Result<api::GetSwapsStreamResponse>>> {
104        let request = api::GetSwapsStreamRequest {
105            projects: projects.iter().map(|&p| p as i32).collect(),
106            pools,
107            include_failed,
108        };
109
110        self.conn.stream_proto("GetSwapsStream", &request).await
111    }
112
113    pub async fn get_new_raydium_pools_stream(
114        &self,
115        include_cpmm: bool,
116    ) -> Result<impl Stream<Item = Result<api::GetNewRaydiumPoolsResponse>>> {
117        let request = api::GetNewRaydiumPoolsRequest {
118            include_cpmm: Some(include_cpmm),
119        };
120
121        self.conn
122            .stream_proto("GetNewRaydiumPoolsStream", &request)
123            .await
124    }
125
126    pub async fn get_new_raydium_pools_by_transaction_stream(
127        &self,
128    ) -> Result<impl Stream<Item = Result<api::GetNewRaydiumPoolsByTransactionResponse>>> {
129        let request = api::GetNewRaydiumPoolsByTransactionRequest {};
130
131        self.conn
132            .stream_proto("GetNewRaydiumPoolsByTransactionStream", &request)
133            .await
134    }
135
136    pub async fn get_recent_block_hash_stream(
137        &self,
138    ) -> Result<impl Stream<Item = Result<api::GetRecentBlockHashResponse>>> {
139        let request = api::GetRecentBlockHashRequest {};
140
141        self.conn
142            .stream_proto("GetRecentBlockHashStream", &request)
143            .await
144    }
145
146    pub async fn get_pool_reserves_stream(
147        &self,
148        projects: Vec<api::Project>,
149        pools: Vec<String>,
150    ) -> Result<impl Stream<Item = Result<api::GetPoolReservesStreamResponse>>> {
151        let request = api::GetPoolReservesStreamRequest {
152            projects: projects.iter().map(|&p| p as i32).collect(),
153            pools,
154        };
155
156        self.conn
157            .stream_proto("GetPoolReservesStream", &request)
158            .await
159    }
160
161    pub async fn get_priority_fee_stream(
162        &self,
163        project: api::Project,
164        percentile: Option<f64>,
165    ) -> Result<impl Stream<Item = Result<api::GetPriorityFeeResponse>>> {
166        let request = api::GetPriorityFeeRequest {
167            project: project as i32,
168            percentile,
169        };
170
171        self.conn
172            .stream_proto("GetPriorityFeeStream", &request)
173            .await
174    }
175
176    pub async fn get_priority_fee_by_program_stream(
177        &self,
178        programs: Vec<String>,
179    ) -> Result<impl Stream<Item = Result<api::GetPriorityFeeByProgramResponse>>> {
180        let request = api::GetPriorityFeeByProgramRequest { programs };
181
182        self.conn
183            .stream_proto("GetPriorityFeeByProgramStream", &request)
184            .await
185    }
186
187    pub async fn get_bundle_tip_stream(
188        &self,
189    ) -> Result<impl Stream<Item = Result<api::GetBundleTipResponse>>> {
190        let request = api::GetBundleTipRequest {};
191
192        self.conn.stream_proto("GetBundleTipStream", &request).await
193    }
194
195    pub async fn get_pump_fun_new_tokens_stream(
196        &self,
197    ) -> Result<impl Stream<Item = Result<api::GetPumpFunNewTokensStreamResponse>>> {
198        let request = api::GetPumpFunNewTokensStreamRequest {};
199
200        self.conn
201            .stream_proto("GetPumpFunNewTokensStream", &request)
202            .await
203    }
204
205    pub async fn get_pump_fun_swaps_stream(
206        &self,
207        tokens: Vec<String>,
208    ) -> Result<impl Stream<Item = Result<api::GetPumpFunSwapsStreamResponse>>> {
209        let request = api::GetPumpFunSwapsStreamRequest { tokens };
210
211        self.conn
212            .stream_proto("GetPumpFunSwapsStream", &request)
213            .await
214    }
215
216    pub async fn get_pump_fun_amm_swap_stream(
217        &self,
218        pools: Vec<String>,
219    ) -> Result<impl Stream<Item = Result<api::GetPumpFunAmmSwapStreamResponse>>> {
220        let request = api::GetPumpFunAmmSwapStreamRequest { pools };
221
222        self.conn
223            .stream_proto("GetPumpFunAMMSwapStream", &request)
224            .await
225    }
226    
227
228    pub async fn get_quotes_stream(
229        &self,
230        projects: Vec<i32>,
231        token_pairs: Vec<api::TokenPair>
232    ) -> Result<impl Stream<Item = Result<api::GetPumpFunSwapsStreamResponse>>> {
233        let request = api::GetQuotesStreamRequest {
234            projects,
235            token_pairs,
236        };
237        
238        self.conn
239            .stream_proto("GetQuotesStream", &request)
240            .await
241    }
242}