titan_rust_client/
queue.rs1use std::collections::VecDeque;
4use std::sync::atomic::{AtomicU32, Ordering};
5use std::sync::Arc;
6
7use titan_api_types::ws::v1::{RequestData, StreamDataPayload, SwapQuoteRequest, SwapQuotes};
8use tokio::sync::{mpsc, oneshot, Mutex, Notify};
9
10use crate::connection::Connection;
11use crate::error::TitanClientError;
12use crate::stream::QuoteStream;
13
14struct QueuedRequest {
16 request: SwapQuoteRequest,
17 result_tx: oneshot::Sender<Result<QuoteStream, TitanClientError>>,
18}
19
20pub struct StreamManager {
22 max_concurrent: AtomicU32,
23 active_count: AtomicU32,
24 queue: Mutex<VecDeque<QueuedRequest>>,
25 connection: Arc<Connection>,
26 slot_available: Notify,
27}
28
29impl StreamManager {
30 pub fn new(connection: Arc<Connection>, max_concurrent: u32) -> Arc<Self> {
32 Arc::new(Self {
33 max_concurrent: AtomicU32::new(max_concurrent),
34 active_count: AtomicU32::new(0),
35 queue: Mutex::new(VecDeque::new()),
36 connection,
37 slot_available: Notify::new(),
38 })
39 }
40
41 pub fn set_max_concurrent(&self, max: u32) {
43 self.max_concurrent.store(max, Ordering::SeqCst);
44 self.slot_available.notify_waiters();
46 }
47
48 pub fn active_count(&self) -> u32 {
50 self.active_count.load(Ordering::SeqCst)
51 }
52
53 pub async fn queue_len(&self) -> usize {
55 self.queue.lock().await.len()
56 }
57
58 #[tracing::instrument(skip_all)]
60 pub async fn request_stream(
61 self: &Arc<Self>,
62 request: SwapQuoteRequest,
63 ) -> Result<QuoteStream, TitanClientError> {
64 let max = self.max_concurrent.load(Ordering::SeqCst);
66 let current = self.active_count.load(Ordering::SeqCst);
67
68 if current < max {
69 if self
71 .active_count
72 .compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst)
73 .is_ok()
74 {
75 return self.start_stream_internal(request).await;
76 }
77 }
78
79 let (result_tx, result_rx) = oneshot::channel();
81 {
82 let mut queue = self.queue.lock().await;
83 queue.push_back(QueuedRequest { request, result_tx });
84 }
85
86 let manager = self.clone();
88 tokio::spawn(async move {
89 manager.process_queue().await;
90 });
91
92 result_rx.await.map_err(|_| {
94 TitanClientError::Unexpected(anyhow::anyhow!("Stream request cancelled"))
95 })?
96 }
97
98 pub fn stream_ended(&self) {
100 self.active_count.fetch_sub(1, Ordering::SeqCst);
101 self.slot_available.notify_one();
102 }
103
104 async fn process_queue(self: &Arc<Self>) {
106 loop {
107 let max = self.max_concurrent.load(Ordering::SeqCst);
108 let current = self.active_count.load(Ordering::SeqCst);
109
110 if current >= max {
111 self.slot_available.notified().await;
113 continue;
114 }
115
116 let queued = {
118 let mut queue = self.queue.lock().await;
119 queue.pop_front()
120 };
121
122 let Some(queued) = queued else {
123 break;
125 };
126
127 if self
129 .active_count
130 .compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst)
131 .is_err()
132 {
133 let mut queue = self.queue.lock().await;
135 queue.push_front(queued);
136 continue;
137 }
138
139 let result = self.start_stream_internal(queued.request).await;
141 let _ = queued.result_tx.send(result);
142 }
143 }
144
145 async fn start_stream_internal(
147 self: &Arc<Self>,
148 request: SwapQuoteRequest,
149 ) -> Result<QuoteStream, TitanClientError> {
150 let response = self
151 .connection
152 .send_request(RequestData::NewSwapQuoteStream(request.clone()))
153 .await
154 .inspect_err(|_| {
155 self.active_count.fetch_sub(1, Ordering::SeqCst);
157 self.slot_available.notify_one();
158 })?;
159
160 let stream_id = response
161 .stream
162 .ok_or_else(|| {
163 self.active_count.fetch_sub(1, Ordering::SeqCst);
164 self.slot_available.notify_one();
165 TitanClientError::Unexpected(anyhow::anyhow!(
166 "NewSwapQuoteStream response missing stream info"
167 ))
168 })?
169 .id;
170
171 let (raw_tx, mut raw_rx) = mpsc::channel::<titan_api_types::ws::v1::StreamData>(32);
173 let (quotes_tx, quotes_rx) = mpsc::channel::<SwapQuotes>(32);
174
175 self.connection
177 .register_stream(stream_id, request, raw_tx)
178 .await;
179
180 let adapter_connection = self.connection.clone();
182 tokio::spawn(async move {
183 while let Some(data) = raw_rx.recv().await {
184 match data.payload {
185 StreamDataPayload::SwapQuotes(quotes) => {
186 if quotes_tx.send(quotes).await.is_err() {
187 adapter_connection.unregister_stream(stream_id).await;
188 break;
189 }
190 }
191 #[allow(unreachable_patterns)]
192 _ => {
193 tracing::warn!("Received unexpected stream data payload type");
194 }
195 }
196 }
197 });
198
199 Ok(QuoteStream::new_managed(
200 stream_id,
201 quotes_rx,
202 self.connection.clone(),
203 Some(self.clone()),
204 ))
205 }
206}