Skip to main content

trading_ig/streaming/
client.rs

1//! High-level streaming client returned by [`crate::IgClient::streaming`].
2//!
3//! Obtain a [`StreamingApi`] from [`crate::IgClient::streaming`], then call
4//! [`StreamingApi::connect`] or [`StreamingApi::connect_with`] to open a
5//! Lightstreamer session and receive a [`StreamingClient`].
6//!
7//! # Example
8//!
9//! ```no_run
10//! # use trading_ig::{IgClient, Environment, Credentials};
11//! # async fn run() -> trading_ig::Result<()> {
12//! # let client = IgClient::builder()
13//! #     .environment(Environment::Demo)
14//! #     .api_key("key")
15//! #     .credentials(Credentials::password("u", "p"))
16//! #     .build()?;
17//! client.session().login_v2().await?;
18//! let (stream, _events) = client.streaming().connect_with(Default::default()).await?;
19//! let mut rx = stream.subscribe_market("CS.D.GBPUSD.TODAY.IP").await?;
20//! while let Some(update) = rx.recv().await {
21//!     println!("{} bid={:?}", update.epic, update.bid);
22//! }
23//! # Ok(()) }
24//! ```
25
26use tokio::sync::{mpsc, watch};
27use tracing::instrument;
28
29use crate::IgClient;
30use crate::error::Result;
31use crate::session::{AuthTokens, SessionHandle};
32use crate::streaming::connection::{CreateParams, LsConnection};
33use crate::streaming::events::{
34    AccountUpdate, CandleScale, ChartCandleUpdate, ChartTickUpdate, MarketUpdate, TradeUpdate,
35};
36use crate::streaming::reconnect::{AutoReconnect, StreamingEvent};
37use crate::streaming::subscription::{Registry, SubscriptionKind};
38
/// Capacity of each per-subscription update channel created by the
/// `subscribe_*` methods on [`StreamingClient`].  Keep modest so a slow
/// consumer causes back-pressure rather than unbounded queue growth.
// NOTE(review): what happens when a channel is full is decided by the sending
// side, which lives outside this file — confirm it waits rather than drops.
const CHANNEL_CAP: usize = 256;

