trading_ig/streaming/client.rs
1//! High-level streaming client returned by [`crate::IgClient::streaming`].
2//!
3//! Obtain a [`StreamingApi`] from [`crate::IgClient::streaming`], then call
4//! [`StreamingApi::connect`] or [`StreamingApi::connect_with`] to open a
5//! Lightstreamer session and receive a [`StreamingClient`].
6//!
7//! # Example
8//!
9//! ```no_run
10//! # use trading_ig::{IgClient, Environment, Credentials};
11//! # async fn run() -> trading_ig::Result<()> {
12//! # let client = IgClient::builder()
13//! # .environment(Environment::Demo)
14//! # .api_key("key")
15//! # .credentials(Credentials::password("u", "p"))
16//! # .build()?;
17//! client.session().login_v2().await?;
18//! let (stream, _events) = client.streaming().connect_with(Default::default()).await?;
19//! let mut rx = stream.subscribe_market("CS.D.GBPUSD.TODAY.IP").await?;
20//! while let Some(update) = rx.recv().await {
21//! println!("{} bid={:?}", update.epic, update.bid);
22//! }
23//! # Ok(()) }
24//! ```
25
26use tokio::sync::{mpsc, watch};
27use tracing::instrument;
28
29use crate::IgClient;
30use crate::error::Result;
31use crate::session::{AuthTokens, SessionHandle};
32use crate::streaming::connection::{CreateParams, LsConnection};
33use crate::streaming::events::{
34 AccountUpdate, CandleScale, ChartCandleUpdate, ChartTickUpdate, MarketUpdate, TradeUpdate,
35};
36use crate::streaming::reconnect::{AutoReconnect, StreamingEvent};
37use crate::streaming::subscription::{Registry, SubscriptionKind};
38
39/// Subscription channel capacity. Keep modest so a slow consumer causes
40/// back-pressure rather than unbounded queue growth.
41const CHANNEL_CAP: usize = 256;
42
43/// Capacity of the optional lifecycle-event channel.
44const EVENT_CHAN_CAP: usize = 64;
45
46// ---------------------------------------------------------------------------
47// StreamingApi — accessor on IgClient
48// ---------------------------------------------------------------------------
49
50/// Entry point for streaming. Obtain via [`crate::IgClient::streaming`].
51#[derive(Debug)]
52pub struct StreamingApi<'a> {
53 pub(crate) client: &'a IgClient,
54}
55
56impl StreamingApi<'_> {
57 /// Connect to the Lightstreamer streaming endpoint with the default
58 /// [`AutoReconnect`] policy (enabled, 5 attempts, 1 s–30 s back-off).
59 ///
60 /// The underlying session must already be authenticated before calling
61 /// this method. For a **v3 (OAuth)** session, call
62 /// `client.session().read(true).await?` first so that CST/XST tokens
63 /// are stored locally — Lightstreamer requires the
64 /// `CST-<cst>|XST-<xst>` password format regardless of auth flavour.
65 ///
66 /// Returns `(StreamingClient, Receiver<StreamingEvent>)`. The event
67 /// channel emits [`StreamingEvent::Reconnected`],
68 /// [`StreamingEvent::ReconnectFailed`], and
69 /// [`StreamingEvent::Disconnected`] lifecycle events.
70 ///
71 /// Equivalent to `connect_with(AutoReconnect::default())`.
72 #[instrument(skip_all, name = "streaming.connect")]
73 pub async fn connect(&self) -> Result<(StreamingClient, mpsc::Receiver<StreamingEvent>)> {
74 self.connect_with(AutoReconnect::default()).await
75 }
76
77 /// Connect with an explicit [`AutoReconnect`] policy.
78 ///
79 /// Set `policy.enabled = false` to disable auto-reconnect and get the
80 /// pre-reconnect behaviour: the stream terminates on `END` and all
81 /// subscriber channels close.
82 ///
83 /// Returns `(StreamingClient, Receiver<StreamingEvent>)`.
84 #[instrument(skip_all, name = "streaming.connect_with")]
85 pub async fn connect_with(
86 &self,
87 policy: AutoReconnect,
88 ) -> Result<(StreamingClient, mpsc::Receiver<StreamingEvent>)> {
89 let state = self.client.session.require_authenticated().await?;
90
91 let account_id = state.account_id.ok_or_else(|| {
92 crate::error::Error::Auth(
93 "no account ID in session — call session().login() first".into(),
94 )
95 })?;
96 let endpoint = state.lightstreamer_endpoint.ok_or_else(|| {
97 crate::error::Error::Auth(
98 "no Lightstreamer endpoint in session — call session().login() first".into(),
99 )
100 })?;
101
102 // Build Lightstreamer password from CST/XST tokens.
103 // OAuth sessions need CST/XST — caller must call read(true) first to
104 // exchange the OAuth token for a CST/XST pair.
105 let password = match state.tokens.as_ref() {
106 Some(AuthTokens::Cst {
107 cst,
108 x_security_token,
109 }) => format!("CST-{cst}|XST-{x_security_token}"),
110 Some(AuthTokens::OAuth { .. }) => {
111 return Err(crate::error::Error::Auth(
112 "OAuth session cannot be used directly for streaming. \
113 Call client.session().read(true).await? first to obtain \
114 CST/XST tokens, then call streaming().connect() again."
115 .into(),
116 ));
117 }
118 None => return Err(crate::error::Error::Auth("no active session tokens".into())),
119 };
120
121 let registry = Registry::new();
122 let (shutdown_tx, _shutdown_rx) = watch::channel(false);
123 let (event_tx, event_rx) = mpsc::channel(EVENT_CHAN_CAP);
124
125 // Build a SessionHandle so the reconnect path can call login_v2().
126 let session_handle = SessionHandle {
127 transport: self.client.transport.clone(),
128 session: self.client.session.clone(),
129 credentials: self.client.credentials.clone(),
130 };
131
132 let conn = LsConnection::create(CreateParams {
133 endpoint,
134 username: account_id,
135 password,
136 registry: registry.clone(),
137 shutdown_tx: shutdown_tx.clone(),
138 policy,
139 event_tx: Some(event_tx),
140 session_handle,
141 })
142 .await?;
143
144 let client = StreamingClient {
145 conn,
146 registry,
147 shutdown_tx,
148 };
149 Ok((client, event_rx))
150 }
151}
152
153// ---------------------------------------------------------------------------
154// StreamingClient
155// ---------------------------------------------------------------------------
156
157/// A live Lightstreamer session with active subscriptions.
158///
159/// Obtained via [`StreamingApi::connect`] or [`StreamingApi::connect_with`].
160/// All subscription methods return a `tokio::sync::mpsc::Receiver<T>`.
161/// Dropping the receiver automatically cancels the subscription server-side
162/// the next time the server sends an update for that item.
163///
164/// Call [`StreamingClient::disconnect`] to cleanly tear down the session.
165#[derive(Debug)]
166pub struct StreamingClient {
167 conn: LsConnection,
168 registry: Registry,
169 shutdown_tx: watch::Sender<bool>,
170}
171
172impl StreamingClient {
173 // ------------------------------------------------------------------
174 // Market
175 // ------------------------------------------------------------------
176
177 /// Subscribe to market price updates for `epic`.
178 ///
179 /// Returns a `Receiver<MarketUpdate>`. Each received value is a snapshot
180 /// of all changed fields merged with the previous state — no field is
181 /// ever "missing".
182 #[instrument(skip(self), fields(%epic))]
183 pub async fn subscribe_market(&self, epic: &str) -> Result<mpsc::Receiver<MarketUpdate>> {
184 use crate::streaming::events::MARKET_FIELDS;
185 let (tx, rx) = mpsc::channel(CHANNEL_CAP);
186 let idx = self.registry.register(SubscriptionKind::Market {
187 epic: epic.to_owned(),
188 tx,
189 });
190 let item = format!("MARKET:{epic}");
191 let fields = MARKET_FIELDS.join(" ");
192 self.conn
193 .control("add", idx, &item, &fields, "MERGE")
194 .await?;
195 Ok(rx)
196 }
197
198 // ------------------------------------------------------------------
199 // Chart tick
200 // ------------------------------------------------------------------
201
202 /// Subscribe to chart tick data for `epic`.
203 ///
204 /// Returns a `Receiver<ChartTickUpdate>`. This is a `DISTINCT`-mode
205 /// subscription — every message is a fresh tick, not a merge.
206 #[instrument(skip(self), fields(%epic))]
207 pub async fn subscribe_chart_tick(
208 &self,
209 epic: &str,
210 ) -> Result<mpsc::Receiver<ChartTickUpdate>> {
211 use crate::streaming::events::CHART_TICK_FIELDS;
212 let (tx, rx) = mpsc::channel(CHANNEL_CAP);
213 let idx = self.registry.register(SubscriptionKind::ChartTick {
214 epic: epic.to_owned(),
215 tx,
216 });
217 let item = format!("CHART:{epic}:TICK");
218 let fields = CHART_TICK_FIELDS.join(" ");
219 self.conn
220 .control("add", idx, &item, &fields, "DISTINCT")
221 .await?;
222 Ok(rx)
223 }
224
225 // ------------------------------------------------------------------
226 // Chart candle
227 // ------------------------------------------------------------------
228
229 /// Subscribe to OHLC candle data for `epic` at `scale`.
230 ///
231 /// Returns a `Receiver<ChartCandleUpdate>`. This is a `MERGE`-mode
232 /// subscription — fields are merged across updates for the current candle.
233 #[instrument(skip(self), fields(%epic, scale = %scale))]
234 pub async fn subscribe_chart_candle(
235 &self,
236 epic: &str,
237 scale: CandleScale,
238 ) -> Result<mpsc::Receiver<ChartCandleUpdate>> {
239 use crate::streaming::events::CHART_CANDLE_FIELDS;
240 let (tx, rx) = mpsc::channel(CHANNEL_CAP);
241 let idx = self.registry.register(SubscriptionKind::ChartCandle {
242 epic: epic.to_owned(),
243 scale,
244 tx,
245 });
246 let item = format!("CHART:{epic}:{}", scale.as_str());
247 let fields = CHART_CANDLE_FIELDS.join(" ");
248 self.conn
249 .control("add", idx, &item, &fields, "MERGE")
250 .await?;
251 Ok(rx)
252 }
253
254 // ------------------------------------------------------------------
255 // Account
256 // ------------------------------------------------------------------
257
258 /// Subscribe to account balance and margin updates.
259 ///
260 /// Returns a `Receiver<AccountUpdate>`.
261 #[instrument(skip(self), fields(%account_id))]
262 pub async fn subscribe_account(
263 &self,
264 account_id: &str,
265 ) -> Result<mpsc::Receiver<AccountUpdate>> {
266 use crate::streaming::events::ACCOUNT_FIELDS;
267 let (tx, rx) = mpsc::channel(CHANNEL_CAP);
268 let idx = self.registry.register(SubscriptionKind::Account {
269 account_id: account_id.to_owned(),
270 tx,
271 });
272 let item = format!("ACCOUNT:{account_id}");
273 let fields = ACCOUNT_FIELDS.join(" ");
274 self.conn
275 .control("add", idx, &item, &fields, "MERGE")
276 .await?;
277 Ok(rx)
278 }
279
280 // ------------------------------------------------------------------
281 // Trade
282 // ------------------------------------------------------------------
283
284 /// Subscribe to trade confirmations and working-order updates.
285 ///
286 /// Returns a `Receiver<TradeUpdate>`.
287 #[instrument(skip(self), fields(%account_id))]
288 pub async fn subscribe_trade(&self, account_id: &str) -> Result<mpsc::Receiver<TradeUpdate>> {
289 use crate::streaming::events::TRADE_FIELDS;
290 let (tx, rx) = mpsc::channel(CHANNEL_CAP);
291 let idx = self.registry.register(SubscriptionKind::Trade {
292 account_id: account_id.to_owned(),
293 tx,
294 });
295 let item = format!("TRADE:{account_id}");
296 let fields = TRADE_FIELDS.join(" ");
297 self.conn
298 .control("add", idx, &item, &fields, "DISTINCT")
299 .await?;
300 Ok(rx)
301 }
302
303 // ------------------------------------------------------------------
304 // Lifecycle
305 // ------------------------------------------------------------------
306
307 /// Disconnect from Lightstreamer and stop the background read-loop task.
308 ///
309 /// After this call all pending `Receiver`s will no longer receive updates.
310 ///
311 /// The method signature is `async` for forward compatibility (future
312 /// implementations may need to await a clean shutdown handshake with the
313 /// server).
314 #[allow(clippy::unused_async)]
315 pub async fn disconnect(self) -> Result<()> {
316 // Signal the background read-loop to stop.
317 let _ = self.shutdown_tx.send(true);
318 Ok(())
319 }
320
321 /// Return the current Lightstreamer session ID.
322 pub fn session_id(&self) -> &str {
323 &self.conn.session_id
324 }
325}