titan_rust_client/
stream.rs1use std::sync::Arc;
4
5use titan_api_types::ws::v1::{RequestData, StopStreamRequest, SwapQuotes};
6use tokio::sync::mpsc;
7
8use crate::connection::Connection;
9use crate::queue::StreamManager;
10
11pub struct QuoteStream {
15 stream_id: u32,
16 receiver: mpsc::Receiver<SwapQuotes>,
17 connection: Arc<Connection>,
18 manager: Option<Arc<StreamManager>>,
19 stopped: bool,
20}
21
22impl QuoteStream {
23 pub fn new(
25 stream_id: u32,
26 receiver: mpsc::Receiver<SwapQuotes>,
27 connection: Arc<Connection>,
28 ) -> Self {
29 Self {
30 stream_id,
31 receiver,
32 connection,
33 manager: None,
34 stopped: false,
35 }
36 }
37
38 pub fn new_managed(
40 stream_id: u32,
41 receiver: mpsc::Receiver<SwapQuotes>,
42 connection: Arc<Connection>,
43 manager: Option<Arc<StreamManager>>,
44 ) -> Self {
45 Self {
46 stream_id,
47 receiver,
48 connection,
49 manager,
50 stopped: false,
51 }
52 }
53
54 pub fn stream_id(&self) -> u32 {
56 self.stream_id
57 }
58
59 pub async fn recv(&mut self) -> Option<SwapQuotes> {
63 self.receiver.recv().await
64 }
65
66 pub async fn stop(&mut self) -> Result<(), crate::error::TitanClientError> {
71 if self.stopped {
72 return Ok(());
73 }
74 self.stopped = true;
75
76 if let Some(ref manager) = self.manager {
78 manager.stream_ended();
79 }
80
81 self.connection.unregister_stream(self.stream_id).await;
83
84 let _ = self
86 .connection
87 .send_request(RequestData::StopStream(StopStreamRequest {
88 id: self.stream_id,
89 }))
90 .await;
91
92 Ok(())
93 }
94}
95
96impl Drop for QuoteStream {
97 fn drop(&mut self) {
98 if !self.stopped {
99 let stream_id = self.stream_id;
100 let connection = self.connection.clone();
101 let manager = self.manager.clone();
102
103 tokio::spawn(async move {
105 if let Some(ref manager) = manager {
107 manager.stream_ended();
108 }
109
110 connection.unregister_stream(stream_id).await;
111 let _ = connection
112 .send_request(RequestData::StopStream(StopStreamRequest { id: stream_id }))
113 .await;
114 });
115 }
116 }
117}