/// Capacity of the lifecycle-event channel whose receiver is returned
/// alongside the client by [`StreamingApi::connect_with`].
const EVENT_CHAN_CAP: usize = 64;
45
46// ---------------------------------------------------------------------------
47// StreamingApi — accessor on IgClient
48// ---------------------------------------------------------------------------
49
/// Entry point for streaming.  Obtain via [`crate::IgClient::streaming`].
///
/// Holds only a borrow of the parent [`IgClient`]; call
/// [`StreamingApi::connect`] or [`StreamingApi::connect_with`] to open a
/// Lightstreamer session and receive a [`StreamingClient`].
#[derive(Debug)]
pub struct StreamingApi<'a> {
    /// Client whose session, transport and credentials are used to connect.
    pub(crate) client: &'a IgClient,
}
55
56impl StreamingApi<'_> {
57    /// Connect to the Lightstreamer streaming endpoint with the default
58    /// [`AutoReconnect`] policy (enabled, 5 attempts, 1 s–30 s back-off).
59    ///
60    /// The underlying session must already be authenticated before calling
61    /// this method.  For a **v3 (OAuth)** session, call
62    /// `client.session().read(true).await?` first so that CST/XST tokens
63    /// are stored locally — Lightstreamer requires the
64    /// `CST-<cst>|XST-<xst>` password format regardless of auth flavour.
65    ///
66    /// Returns `(StreamingClient, Receiver<StreamingEvent>)`.  The event
67    /// channel emits [`StreamingEvent::Reconnected`],
68    /// [`StreamingEvent::ReconnectFailed`], and
69    /// [`StreamingEvent::Disconnected`] lifecycle events.
70    ///
71    /// Equivalent to `connect_with(AutoReconnect::default())`.
72    #[instrument(skip_all, name = "streaming.connect")]
73    pub async fn connect(&self) -> Result<(StreamingClient, mpsc::Receiver<StreamingEvent>)> {
74        self.connect_with(AutoReconnect::default()).await
75    }
76
77    /// Connect with an explicit [`AutoReconnect`] policy.
78    ///
79    /// Set `policy.enabled = false` to disable auto-reconnect and get the
80    /// pre-reconnect behaviour: the stream terminates on `END` and all
81    /// subscriber channels close.
82    ///
83    /// Returns `(StreamingClient, Receiver<StreamingEvent>)`.
84    #[instrument(skip_all, name = "streaming.connect_with")]
85    pub async fn connect_with(
86        &self,
87        policy: AutoReconnect,
88    ) -> Result<(StreamingClient, mpsc::Receiver<StreamingEvent>)> {
89        let state = self.client.session.require_authenticated().await?;
90
91        let account_id = state.account_id.ok_or_else(|| {
92            crate::error::Error::Auth(
93                "no account ID in session — call session().login() first".into(),
94            )
95        })?;
96        let endpoint = state.lightstreamer_endpoint.ok_or_else(|| {
97            crate::error::Error::Auth(
98                "no Lightstreamer endpoint in session — call session().login() first".into(),
99            )
100        })?;
101
102        // Build Lightstreamer password from CST/XST tokens.
103        // OAuth sessions need CST/XST — caller must call read(true) first to
104        // exchange the OAuth token for a CST/XST pair.
105        let password = match state.tokens.as_ref() {
106            Some(AuthTokens::Cst {
107                cst,
108                x_security_token,
109            }) => format!("CST-{cst}|XST-{x_security_token}"),
110            Some(AuthTokens::OAuth { .. }) => {
111                return Err(crate::error::Error::Auth(
112                    "OAuth session cannot be used directly for streaming. \
113                     Call client.session().read(true).await? first to obtain \
114                     CST/XST tokens, then call streaming().connect() again."
115                        .into(),
116                ));
117            }
118            None => return Err(crate::error::Error::Auth("no active session tokens".into())),
119        };
120
121        let registry = Registry::new();
122        let (shutdown_tx, _shutdown_rx) = watch::channel(false);
123        let (event_tx, event_rx) = mpsc::channel(EVENT_CHAN_CAP);
124
125        // Build a SessionHandle so the reconnect path can call login_v2().
126        let session_handle = SessionHandle {
127            transport: self.client.transport.clone(),
128            session: self.client.session.clone(),
129            credentials: self.client.credentials.clone(),
130        };
131
132        let conn = LsConnection::create(CreateParams {
133            endpoint,
134            username: account_id,
135            password,
136            registry: registry.clone(),
137            shutdown_tx: shutdown_tx.clone(),
138            policy,
139            event_tx: Some(event_tx),
140            session_handle,
141        })
142        .await?;
143
144        let client = StreamingClient {
145            conn,
146            registry,
147            shutdown_tx,
148        };
149        Ok((client, event_rx))
150    }
151}
152
153// ---------------------------------------------------------------------------
154// StreamingClient
155// ---------------------------------------------------------------------------
156
/// A live Lightstreamer session with active subscriptions.
///
/// Obtained via [`StreamingApi::connect`] or [`StreamingApi::connect_with`].
/// All subscription methods return a `tokio::sync::mpsc::Receiver<T>`
/// (wrapped in [`Result`]).
/// Dropping the receiver automatically cancels the subscription server-side
/// the next time the server sends an update for that item.
///
/// Call [`StreamingClient::disconnect`] to cleanly tear down the session.
#[derive(Debug)]
pub struct StreamingClient {
    /// Connection used to send control requests; also holds the session ID.
    conn: LsConnection,
    /// Registry in which the sender half of every subscription is recorded.
    registry: Registry,
    /// Sender for the shutdown flag; `disconnect` sets it to `true`.
    shutdown_tx: watch::Sender<bool>,
}
171
172impl StreamingClient {
173    // ------------------------------------------------------------------
174    // Market
175    // ------------------------------------------------------------------
176
177    /// Subscribe to market price updates for `epic`.
178    ///
179    /// Returns a `Receiver<MarketUpdate>`.  Each received value is a snapshot
180    /// of all changed fields merged with the previous state — no field is
181    /// ever "missing".
182    #[instrument(skip(self), fields(%epic))]
183    pub async fn subscribe_market(&self, epic: &str) -> Result<mpsc::Receiver<MarketUpdate>> {
184        use crate::streaming::events::MARKET_FIELDS;
185        let (tx, rx) = mpsc::channel(CHANNEL_CAP);
186        let idx = self.registry.register(SubscriptionKind::Market {
187            epic: epic.to_owned(),
188            tx,
189        });
190        let item = format!("MARKET:{epic}");
191        let fields = MARKET_FIELDS.join(" ");
192        self.conn
193            .control("add", idx, &item, &fields, "MERGE")
194            .await?;
195        Ok(rx)
196    }
197
198    // ------------------------------------------------------------------
199    // Chart tick
200    // ------------------------------------------------------------------
201
202    /// Subscribe to chart tick data for `epic`.
203    ///
204    /// Returns a `Receiver<ChartTickUpdate>`.  This is a `DISTINCT`-mode
205    /// subscription — every message is a fresh tick, not a merge.
206    #[instrument(skip(self), fields(%epic))]
207    pub async fn subscribe_chart_tick(
208        &self,
209        epic: &str,
210    ) -> Result<mpsc::Receiver<ChartTickUpdate>> {
211        use crate::streaming::events::CHART_TICK_FIELDS;
212        let (tx, rx) = mpsc::channel(CHANNEL_CAP);
213        let idx = self.registry.register(SubscriptionKind::ChartTick {
214            epic: epic.to_owned(),
215            tx,
216        });
217        let item = format!("CHART:{epic}:TICK");
218        let fields = CHART_TICK_FIELDS.join(" ");
219        self.conn
220            .control("add", idx, &item, &fields, "DISTINCT")
221            .await?;
222        Ok(rx)
223    }
224
225    // ------------------------------------------------------------------
226    // Chart candle
227    // ------------------------------------------------------------------
228
229    /// Subscribe to OHLC candle data for `epic` at `scale`.
230    ///
231    /// Returns a `Receiver<ChartCandleUpdate>`.  This is a `MERGE`-mode
232    /// subscription — fields are merged across updates for the current candle.
233    #[instrument(skip(self), fields(%epic, scale = %scale))]
234    pub async fn subscribe_chart_candle(
235        &self,
236        epic: &str,
237        scale: CandleScale,
238    ) -> Result<mpsc::Receiver<ChartCandleUpdate>> {
239        use crate::streaming::events::CHART_CANDLE_FIELDS;
240        let (tx, rx) = mpsc::channel(CHANNEL_CAP);
241        let idx = self.registry.register(SubscriptionKind::ChartCandle {
242            epic: epic.to_owned(),
243            scale,
244            tx,
245        });
246        let item = format!("CHART:{epic}:{}", scale.as_str());
247        let fields = CHART_CANDLE_FIELDS.join(" ");
248        self.conn
249            .control("add", idx, &item, &fields, "MERGE")
250            .await?;
251        Ok(rx)
252    }
253
254    // ------------------------------------------------------------------
255    // Account
256    // ------------------------------------------------------------------
257
258    /// Subscribe to account balance and margin updates.
259    ///
260    /// Returns a `Receiver<AccountUpdate>`.
261    #[instrument(skip(self), fields(%account_id))]
262    pub async fn subscribe_account(
263        &self,
264        account_id: &str,
265    ) -> Result<mpsc::Receiver<AccountUpdate>> {
266        use crate::streaming::events::ACCOUNT_FIELDS;
267        let (tx, rx) = mpsc::channel(CHANNEL_CAP);
268        let idx = self.registry.register(SubscriptionKind::Account {
269            account_id: account_id.to_owned(),
270            tx,
271        });
272        let item = format!("ACCOUNT:{account_id}");
273        let fields = ACCOUNT_FIELDS.join(" ");
274        self.conn
275            .control("add", idx, &item, &fields, "MERGE")
276            .await?;
277        Ok(rx)
278    }
279
280    // ------------------------------------------------------------------
281    // Trade
282    // ------------------------------------------------------------------
283
284    /// Subscribe to trade confirmations and working-order updates.
285    ///
286    /// Returns a `Receiver<TradeUpdate>`.
287    #[instrument(skip(self), fields(%account_id))]
288    pub async fn subscribe_trade(&self, account_id: &str) -> Result<mpsc::Receiver<TradeUpdate>> {
289        use crate::streaming::events::TRADE_FIELDS;
290        let (tx, rx) = mpsc::channel(CHANNEL_CAP);
291        let idx = self.registry.register(SubscriptionKind::Trade {
292            account_id: account_id.to_owned(),
293            tx,
294        });
295        let item = format!("TRADE:{account_id}");
296        let fields = TRADE_FIELDS.join(" ");
297        self.conn
298            .control("add", idx, &item, &fields, "DISTINCT")
299            .await?;
300        Ok(rx)
301    }
302
303    // ------------------------------------------------------------------
304    // Lifecycle
305    // ------------------------------------------------------------------
306
307    /// Disconnect from Lightstreamer and stop the background read-loop task.
308    ///
309    /// After this call all pending `Receiver`s will no longer receive updates.
310    ///
311    /// The method signature is `async` for forward compatibility (future
312    /// implementations may need to await a clean shutdown handshake with the
313    /// server).
314    #[allow(clippy::unused_async)]
315    pub async fn disconnect(self) -> Result<()> {
316        // Signal the background read-loop to stop.
317        let _ = self.shutdown_tx.send(true);
318        Ok(())
319    }
320
321    /// Return the current Lightstreamer session ID.
322    pub fn session_id(&self) -> &str {
323        &self.conn.session_id
324    }
325}