Skip to main content

titan_rust_client/
stream.rs

1//! Stream management and types.
2
3use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
4use std::sync::Arc;
5
6use titan_api_types::ws::v1::{RequestData, StopStreamRequest, SwapQuotes};
7use tokio::sync::mpsc;
8
9use crate::connection::Connection;
10use crate::queue::StreamManager;
11
12/// A handle to an active quote stream.
13///
14/// When dropped, automatically sends `StopStream` to the server.
15pub struct QuoteStream {
16    stream_id: u32,
17    effective_stream_id: Arc<AtomicU32>,
18    stopped_flag: Arc<AtomicBool>,
19    slot_released: Arc<AtomicBool>,
20    receiver: mpsc::Receiver<SwapQuotes>,
21    connection: Arc<Connection>,
22    manager: Option<Arc<StreamManager>>,
23    stopped: bool,
24}
25
26impl QuoteStream {
27    /// Create a new quote stream handle (unmanaged, for backwards compatibility).
28    pub fn new(
29        stream_id: u32,
30        receiver: mpsc::Receiver<SwapQuotes>,
31        connection: Arc<Connection>,
32    ) -> Self {
33        Self {
34            stream_id,
35            effective_stream_id: Arc::new(AtomicU32::new(stream_id)),
36            stopped_flag: Arc::new(AtomicBool::new(false)),
37            slot_released: Arc::new(AtomicBool::new(false)),
38            receiver,
39            connection,
40            manager: None,
41            stopped: false,
42        }
43    }
44
45    /// Create a new managed quote stream handle.
46    pub fn new_managed(
47        stream_id: u32,
48        receiver: mpsc::Receiver<SwapQuotes>,
49        connection: Arc<Connection>,
50        manager: Option<Arc<StreamManager>>,
51        effective_stream_id: Arc<AtomicU32>,
52        stopped_flag: Arc<AtomicBool>,
53        slot_released: Arc<AtomicBool>,
54    ) -> Self {
55        Self {
56            stream_id,
57            effective_stream_id,
58            stopped_flag,
59            slot_released,
60            receiver,
61            connection,
62            manager,
63            stopped: false,
64        }
65    }
66
67    /// Get the original stream ID assigned at creation.
68    pub fn stream_id(&self) -> u32 {
69        self.stream_id
70    }
71
72    /// Get the current effective stream ID (may differ after reconnection).
73    pub fn effective_stream_id(&self) -> u32 {
74        self.effective_stream_id.load(Ordering::SeqCst)
75    }
76
77    /// Receive the next quote update.
78    ///
79    /// Returns `None` when the stream ends.
80    pub async fn recv(&mut self) -> Option<SwapQuotes> {
81        self.receiver.recv().await
82    }
83
84    /// Explicitly stop the stream.
85    ///
86    /// This is called automatically on drop, but can be called manually
87    /// if you want to handle the result.
88    pub async fn stop(&mut self) -> Result<(), crate::error::TitanClientError> {
89        if self.stopped {
90            return Ok(());
91        }
92        self.stopped = true;
93        self.stopped_flag.store(true, Ordering::SeqCst);
94
95        let mut ids = Vec::with_capacity(3);
96        let first_id = self.effective_stream_id.load(Ordering::SeqCst);
97        ids.push(first_id);
98
99        let second_id = self.effective_stream_id.load(Ordering::SeqCst);
100        if second_id != first_id {
101            ids.push(second_id);
102        }
103
104        if self.stream_id != first_id && self.stream_id != second_id {
105            ids.push(self.stream_id);
106        }
107
108        for id in ids {
109            // Send stop request first so server releases the stream before we free the slot
110            let _ = self
111                .connection
112                .send_request(RequestData::StopStream(StopStreamRequest { id }))
113                .await;
114
115            // Unregister from connection
116            self.connection.unregister_stream(id).await;
117        }
118
119        // CAS guard: only the first path to release the slot wins
120        if self
121            .slot_released
122            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
123            .is_ok()
124        {
125            if let Some(ref manager) = self.manager {
126                manager.stream_ended();
127            }
128        }
129
130        Ok(())
131    }
132}
133
134impl Drop for QuoteStream {
135    fn drop(&mut self) {
136        if !self.stopped {
137            self.stopped_flag.store(true, Ordering::SeqCst);
138
139            let connection = self.connection.clone();
140            let manager = self.manager.clone();
141            let slot_released = self.slot_released.clone();
142            let effective_stream_id = self.effective_stream_id.clone();
143            let stream_id = self.stream_id;
144
145            if tokio::runtime::Handle::try_current().is_ok() {
146                tokio::spawn(async move {
147                    let mut ids = Vec::with_capacity(3);
148                    let first_id = effective_stream_id.load(Ordering::SeqCst);
149                    ids.push(first_id);
150
151                    let second_id = effective_stream_id.load(Ordering::SeqCst);
152
153                    if second_id != first_id {
154                        ids.push(second_id);
155                    }
156
157                    if stream_id != first_id && stream_id != second_id {
158                        ids.push(stream_id);
159                    }
160
161                    for id in ids {
162                        // Send stop request first so server releases the stream
163                        let _ = connection
164                            .send_request(RequestData::StopStream(StopStreamRequest { id }))
165                            .await;
166
167                        connection.unregister_stream(id).await;
168                    }
169
170                    // CAS guard: only the first path to release the slot wins
171                    if slot_released
172                        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
173                        .is_ok()
174                    {
175                        if let Some(ref manager) = manager {
176                            manager.stream_ended();
177                        }
178                    }
179                });
180            } else if slot_released
181                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
182                .is_ok()
183            {
184                if let Some(ref manager) = manager {
185                    manager.stream_ended();
186                }
187            }
188        }
189    }
190}