Skip to main content

titan_rust_client/
queue.rs

1//! Stream queue management for concurrency limits.
2
3use std::collections::VecDeque;
4use std::sync::atomic::{AtomicU32, Ordering};
5use std::sync::Arc;
6
7use titan_api_types::ws::v1::{RequestData, StreamDataPayload, SwapQuoteRequest, SwapQuotes};
8use tokio::sync::{mpsc, oneshot, Mutex, Notify};
9
10use crate::connection::Connection;
11use crate::error::TitanClientError;
12use crate::stream::QuoteStream;
13
/// Queued stream request waiting to be started.
///
/// Pairs the request payload with the channel used to hand the outcome back to the
/// `request_stream` call that enqueued it.
struct QueuedRequest {
    /// The swap quote request to send once a concurrency slot has been claimed.
    request: SwapQuoteRequest,
    /// One-shot sender that delivers the started stream, or the error from starting it,
    /// to the caller awaiting the matching receiver.
    result_tx: oneshot::Sender<Result<QuoteStream, TitanClientError>>,
}
19
/// Manages stream concurrency and queuing.
///
/// Requests are started immediately while fewer than `max_concurrent` streams are active;
/// otherwise they wait in `queue` until a slot is freed.
pub struct StreamManager {
    /// Upper bound on simultaneously active streams; replaced by `set_max_concurrent`.
    max_concurrent: AtomicU32,
    /// Number of slots currently claimed. Incremented when a slot is claimed and
    /// decremented when a stream ends or fails to start.
    active_count: AtomicU32,
    /// Requests waiting for a slot (pushed at the back, popped from the front).
    queue: Mutex<VecDeque<QueuedRequest>>,
    /// Connection used to send stream requests and to register/unregister streams.
    connection: Arc<Connection>,
    /// Signalled when a slot is released or the limit changes, waking queue processing.
    slot_available: Notify,
}
28
29impl StreamManager {
30    /// Create a new stream manager.
31    pub fn new(connection: Arc<Connection>, max_concurrent: u32) -> Arc<Self> {
32        Arc::new(Self {
33            max_concurrent: AtomicU32::new(max_concurrent),
34            active_count: AtomicU32::new(0),
35            queue: Mutex::new(VecDeque::new()),
36            connection,
37            slot_available: Notify::new(),
38        })
39    }
40
41    /// Update the max concurrent streams limit.
42    pub fn set_max_concurrent(&self, max: u32) {
43        self.max_concurrent.store(max, Ordering::SeqCst);
44        // Notify in case we can now start more streams
45        self.slot_available.notify_waiters();
46    }
47
48    /// Get current active stream count.
49    pub fn active_count(&self) -> u32 {
50        self.active_count.load(Ordering::SeqCst)
51    }
52
53    /// Get current queue length.
54    pub async fn queue_len(&self) -> usize {
55        self.queue.lock().await.len()
56    }
57
58    /// Request a new stream. May wait in queue if at concurrency limit.
59    #[tracing::instrument(skip_all)]
60    pub async fn request_stream(
61        self: &Arc<Self>,
62        request: SwapQuoteRequest,
63    ) -> Result<QuoteStream, TitanClientError> {
64        // Try to start immediately if under limit
65        let max = self.max_concurrent.load(Ordering::SeqCst);
66        let current = self.active_count.load(Ordering::SeqCst);
67
68        if current < max {
69            // Try to claim a slot
70            if self
71                .active_count
72                .compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst)
73                .is_ok()
74            {
75                return self.start_stream_internal(request).await;
76            }
77        }
78
79        // Queue the request and wait
80        let (result_tx, result_rx) = oneshot::channel();
81        {
82            let mut queue = self.queue.lock().await;
83            queue.push_back(QueuedRequest { request, result_tx });
84        }
85
86        // Spawn task to process queue when slot becomes available
87        let manager = self.clone();
88        tokio::spawn(async move {
89            manager.process_queue().await;
90        });
91
92        // Wait for our turn
93        result_rx.await.map_err(|_| {
94            TitanClientError::Unexpected(anyhow::anyhow!("Stream request cancelled"))
95        })?
96    }
97
98    /// Called when a stream ends to free up a slot.
99    pub fn stream_ended(&self) {
100        self.active_count.fetch_sub(1, Ordering::SeqCst);
101        self.slot_available.notify_one();
102    }
103
104    /// Process queued requests when slots become available.
105    async fn process_queue(self: &Arc<Self>) {
106        loop {
107            let max = self.max_concurrent.load(Ordering::SeqCst);
108            let current = self.active_count.load(Ordering::SeqCst);
109
110            if current >= max {
111                // Wait for a slot
112                self.slot_available.notified().await;
113                continue;
114            }
115
116            // Try to get next queued request
117            let queued = {
118                let mut queue = self.queue.lock().await;
119                queue.pop_front()
120            };
121
122            let Some(queued) = queued else {
123                // Queue empty, done
124                break;
125            };
126
127            // Claim a slot
128            if self
129                .active_count
130                .compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst)
131                .is_err()
132            {
133                // Lost race, re-queue and retry
134                let mut queue = self.queue.lock().await;
135                queue.push_front(queued);
136                continue;
137            }
138
139            // Start the stream
140            let result = self.start_stream_internal(queued.request).await;
141            let _ = queued.result_tx.send(result);
142        }
143    }
144
145    /// Internal: actually start a stream (slot must already be claimed).
146    async fn start_stream_internal(
147        self: &Arc<Self>,
148        request: SwapQuoteRequest,
149    ) -> Result<QuoteStream, TitanClientError> {
150        let response = self
151            .connection
152            .send_request(RequestData::NewSwapQuoteStream(request.clone()))
153            .await
154            .inspect_err(|_| {
155                // Release slot on error
156                self.active_count.fetch_sub(1, Ordering::SeqCst);
157                self.slot_available.notify_one();
158            })?;
159
160        let stream_id = response
161            .stream
162            .ok_or_else(|| {
163                self.active_count.fetch_sub(1, Ordering::SeqCst);
164                self.slot_available.notify_one();
165                TitanClientError::Unexpected(anyhow::anyhow!(
166                    "NewSwapQuoteStream response missing stream info"
167                ))
168            })?
169            .id;
170
171        // Create channels for stream data
172        let (raw_tx, mut raw_rx) = mpsc::channel::<titan_api_types::ws::v1::StreamData>(32);
173        let (quotes_tx, quotes_rx) = mpsc::channel::<SwapQuotes>(32);
174
175        // Register the raw stream with the connection (includes request for resumption)
176        self.connection
177            .register_stream(stream_id, request, raw_tx)
178            .await;
179
180        // Spawn adapter task
181        let adapter_connection = self.connection.clone();
182        tokio::spawn(async move {
183            while let Some(data) = raw_rx.recv().await {
184                match data.payload {
185                    StreamDataPayload::SwapQuotes(quotes) => {
186                        if quotes_tx.send(quotes).await.is_err() {
187                            adapter_connection.unregister_stream(stream_id).await;
188                            break;
189                        }
190                    }
191                    StreamDataPayload::Other(_) => {
192                        tracing::warn!("Received unexpected stream data payload type");
193                    }
194                }
195            }
196        });
197
198        Ok(QuoteStream::new_managed(
199            stream_id,
200            quotes_rx,
201            self.connection.clone(),
202            Some(self.clone()),
203        ))
204    }
205}