titan_rust_client/
stream.rs1use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
4use std::sync::Arc;
5
6use titan_api_types::ws::v1::{RequestData, StopStreamRequest, SwapQuotes};
7use tokio::sync::mpsc;
8
9use crate::connection::Connection;
10use crate::queue::StreamManager;
11
12pub struct QuoteStream {
16 stream_id: u32,
17 effective_stream_id: Arc<AtomicU32>,
18 stopped_flag: Arc<AtomicBool>,
19 slot_released: Arc<AtomicBool>,
20 receiver: mpsc::Receiver<SwapQuotes>,
21 connection: Arc<Connection>,
22 manager: Option<Arc<StreamManager>>,
23 stopped: bool,
24}
25
26impl QuoteStream {
27 pub fn new(
29 stream_id: u32,
30 receiver: mpsc::Receiver<SwapQuotes>,
31 connection: Arc<Connection>,
32 ) -> Self {
33 Self {
34 stream_id,
35 effective_stream_id: Arc::new(AtomicU32::new(stream_id)),
36 stopped_flag: Arc::new(AtomicBool::new(false)),
37 slot_released: Arc::new(AtomicBool::new(false)),
38 receiver,
39 connection,
40 manager: None,
41 stopped: false,
42 }
43 }
44
45 pub fn new_managed(
47 stream_id: u32,
48 receiver: mpsc::Receiver<SwapQuotes>,
49 connection: Arc<Connection>,
50 manager: Option<Arc<StreamManager>>,
51 effective_stream_id: Arc<AtomicU32>,
52 stopped_flag: Arc<AtomicBool>,
53 slot_released: Arc<AtomicBool>,
54 ) -> Self {
55 Self {
56 stream_id,
57 effective_stream_id,
58 stopped_flag,
59 slot_released,
60 receiver,
61 connection,
62 manager,
63 stopped: false,
64 }
65 }
66
67 pub fn stream_id(&self) -> u32 {
69 self.stream_id
70 }
71
72 pub fn effective_stream_id(&self) -> u32 {
74 self.effective_stream_id.load(Ordering::SeqCst)
75 }
76
77 pub async fn recv(&mut self) -> Option<SwapQuotes> {
81 self.receiver.recv().await
82 }
83
84 pub async fn stop(&mut self) -> Result<(), crate::error::TitanClientError> {
89 if self.stopped {
90 return Ok(());
91 }
92 self.stopped = true;
93 self.stopped_flag.store(true, Ordering::SeqCst);
94
95 let mut ids = Vec::with_capacity(3);
96 let first_id = self.effective_stream_id.load(Ordering::SeqCst);
97 ids.push(first_id);
98
99 let second_id = self.effective_stream_id.load(Ordering::SeqCst);
100 if second_id != first_id {
101 ids.push(second_id);
102 }
103
104 if self.stream_id != first_id && self.stream_id != second_id {
105 ids.push(self.stream_id);
106 }
107
108 for id in ids {
109 let _ = self
111 .connection
112 .send_request(RequestData::StopStream(StopStreamRequest { id }))
113 .await;
114
115 self.connection.unregister_stream(id).await;
117 }
118
119 if self
121 .slot_released
122 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
123 .is_ok()
124 {
125 if let Some(ref manager) = self.manager {
126 manager.stream_ended();
127 }
128 }
129
130 Ok(())
131 }
132}
133
134impl Drop for QuoteStream {
135 fn drop(&mut self) {
136 if !self.stopped {
137 self.stopped_flag.store(true, Ordering::SeqCst);
138
139 let connection = self.connection.clone();
140 let manager = self.manager.clone();
141 let slot_released = self.slot_released.clone();
142 let effective_stream_id = self.effective_stream_id.clone();
143 let stream_id = self.stream_id;
144
145 if tokio::runtime::Handle::try_current().is_ok() {
146 tokio::spawn(async move {
147 let mut ids = Vec::with_capacity(3);
148 let first_id = effective_stream_id.load(Ordering::SeqCst);
149 ids.push(first_id);
150
151 let second_id = effective_stream_id.load(Ordering::SeqCst);
152
153 if second_id != first_id {
154 ids.push(second_id);
155 }
156
157 if stream_id != first_id && stream_id != second_id {
158 ids.push(stream_id);
159 }
160
161 for id in ids {
162 let _ = connection
164 .send_request(RequestData::StopStream(StopStreamRequest { id }))
165 .await;
166
167 connection.unregister_stream(id).await;
168 }
169
170 if slot_released
172 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
173 .is_ok()
174 {
175 if let Some(ref manager) = manager {
176 manager.stream_ended();
177 }
178 }
179 });
180 } else if slot_released
181 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
182 .is_ok()
183 {
184 if let Some(ref manager) = manager {
185 manager.stream_ended();
186 }
187 }
188 }
189 }
190}