1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
//! `Client::stream()` entry point for WebSocket streaming sessions.
use tracing::instrument;
use crate::stream_session::transport::{TungsteniteTransport, WsTransport};
use crate::stream_session::{SessionCredentials, StreamingSession};
use crate::{Client, Error};
impl Client {
    /// Open a WebSocket streaming session for real-time Schwab streaming data.
    ///
    /// Fetches the user preference to obtain streamer connection details,
    /// establishes a WebSocket connection, logs in, and returns a
    /// [`StreamingSession`] ready to accept subscriptions.
    ///
    /// The configured bearer token is checked before the user preference is
    /// requested. Only the first user preference entry, and the first
    /// `streamer_info` entry within it, are used; any further entries are
    /// ignored.
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::AuthRequired`] if no bearer token is configured.
    /// Returns [`crate::Error::MissingRequiredParameter`] if user preferences
    /// do not contain the required streamer connection fields.
    /// Returns [`crate::Error::WebSocket`] if the WebSocket connection fails.
    /// Returns [`crate::Error::StreamLogin`] if the server rejects the login.
    /// Any error produced while fetching the user preference is propagated
    /// unchanged.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # async fn example() -> schwab::Result<()> {
    /// use schwab::{Client, Config};
    /// use schwab::EquityField;
    ///
    /// let config = Config::new().bearer_token("my_token");
    /// let client = Client::new(config);
    ///
    /// let session = client.stream().await?;
    /// let mut events = session.subscribe();
    /// session.subscribe_equities(&["AAPL"], &[EquityField::BidPrice, EquityField::AskPrice]).await?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip_all)]
    pub async fn stream(&self) -> crate::Result<StreamingSession> {
        // Fail before issuing any request when no token is configured.
        let bearer_token = self
            .config
            .bearer_token
            .clone()
            .ok_or(Error::AuthRequired)?;

        // The streamer connection details live in the first `streamer_info`
        // entry of the first user preference.
        let prefs = self.get_user_preference().await?;
        let pref = prefs
            .into_iter()
            .next()
            .ok_or_else(|| Error::MissingRequiredParameter("no user preferences returned"))?;
        let streamer_info_vec = pref.streamer_info.ok_or_else(|| {
            Error::MissingRequiredParameter("streamer_info missing from user preference")
        })?;
        let info = streamer_info_vec
            .into_iter()
            .next()
            .ok_or_else(|| Error::MissingRequiredParameter("streamer_info list is empty"))?;

        // Every field below is required; a missing one is reported by name.
        let customer_id = info.schwab_client_customer_id.ok_or_else(|| {
            Error::MissingRequiredParameter("schwab_client_customer_id missing from streamer_info")
        })?;
        let correl_id = info.schwab_client_correl_id.ok_or_else(|| {
            Error::MissingRequiredParameter("schwab_client_correl_id missing from streamer_info")
        })?;
        let channel = info.schwab_client_channel.ok_or_else(|| {
            Error::MissingRequiredParameter("schwab_client_channel missing from streamer_info")
        })?;
        let function_id = info.schwab_client_function_id.ok_or_else(|| {
            Error::MissingRequiredParameter("schwab_client_function_id missing from streamer_info")
        })?;
        let socket_url = info.streamer_socket_url.ok_or_else(|| {
            Error::MissingRequiredParameter("streamer_socket_url missing from streamer_info")
        })?;

        // Connect while `socket_url` is only borrowed, then move it into the
        // credentials; this avoids cloning the URL just to satisfy the borrow.
        let transport = TungsteniteTransport::connect(&socket_url).await?;
        let credentials = SessionCredentials {
            customer_id,
            correl_id,
            channel,
            function_id,
            bearer_token,
            socket_url,
        };

        StreamingSession::new(transport, credentials).await
    }
}