Skip to main content

titan_rust_client/
client.rs

1//! Main Titan client implementation.
2
3use std::sync::Arc;
4
5use std::time::Duration;
6
7use titan_api_types::ws::v1::{
8    GetInfoRequest, GetVenuesRequest, ListProvidersRequest, ProviderInfo, RequestData,
9    ResponseData, ServerInfo, SwapPrice, SwapPriceRequest, SwapQuoteRequest, SwapQuotes, VenueInfo,
10};
11use tokio::sync::RwLock;
12
13use crate::config::TitanConfig;
14use crate::connection::Connection;
15use crate::error::TitanClientError;
16use crate::queue::StreamManager;
17use crate::state::ConnectionState;
18
/// Fallback limit on concurrent streams, used when the server's
/// info response cannot be obtained or does not carry a limit.
const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = 10;
21
22/// Titan Exchange WebSocket client.
23///
24/// Thread-safe client for interacting with the Titan Exchange API.
25/// Can be shared across axum handlers via `Arc<TitanClient>`.
26pub struct TitanClient {
27    connection: Arc<RwLock<Option<Arc<Connection>>>>,
28    stream_manager: Arc<RwLock<Option<Arc<StreamManager>>>>,
29    config: TitanConfig,
30}
31
32impl TitanClient {
33    /// Create a new client with the given configuration.
34    ///
35    /// Connects eagerly and fetches server info to determine stream limits.
36    #[tracing::instrument(skip_all)]
37    pub async fn new(config: TitanConfig) -> Result<Self, TitanClientError> {
38        let connection = Arc::new(Connection::connect(config.clone()).await?);
39
40        // Fetch server info to get max concurrent streams
41        let max_streams = Self::fetch_max_streams(&connection).await;
42
43        let stream_manager = StreamManager::new(connection.clone(), max_streams);
44
45        Ok(Self {
46            connection: Arc::new(RwLock::new(Some(connection))),
47            stream_manager: Arc::new(RwLock::new(Some(stream_manager))),
48            config,
49        })
50    }
51
52    /// Fetch max concurrent streams from server info.
53    async fn fetch_max_streams(connection: &Arc<Connection>) -> u32 {
54        match connection
55            .send_request(RequestData::GetInfo(GetInfoRequest { dummy: None }))
56            .await
57        {
58            Ok(response) => match response.data {
59                ResponseData::GetInfo(info) => info.settings.connection.concurrent_streams,
60                _ => DEFAULT_MAX_CONCURRENT_STREAMS,
61            },
62            Err(e) => {
63                tracing::warn!("Failed to fetch server info: {}, using default limits", e);
64                DEFAULT_MAX_CONCURRENT_STREAMS
65            }
66        }
67    }
68
69    /// Returns a watch receiver for connection state changes.
70    ///
71    /// Use this to observe state transitions (Connected, Reconnecting, Disconnected).
72    pub async fn state_receiver(&self) -> tokio::sync::watch::Receiver<ConnectionState> {
73        let conn = self.connection.read().await;
74        if let Some(ref connection) = *conn {
75            connection.state_receiver()
76        } else {
77            let (tx, rx) = tokio::sync::watch::channel(ConnectionState::Disconnected {
78                reason: "Not connected".to_string(),
79            });
80            drop(tx);
81            rx
82        }
83    }
84
85    /// Get the current connection state.
86    pub async fn state(&self) -> ConnectionState {
87        let conn = self.connection.read().await;
88        if let Some(ref connection) = *conn {
89            connection.state()
90        } else {
91            ConnectionState::Disconnected {
92                reason: "Not connected".to_string(),
93            }
94        }
95    }
96
97    /// Returns true if currently connected.
98    pub async fn is_connected(&self) -> bool {
99        self.state().await.is_connected()
100    }
101
102    /// Wait until the connection is established.
103    ///
104    /// Returns immediately if already connected.
105    /// Returns error if connection is permanently closed.
106    pub async fn wait_for_connected(&self) -> Result<(), TitanClientError> {
107        let mut receiver = self.state_receiver().await;
108
109        loop {
110            let state = receiver.borrow_and_update().clone();
111            match state {
112                ConnectionState::Connected => return Ok(()),
113                ConnectionState::Disconnected { reason } => {
114                    // Check if this is a permanent disconnection
115                    if reason.contains("Max reconnect attempts") {
116                        return Err(TitanClientError::ConnectionFailed {
117                            attempts: 0,
118                            reason,
119                        });
120                    }
121                }
122                ConnectionState::Reconnecting { .. } => {}
123            }
124
125            // Wait for next state change
126            if receiver.changed().await.is_err() {
127                return Err(TitanClientError::ConnectionFailed {
128                    attempts: 0,
129                    reason: "Connection closed".to_string(),
130                });
131            }
132        }
133    }
134
135    /// Get a clone of the connection Arc.
136    async fn get_connection(&self) -> Result<Arc<Connection>, TitanClientError> {
137        let conn = self.connection.read().await;
138        conn.clone()
139            .ok_or_else(|| TitanClientError::ConnectionFailed {
140                attempts: 0,
141                reason: "Not connected".to_string(),
142            })
143    }
144
145    /// Get a clone of the stream manager Arc.
146    async fn get_stream_manager(&self) -> Result<Arc<StreamManager>, TitanClientError> {
147        let manager = self.stream_manager.read().await;
148        manager
149            .clone()
150            .ok_or_else(|| TitanClientError::ConnectionFailed {
151                attempts: 0,
152                reason: "Not connected".to_string(),
153            })
154    }
155
156    /// Get the current active stream count.
157    pub async fn active_stream_count(&self) -> u32 {
158        match self.get_stream_manager().await {
159            Ok(manager) => manager.active_count(),
160            Err(_) => 0,
161        }
162    }
163
164    /// Get the current queue length.
165    pub async fn queued_stream_count(&self) -> usize {
166        match self.get_stream_manager().await {
167            Ok(manager) => manager.queue_len().await,
168            Err(_) => 0,
169        }
170    }
171
172    /// Graceful shutdown: stops all streams, then closes WebSocket.
173    ///
174    /// This method:
175    /// 1. Sends StopStream for all active streams
176    /// 2. Clears the stream manager
177    /// 3. Closes the WebSocket connection
178    ///
179    /// After calling this, the client cannot be reused.
180    #[tracing::instrument(skip_all)]
181    pub async fn close(&self) -> Result<(), TitanClientError> {
182        // First, shutdown the connection (stops all streams)
183        {
184            let conn = self.connection.read().await;
185            if let Some(ref connection) = *conn {
186                connection.shutdown().await;
187            }
188        }
189
190        // Clear stream manager
191        {
192            let mut manager = self.stream_manager.write().await;
193            *manager = None;
194        }
195
196        // Clear connection (this will cause the background loop to exit)
197        {
198            let mut conn = self.connection.write().await;
199            *conn = None;
200        }
201
202        Ok(())
203    }
204
205    /// Check if the client has been closed.
206    pub async fn is_closed(&self) -> bool {
207        let conn = self.connection.read().await;
208        conn.is_none()
209    }
210
211    // ========== Streaming API methods ==========
212
213    /// Open a new swap quote stream.
214    ///
215    /// Returns a `QuoteStream` handle that receives real-time quote updates.
216    /// Respects the server's concurrent stream limit; queues if at capacity.
217    #[tracing::instrument(skip_all)]
218    pub async fn new_swap_quote_stream(
219        &self,
220        request: SwapQuoteRequest,
221    ) -> Result<crate::stream::QuoteStream, TitanClientError> {
222        let manager = self.get_stream_manager().await?;
223        manager.request_stream(request).await
224    }
225
226    /// Get a one-shot swap quote by opening a stream, receiving the first quote, and closing.
227    ///
228    /// Uses `SwapQuoteRequest` inputs and returns full `SwapQuotes` with transaction instructions.
229    /// Respects `config.one_shot_timeout_ms` for the receive timeout.
230    #[tracing::instrument(skip_all)]
231    pub async fn get_swap_price(
232        &self,
233        request: SwapQuoteRequest,
234    ) -> Result<SwapQuotes, TitanClientError> {
235        let timeout_ms = self.config.one_shot_timeout_ms;
236        let timeout = Duration::from_millis(timeout_ms);
237
238        let mut stream = self.new_swap_quote_stream(request).await?;
239
240        let result = tokio::time::timeout(timeout, stream.recv()).await;
241
242        match result {
243            Ok(Some(quotes)) => {
244                let _ = stream.stop().await;
245                Ok(quotes)
246            }
247            Ok(None) => Err(TitanClientError::Unexpected(anyhow::anyhow!(
248                "Stream ended without sending quotes"
249            ))),
250            Err(_elapsed) => {
251                let _ = stream.stop().await;
252                Err(TitanClientError::Timeout {
253                    duration_ms: timeout_ms,
254                })
255            }
256        }
257    }
258
259    // ========== One-shot API methods ==========
260
261    /// Get server info and connection limits.
262    #[tracing::instrument(skip_all)]
263    pub async fn get_info(&self) -> Result<ServerInfo, TitanClientError> {
264        let connection = self.get_connection().await?;
265        let response = connection
266            .send_request(RequestData::GetInfo(GetInfoRequest { dummy: None }))
267            .await?;
268
269        match response.data {
270            ResponseData::GetInfo(info) => Ok(info),
271            other => Err(TitanClientError::Unexpected(anyhow::anyhow!(
272                "Unexpected response type: expected GetInfo, got {:?}",
273                std::mem::discriminant(&other)
274            ))),
275        }
276    }
277
278    /// Get available venues.
279    #[tracing::instrument(skip_all)]
280    pub async fn get_venues(&self) -> Result<VenueInfo, TitanClientError> {
281        let connection = self.get_connection().await?;
282        let response = connection
283            .send_request(RequestData::GetVenues(GetVenuesRequest {
284                include_program_ids: Some(true),
285            }))
286            .await?;
287
288        match response.data {
289            ResponseData::GetVenues(venues) => Ok(venues),
290            other => Err(TitanClientError::Unexpected(anyhow::anyhow!(
291                "Unexpected response type: expected GetVenues, got {:?}",
292                std::mem::discriminant(&other)
293            ))),
294        }
295    }
296
297    /// List available providers.
298    #[tracing::instrument(skip_all)]
299    pub async fn list_providers(&self) -> Result<Vec<ProviderInfo>, TitanClientError> {
300        let connection = self.get_connection().await?;
301        let response = connection
302            .send_request(RequestData::ListProviders(ListProvidersRequest {
303                include_icons: Some(true),
304            }))
305            .await?;
306
307        match response.data {
308            ResponseData::ListProviders(providers) => Ok(providers),
309            other => Err(TitanClientError::Unexpected(anyhow::anyhow!(
310                "Unexpected response type: expected ListProviders, got {:?}",
311                std::mem::discriminant(&other)
312            ))),
313        }
314    }
315
316    /// Get a point-in-time swap price (simple request/response, no streaming).
317    #[tracing::instrument(skip_all)]
318    pub async fn get_swap_price_simple(
319        &self,
320        request: SwapPriceRequest,
321    ) -> Result<SwapPrice, TitanClientError> {
322        let connection = self.get_connection().await?;
323        let response = connection
324            .send_request(RequestData::GetSwapPrice(request))
325            .await?;
326
327        match response.data {
328            ResponseData::GetSwapPrice(price) => Ok(price),
329            other => Err(TitanClientError::Unexpected(anyhow::anyhow!(
330                "Unexpected response type: expected GetSwapPrice, got {:?}",
331                std::mem::discriminant(&other)
332            ))),
333        }
334    }
335}