Skip to main content

titan_rust_client/
stream.rs

1//! Stream management and types.
2
3use std::sync::Arc;
4
5use titan_api_types::ws::v1::{RequestData, StopStreamRequest, SwapQuotes};
6use tokio::sync::mpsc;
7
8use crate::connection::Connection;
9use crate::queue::StreamManager;
10
11/// A handle to an active quote stream.
12///
13/// When dropped, automatically sends `StopStream` to the server.
14pub struct QuoteStream {
15    stream_id: u32,
16    receiver: mpsc::Receiver<SwapQuotes>,
17    connection: Arc<Connection>,
18    manager: Option<Arc<StreamManager>>,
19    stopped: bool,
20}
21
22impl QuoteStream {
23    /// Create a new quote stream handle (unmanaged, for backwards compatibility).
24    pub fn new(
25        stream_id: u32,
26        receiver: mpsc::Receiver<SwapQuotes>,
27        connection: Arc<Connection>,
28    ) -> Self {
29        Self {
30            stream_id,
31            receiver,
32            connection,
33            manager: None,
34            stopped: false,
35        }
36    }
37
38    /// Create a new managed quote stream handle.
39    pub fn new_managed(
40        stream_id: u32,
41        receiver: mpsc::Receiver<SwapQuotes>,
42        connection: Arc<Connection>,
43        manager: Option<Arc<StreamManager>>,
44    ) -> Self {
45        Self {
46            stream_id,
47            receiver,
48            connection,
49            manager,
50            stopped: false,
51        }
52    }
53
54    /// Get the stream ID.
55    pub fn stream_id(&self) -> u32 {
56        self.stream_id
57    }
58
59    /// Receive the next quote update.
60    ///
61    /// Returns `None` when the stream ends.
62    pub async fn recv(&mut self) -> Option<SwapQuotes> {
63        self.receiver.recv().await
64    }
65
66    /// Explicitly stop the stream.
67    ///
68    /// This is called automatically on drop, but can be called manually
69    /// if you want to handle the result.
70    pub async fn stop(&mut self) -> Result<(), crate::error::TitanClientError> {
71        if self.stopped {
72            return Ok(());
73        }
74        self.stopped = true;
75
76        // Notify manager that slot is freed
77        if let Some(ref manager) = self.manager {
78            manager.stream_ended();
79        }
80
81        // Unregister from connection
82        self.connection.unregister_stream(self.stream_id).await;
83
84        // Send stop request (fire and forget - we don't care about the response)
85        let _ = self
86            .connection
87            .send_request(RequestData::StopStream(StopStreamRequest {
88                id: self.stream_id,
89            }))
90            .await;
91
92        Ok(())
93    }
94}
95
96impl Drop for QuoteStream {
97    fn drop(&mut self) {
98        if !self.stopped {
99            let stream_id = self.stream_id;
100            let connection = self.connection.clone();
101            let manager = self.manager.clone();
102
103            // Spawn a task to stop the stream
104            tokio::spawn(async move {
105                // Notify manager that slot is freed
106                if let Some(ref manager) = manager {
107                    manager.stream_ended();
108                }
109
110                connection.unregister_stream(stream_id).await;
111                let _ = connection
112                    .send_request(RequestData::StopStream(StopStreamRequest { id: stream_id }))
113                    .await;
114            });
115        }
116    }
117}