Skip to main content

ig_client/application/
dynamic_streamer.rs

1/******************************************************************************
2   Author: Joaquín Béjar García
3   Email: jb@taunais.com
4   Date: 30/10/25
5******************************************************************************/
6
7//! Dynamic market streaming with thread-safe subscription management.
8//!
9//! This module provides a wrapper around `StreamerClient` that allows dynamic
10//! addition and removal of market subscriptions from multiple threads.
11
12use crate::application::client::StreamerClient;
13use crate::error::AppError;
14use crate::model::streaming::StreamingMarketField;
15use crate::presentation::price::PriceData;
16use std::collections::HashSet;
17use std::sync::Arc;
18use tokio::sync::{Notify, RwLock, mpsc};
19use tracing::{debug, info, warn};
20
21/// Dynamic market streamer with thread-safe subscription management.
22///
23/// This struct wraps a `StreamerClient` and provides methods to dynamically
24/// add, remove, and clear market subscriptions. All operations are thread-safe
25/// and can be called from multiple threads concurrently.
26///
27/// # Examples
28///
29/// ```ignore
30/// use ig_client::application::dynamic_streamer::DynamicMarketStreamer;
31/// use ig_client::model::streaming::StreamingMarketField;
32/// use std::collections::HashSet;
33///
34/// #[tokio::main]
35/// async fn main() -> Result<(), ig_client::error::AppError> {
36///     // Create fields to subscribe to
37///     let fields = HashSet::from([
38///         StreamingMarketField::Bid,
39///         StreamingMarketField::Offer,
40///     ]);
41///
42///     // Create the dynamic streamer
43///     let mut streamer = DynamicMarketStreamer::new(fields).await?;
44///
45///     // Get the receiver for price updates
46///     let mut receiver = streamer.get_receiver().await?;
47///
48///     // Add markets from different threads
49///     let streamer_clone = streamer.clone();
50///     tokio::spawn(async move {
51///         if let Err(e) = streamer_clone.add("IX.D.DAX.DAILY.IP".to_string()).await {
52///             tracing::error!("Failed to add market: {}", e);
53///         }
54///     });
55///
56///     // Start receiving updates
57///     tokio::spawn(async move {
58///         while let Some(price_data) = receiver.recv().await {
59///             println!("Price update: {}", price_data);
60///         }
61///     });
62///
63///     // Connect and run
64///     streamer.connect(None).await?;
65///     Ok(())
66/// }
67/// ```
68pub struct DynamicMarketStreamer {
69    /// Internal streamer client (recreated on epic changes)
70    client: Arc<RwLock<Option<StreamerClient>>>,
71    /// Set of EPICs currently subscribed
72    epics: Arc<RwLock<HashSet<String>>>,
73    /// Market fields to subscribe to
74    fields: HashSet<StreamingMarketField>,
75    /// Channel sender for price updates
76    price_tx: Arc<RwLock<Option<mpsc::UnboundedSender<PriceData>>>>,
77    /// Channel receiver for price updates (taken on first get_receiver call)
78    price_rx: Arc<RwLock<Option<mpsc::UnboundedReceiver<PriceData>>>>,
79    /// Flag indicating if the streamer is connected
80    is_connected: Arc<RwLock<bool>>,
81    /// Shutdown signal for current connection
82    shutdown_signal: Arc<RwLock<Option<Arc<Notify>>>>,
83}
84
85impl DynamicMarketStreamer {
86    /// Creates a new dynamic market streamer.
87    ///
88    /// # Arguments
89    ///
90    /// * `fields` - Set of market data fields to receive (e.g., BID, OFFER, etc.)
91    ///
92    /// # Returns
93    ///
94    /// Returns a new `DynamicMarketStreamer` instance or an error if initialization fails.
95    ///
96    /// # Examples
97    ///
98    /// ```ignore
99    /// let fields = HashSet::from([
100    ///     StreamingMarketField::Bid,
101    ///     StreamingMarketField::Offer,
102    /// ]);
103    /// let streamer = DynamicMarketStreamer::new(fields).await?;
104    /// ```
105    pub async fn new(fields: HashSet<StreamingMarketField>) -> Result<Self, AppError> {
106        let (price_tx, price_rx) = mpsc::unbounded_channel();
107
108        Ok(Self {
109            client: Arc::new(RwLock::new(None)),
110            epics: Arc::new(RwLock::new(HashSet::new())),
111            fields,
112            price_tx: Arc::new(RwLock::new(Some(price_tx))),
113            price_rx: Arc::new(RwLock::new(Some(price_rx))),
114            is_connected: Arc::new(RwLock::new(false)),
115            shutdown_signal: Arc::new(RwLock::new(None)),
116        })
117    }
118
119    /// Adds a market EPIC to the subscription list.
120    ///
121    /// If the streamer is already connected, this will reconnect with the updated list.
122    ///
123    /// # Arguments
124    ///
125    /// * `epic` - The market EPIC to subscribe to
126    ///
127    /// # Returns
128    ///
129    /// Returns `Ok(())` if the EPIC was added successfully.
130    ///
131    /// # Examples
132    ///
133    /// ```ignore
134    /// streamer.add("IX.D.DAX.DAILY.IP".to_string()).await?;
135    /// ```
136    pub async fn add(&self, epic: String) -> Result<(), AppError> {
137        let mut epics = self.epics.write().await;
138
139        if epics.contains(&epic) {
140            debug!("EPIC {} already subscribed", epic);
141            return Ok(());
142        }
143
144        epics.insert(epic.clone());
145        info!("Added EPIC {} to subscription list", epic);
146        drop(epics); // Release lock
147
148        // If already connected, reconnect with new list
149        let is_connected = *self.is_connected.read().await;
150        if is_connected {
151            self.reconnect().await?;
152        }
153
154        Ok(())
155    }
156
157    /// Removes a market EPIC from the subscription list.
158    ///
159    /// If the streamer is already connected, this will reconnect with the updated list.
160    ///
161    /// # Arguments
162    ///
163    /// * `epic` - The market EPIC to remove
164    ///
165    /// # Returns
166    ///
167    /// Returns `Ok(())` if the EPIC was removed successfully.
168    ///
169    /// # Examples
170    ///
171    /// ```ignore
172    /// streamer.remove("IX.D.DAX.DAILY.IP".to_string()).await?;
173    /// ```
174    pub async fn remove(&self, epic: String) -> Result<(), AppError> {
175        let mut epics = self.epics.write().await;
176
177        let was_removed = epics.remove(&epic);
178        if was_removed {
179            info!("Removed EPIC {} from subscription list", epic);
180        } else {
181            debug!("EPIC {} was not in subscription list", epic);
182        }
183        drop(epics); // Release lock
184
185        // If already connected and something was removed, reconnect
186        if was_removed {
187            let is_connected = *self.is_connected.read().await;
188            if is_connected {
189                self.reconnect().await?;
190            }
191        }
192
193        Ok(())
194    }
195
196    /// Clears all market EPICs from the subscription list.
197    ///
198    /// Note: Due to Lightstreamer limitations, this does not unsubscribe from
199    /// the server immediately. All EPICs will be removed from the internal list.
200    ///
201    /// # Returns
202    ///
203    /// Returns `Ok(())` when all EPICs have been cleared.
204    ///
205    /// # Examples
206    ///
207    /// ```ignore
208    /// streamer.clear().await?;
209    /// ```
210    pub async fn clear(&self) -> Result<(), AppError> {
211        let mut epics = self.epics.write().await;
212        let count = epics.len();
213        epics.clear();
214        info!("Cleared {} EPICs from subscription list", count);
215        Ok(())
216    }
217
218    /// Gets the current list of subscribed EPICs.
219    ///
220    /// # Returns
221    ///
222    /// Returns a vector containing all currently subscribed EPICs.
223    ///
224    /// # Examples
225    ///
226    /// ```ignore
227    /// let epics = streamer.get_epics().await;
228    /// println!("Subscribed to {} markets", epics.len());
229    /// ```
230    pub async fn get_epics(&self) -> Vec<String> {
231        let epics = self.epics.read().await;
232        epics.iter().cloned().collect()
233    }
234
235    /// Gets the receiver for price updates.
236    ///
237    /// This method can only be called once. Subsequent calls will return an error.
238    ///
239    /// # Returns
240    ///
241    /// Returns a receiver channel for `PriceData` updates, or an error if the
242    /// receiver has already been taken.
243    ///
244    /// # Examples
245    ///
246    /// ```ignore
247    /// let mut receiver = streamer.get_receiver().await?;
248    /// tokio::spawn(async move {
249    ///     while let Some(price_data) = receiver.recv().await {
250    ///         println!("Price update: {}", price_data);
251    ///     }
252    /// });
253    /// ```
254    pub async fn get_receiver(&self) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
255        let mut rx_lock = self.price_rx.write().await;
256        rx_lock
257            .take()
258            .ok_or_else(|| AppError::InvalidInput("Receiver already taken".to_string()))
259    }
260
261    /// Reconnects the streamer with the current list of EPICs.
262    ///
263    /// This method disconnects the current client and creates a new one with
264    /// the updated EPIC list.
265    async fn reconnect(&self) -> Result<(), AppError> {
266        info!("Reconnecting with updated EPIC list...");
267
268        // Signal shutdown to current client
269        {
270            let shutdown_lock = self.shutdown_signal.read().await;
271            if let Some(signal) = shutdown_lock.as_ref() {
272                signal.notify_one();
273            }
274        }
275
276        // Wait a bit for graceful shutdown
277        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
278
279        // Start new connection
280        let epics = self.get_epics().await;
281        if !epics.is_empty() {
282            self.start_internal().await?;
283        }
284
285        Ok(())
286    }
287
288    /// Internal method to start connection.
289    async fn start_internal(&self) -> Result<(), AppError> {
290        let epics = self.get_epics().await;
291
292        if epics.is_empty() {
293            warn!("No EPICs to subscribe to");
294            return Ok(());
295        }
296
297        info!("Starting connection with {} EPICs", epics.len());
298
299        // Create new client
300        let mut new_client = StreamerClient::new().await?;
301
302        // Subscribe to all EPICs
303        let fields = self.fields.clone();
304        let mut receiver = new_client.market_subscribe(epics.clone(), fields).await?;
305
306        // Forward updates to the main channel
307        let price_tx = self.price_tx.read().await;
308        if let Some(tx) = price_tx.as_ref() {
309            let tx = tx.clone();
310            tokio::spawn(async move {
311                while let Some(price_data) = receiver.recv().await {
312                    if tx.send(price_data).is_err() {
313                        warn!("Failed to send price update: receiver dropped");
314                        break;
315                    }
316                }
317                debug!("Subscription forwarding task ended");
318            });
319        }
320
321        // Store the new client
322        *self.client.write().await = Some(new_client);
323
324        // Create new shutdown signal
325        let signal = Arc::new(Notify::new());
326        *self.shutdown_signal.write().await = Some(Arc::clone(&signal));
327
328        // Mark as connected
329        *self.is_connected.write().await = true;
330
331        // Spawn connection task in background
332        let client = Arc::clone(&self.client);
333        let is_connected = Arc::clone(&self.is_connected);
334
335        tokio::spawn(async move {
336            let result = {
337                let mut client_guard = client.write().await;
338                if let Some(ref mut c) = *client_guard {
339                    c.connect(Some(signal)).await
340                } else {
341                    Ok(())
342                }
343            };
344
345            // Mark as disconnected
346            *is_connected.write().await = false;
347
348            match result {
349                Ok(_) => info!("Connection task completed successfully"),
350                Err(e) => tracing::error!("Connection task failed: {:?}", e),
351            }
352        });
353
354        info!("Connection task started in background");
355        Ok(())
356    }
357
358    /// Starts the connection to the Lightstreamer server and subscribes to all initial EPICs.
359    ///
360    /// This method subscribes to all EPICs in the subscription list and then spawns a background
361    /// task to maintain the connection. This allows dynamic subscription management while connected.
362    ///
363    /// # Returns
364    ///
365    /// Returns `Ok(())` immediately after starting the connection task.
366    ///
367    /// # Examples
368    ///
369    /// ```ignore
370    /// // Start connection
371    /// streamer.start().await?;
372    ///
373    /// // Keep main thread alive
374    /// tokio::signal::ctrl_c().await?;
375    /// ```
376    pub async fn start(&mut self) -> Result<(), AppError> {
377        self.start_internal().await
378    }
379
380    /// Connects to the Lightstreamer server and blocks until shutdown.
381    ///
382    /// This is a convenience method that calls `start()` and then waits for a shutdown signal.
383    /// Use `start()` if you need non-blocking behavior.
384    ///
385    /// # Returns
386    ///
387    /// Returns `Ok(())` when the connection is closed gracefully.
388    ///
389    /// # Examples
390    ///
391    /// ```ignore
392    /// // Connect and block until shutdown
393    /// streamer.connect().await?;
394    /// ```
395    pub async fn connect(&mut self) -> Result<(), AppError> {
396        self.start().await?;
397
398        // Wait for SIGINT/SIGTERM
399        use lightstreamer_rs::utils::setup_signal_hook;
400        let signal = Arc::new(Notify::new());
401        setup_signal_hook(Arc::clone(&signal)).await;
402        signal.notified().await;
403
404        // Disconnect
405        self.disconnect().await?;
406
407        Ok(())
408    }
409
410    /// Disconnects from the Lightstreamer server.
411    ///
412    /// This method gracefully closes the connection to the server.
413    ///
414    /// # Returns
415    ///
416    /// Returns `Ok(())` if the disconnection was successful.
417    ///
418    /// # Examples
419    ///
420    /// ```ignore
421    /// streamer.disconnect().await?;
422    /// ```
423    pub async fn disconnect(&mut self) -> Result<(), AppError> {
424        // Signal shutdown
425        {
426            let shutdown_lock = self.shutdown_signal.read().await;
427            if let Some(signal) = shutdown_lock.as_ref() {
428                signal.notify_one();
429            }
430        }
431
432        // Disconnect client
433        let mut client_lock = self.client.write().await;
434        if let Some(ref mut client) = *client_lock {
435            client.disconnect().await?;
436        }
437        *client_lock = None;
438
439        *self.is_connected.write().await = false;
440        info!("Disconnected from Lightstreamer server");
441        Ok(())
442    }
443}
444
445impl Clone for DynamicMarketStreamer {
446    fn clone(&self) -> Self {
447        Self {
448            client: Arc::clone(&self.client),
449            epics: Arc::clone(&self.epics),
450            fields: self.fields.clone(),
451            price_tx: Arc::clone(&self.price_tx),
452            price_rx: Arc::clone(&self.price_rx),
453            is_connected: Arc::clone(&self.is_connected),
454            shutdown_signal: Arc::clone(&self.shutdown_signal),
455        }
456    }
457}