titan_rust_client/
client.rs1use std::sync::Arc;
4
5use std::time::Duration;
6
7use titan_api_types::ws::v1::{
8 GetInfoRequest, GetVenuesRequest, ListProvidersRequest, ProviderInfo, RequestData,
9 ResponseData, ServerInfo, SwapPrice, SwapPriceRequest, SwapQuoteRequest, SwapQuotes, VenueInfo,
10};
11use tokio::sync::RwLock;
12
13use crate::config::TitanConfig;
14use crate::connection::Connection;
15use crate::error::TitanClientError;
16use crate::queue::StreamManager;
17use crate::state::ConnectionState;
18
/// Fallback limit on concurrently open streams, used when the server's own
/// limit cannot be obtained from a `GetInfo` request.
const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = 10;
21
22pub struct TitanClient {
27 connection: Arc<RwLock<Option<Arc<Connection>>>>,
28 stream_manager: Arc<RwLock<Option<Arc<StreamManager>>>>,
29 config: TitanConfig,
30}
31
32impl TitanClient {
33 #[tracing::instrument(skip_all)]
37 pub async fn new(config: TitanConfig) -> Result<Self, TitanClientError> {
38 let connection = Arc::new(Connection::connect(config.clone()).await?);
39
40 let max_streams = Self::fetch_max_streams(&connection).await;
42
43 let stream_manager = StreamManager::new(connection.clone(), max_streams);
44
45 Ok(Self {
46 connection: Arc::new(RwLock::new(Some(connection))),
47 stream_manager: Arc::new(RwLock::new(Some(stream_manager))),
48 config,
49 })
50 }
51
52 async fn fetch_max_streams(connection: &Arc<Connection>) -> u32 {
54 match connection
55 .send_request(RequestData::GetInfo(GetInfoRequest { dummy: None }))
56 .await
57 {
58 Ok(response) => match response.data {
59 ResponseData::GetInfo(info) => info.settings.connection.concurrent_streams,
60 _ => DEFAULT_MAX_CONCURRENT_STREAMS,
61 },
62 Err(e) => {
63 tracing::warn!("Failed to fetch server info: {}, using default limits", e);
64 DEFAULT_MAX_CONCURRENT_STREAMS
65 }
66 }
67 }
68
69 pub async fn state_receiver(&self) -> tokio::sync::watch::Receiver<ConnectionState> {
73 let conn = self.connection.read().await;
74 if let Some(ref connection) = *conn {
75 connection.state_receiver()
76 } else {
77 let (tx, rx) = tokio::sync::watch::channel(ConnectionState::Disconnected {
78 reason: "Not connected".to_string(),
79 });
80 drop(tx);
81 rx
82 }
83 }
84
85 pub async fn state(&self) -> ConnectionState {
87 let conn = self.connection.read().await;
88 if let Some(ref connection) = *conn {
89 connection.state()
90 } else {
91 ConnectionState::Disconnected {
92 reason: "Not connected".to_string(),
93 }
94 }
95 }
96
97 pub async fn is_connected(&self) -> bool {
99 self.state().await.is_connected()
100 }
101
102 pub async fn wait_for_connected(&self) -> Result<(), TitanClientError> {
107 let mut receiver = self.state_receiver().await;
108
109 loop {
110 let state = receiver.borrow_and_update().clone();
111 match state {
112 ConnectionState::Connected => return Ok(()),
113 ConnectionState::Disconnected { reason } => {
114 if reason.contains("Max reconnect attempts") {
116 return Err(TitanClientError::ConnectionFailed {
117 attempts: 0,
118 reason,
119 });
120 }
121 }
122 ConnectionState::Reconnecting { .. } => {}
123 }
124
125 if receiver.changed().await.is_err() {
127 return Err(TitanClientError::ConnectionFailed {
128 attempts: 0,
129 reason: "Connection closed".to_string(),
130 });
131 }
132 }
133 }
134
135 async fn get_connection(&self) -> Result<Arc<Connection>, TitanClientError> {
137 let conn = self.connection.read().await;
138 conn.clone()
139 .ok_or_else(|| TitanClientError::ConnectionFailed {
140 attempts: 0,
141 reason: "Not connected".to_string(),
142 })
143 }
144
145 async fn get_stream_manager(&self) -> Result<Arc<StreamManager>, TitanClientError> {
147 let manager = self.stream_manager.read().await;
148 manager
149 .clone()
150 .ok_or_else(|| TitanClientError::ConnectionFailed {
151 attempts: 0,
152 reason: "Not connected".to_string(),
153 })
154 }
155
156 pub async fn active_stream_count(&self) -> u32 {
158 match self.get_stream_manager().await {
159 Ok(manager) => manager.active_count(),
160 Err(_) => 0,
161 }
162 }
163
164 pub async fn queued_stream_count(&self) -> usize {
166 match self.get_stream_manager().await {
167 Ok(manager) => manager.queue_len().await,
168 Err(_) => 0,
169 }
170 }
171
172 #[tracing::instrument(skip_all)]
181 pub async fn close(&self) -> Result<(), TitanClientError> {
182 {
184 let conn = self.connection.read().await;
185 if let Some(ref connection) = *conn {
186 connection.shutdown().await;
187 }
188 }
189
190 {
192 let mut manager = self.stream_manager.write().await;
193 *manager = None;
194 }
195
196 {
198 let mut conn = self.connection.write().await;
199 *conn = None;
200 }
201
202 Ok(())
203 }
204
205 pub async fn is_closed(&self) -> bool {
207 let conn = self.connection.read().await;
208 conn.is_none()
209 }
210
211 #[tracing::instrument(skip_all)]
218 pub async fn new_swap_quote_stream(
219 &self,
220 request: SwapQuoteRequest,
221 ) -> Result<crate::stream::QuoteStream, TitanClientError> {
222 let manager = self.get_stream_manager().await?;
223 manager.request_stream(request).await
224 }
225
226 #[tracing::instrument(skip_all)]
231 pub async fn get_swap_price(
232 &self,
233 request: SwapQuoteRequest,
234 ) -> Result<SwapQuotes, TitanClientError> {
235 let timeout_ms = self.config.one_shot_timeout_ms;
236 let timeout = Duration::from_millis(timeout_ms);
237
238 let mut stream = self.new_swap_quote_stream(request).await?;
239
240 let result = tokio::time::timeout(timeout, stream.recv()).await;
241
242 match result {
243 Ok(Some(quotes)) => {
244 let _ = stream.stop().await;
245 Ok(quotes)
246 }
247 Ok(None) => Err(TitanClientError::Unexpected(anyhow::anyhow!(
248 "Stream ended without sending quotes"
249 ))),
250 Err(_elapsed) => {
251 let _ = stream.stop().await;
252 Err(TitanClientError::Timeout {
253 duration_ms: timeout_ms,
254 })
255 }
256 }
257 }
258
259 #[tracing::instrument(skip_all)]
263 pub async fn get_info(&self) -> Result<ServerInfo, TitanClientError> {
264 let connection = self.get_connection().await?;
265 let response = connection
266 .send_request(RequestData::GetInfo(GetInfoRequest { dummy: None }))
267 .await?;
268
269 match response.data {
270 ResponseData::GetInfo(info) => Ok(info),
271 other => Err(TitanClientError::Unexpected(anyhow::anyhow!(
272 "Unexpected response type: expected GetInfo, got {:?}",
273 std::mem::discriminant(&other)
274 ))),
275 }
276 }
277
278 #[tracing::instrument(skip_all)]
280 pub async fn get_venues(&self) -> Result<VenueInfo, TitanClientError> {
281 let connection = self.get_connection().await?;
282 let response = connection
283 .send_request(RequestData::GetVenues(GetVenuesRequest {
284 include_program_ids: Some(true),
285 }))
286 .await?;
287
288 match response.data {
289 ResponseData::GetVenues(venues) => Ok(venues),
290 other => Err(TitanClientError::Unexpected(anyhow::anyhow!(
291 "Unexpected response type: expected GetVenues, got {:?}",
292 std::mem::discriminant(&other)
293 ))),
294 }
295 }
296
297 #[tracing::instrument(skip_all)]
299 pub async fn list_providers(&self) -> Result<Vec<ProviderInfo>, TitanClientError> {
300 let connection = self.get_connection().await?;
301 let response = connection
302 .send_request(RequestData::ListProviders(ListProvidersRequest {
303 include_icons: Some(true),
304 }))
305 .await?;
306
307 match response.data {
308 ResponseData::ListProviders(providers) => Ok(providers),
309 other => Err(TitanClientError::Unexpected(anyhow::anyhow!(
310 "Unexpected response type: expected ListProviders, got {:?}",
311 std::mem::discriminant(&other)
312 ))),
313 }
314 }
315
316 #[tracing::instrument(skip_all)]
318 pub async fn get_swap_price_simple(
319 &self,
320 request: SwapPriceRequest,
321 ) -> Result<SwapPrice, TitanClientError> {
322 let connection = self.get_connection().await?;
323 let response = connection
324 .send_request(RequestData::GetSwapPrice(request))
325 .await?;
326
327 match response.data {
328 ResponseData::GetSwapPrice(price) => Ok(price),
329 other => Err(TitanClientError::Unexpected(anyhow::anyhow!(
330 "Unexpected response type: expected GetSwapPrice, got {:?}",
331 std::mem::discriminant(&other)
332 ))),
333 }
334 }
335}