ig_client/application/
dynamic_streamer.rs

1/******************************************************************************
2   Author: Joaquín Béjar García
3   Email: jb@taunais.com
4   Date: 30/10/25
5******************************************************************************/
6
7//! Dynamic market streaming with thread-safe subscription management.
8//!
9//! This module provides a wrapper around `StreamerClient` that allows dynamic
10//! addition and removal of market subscriptions from multiple threads.
11
12use crate::application::client::StreamerClient;
13use crate::error::AppError;
14use crate::model::streaming::StreamingMarketField;
15use crate::presentation::price::PriceData;
16use std::collections::HashSet;
17use std::sync::Arc;
18use tokio::sync::{Notify, RwLock, mpsc};
19use tracing::{debug, info, warn};
20
/// Dynamic market streamer with thread-safe subscription management.
///
/// This struct wraps a `StreamerClient` and provides methods to dynamically
/// add, remove, and clear market subscriptions. Every piece of mutable state
/// lives behind an `Arc<RwLock<_>>`, so clones of the streamer share the same
/// EPIC list, price channel and connection state and can be used from
/// multiple tasks concurrently.
///
/// # Examples
///
/// ```ignore
/// use ig_client::application::dynamic_streamer::DynamicMarketStreamer;
/// use ig_client::model::streaming::StreamingMarketField;
/// use std::collections::HashSet;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     // Create fields to subscribe to
///     let fields = HashSet::from([
///         StreamingMarketField::Bid,
///         StreamingMarketField::Offer,
///     ]);
///
///     // Create the dynamic streamer
///     let mut streamer = DynamicMarketStreamer::new(fields).await?;
///
///     // Get the receiver for price updates
///     let mut receiver = streamer.get_receiver().await?;
///
///     // Add markets from a different task
///     let streamer_clone = streamer.clone();
///     tokio::spawn(async move {
///         streamer_clone.add("IX.D.DAX.DAILY.IP".to_string()).await.unwrap();
///     });
///
///     // Start receiving updates
///     tokio::spawn(async move {
///         while let Some(price_data) = receiver.recv().await {
///             println!("Price update: {}", price_data);
///         }
///     });
///
///     // Connect and block until a shutdown signal arrives
///     streamer.connect().await?;
///     Ok(())
/// }
/// ```
pub struct DynamicMarketStreamer {
    /// Internal streamer client; `None` until a connection is started and
    /// replaced with a fresh client every time the connection is restarted.
    client: Arc<RwLock<Option<StreamerClient>>>,
    /// Set of EPICs currently in the subscription list
    epics: Arc<RwLock<HashSet<String>>>,
    /// Market fields requested for every subscription
    fields: HashSet<StreamingMarketField>,
    /// Sender side of the price channel; cloned into each forwarding task
    price_tx: Arc<RwLock<Option<mpsc::UnboundedSender<PriceData>>>>,
    /// Receiver side of the price channel (taken on the first `get_receiver` call)
    price_rx: Arc<RwLock<Option<mpsc::UnboundedReceiver<PriceData>>>>,
    /// Flag indicating if the streamer is connected
    is_connected: Arc<RwLock<bool>>,
    /// Shutdown signal handed to the current connection; `None` before the first start
    shutdown_signal: Arc<RwLock<Option<Arc<Notify>>>>,
}
82
83impl DynamicMarketStreamer {
84    /// Creates a new dynamic market streamer.
85    ///
86    /// # Arguments
87    ///
88    /// * `fields` - Set of market data fields to receive (e.g., BID, OFFER, etc.)
89    ///
90    /// # Returns
91    ///
92    /// Returns a new `DynamicMarketStreamer` instance or an error if initialization fails.
93    ///
94    /// # Examples
95    ///
96    /// ```ignore
97    /// let fields = HashSet::from([
98    ///     StreamingMarketField::Bid,
99    ///     StreamingMarketField::Offer,
100    /// ]);
101    /// let streamer = DynamicMarketStreamer::new(fields).await?;
102    /// ```
103    pub async fn new(fields: HashSet<StreamingMarketField>) -> Result<Self, AppError> {
104        let (price_tx, price_rx) = mpsc::unbounded_channel();
105
106        Ok(Self {
107            client: Arc::new(RwLock::new(None)),
108            epics: Arc::new(RwLock::new(HashSet::new())),
109            fields,
110            price_tx: Arc::new(RwLock::new(Some(price_tx))),
111            price_rx: Arc::new(RwLock::new(Some(price_rx))),
112            is_connected: Arc::new(RwLock::new(false)),
113            shutdown_signal: Arc::new(RwLock::new(None)),
114        })
115    }
116
117    /// Adds a market EPIC to the subscription list.
118    ///
119    /// If the streamer is already connected, this will reconnect with the updated list.
120    ///
121    /// # Arguments
122    ///
123    /// * `epic` - The market EPIC to subscribe to
124    ///
125    /// # Returns
126    ///
127    /// Returns `Ok(())` if the EPIC was added successfully.
128    ///
129    /// # Examples
130    ///
131    /// ```ignore
132    /// streamer.add("IX.D.DAX.DAILY.IP".to_string()).await?;
133    /// ```
134    pub async fn add(&self, epic: String) -> Result<(), AppError> {
135        let mut epics = self.epics.write().await;
136
137        if epics.contains(&epic) {
138            debug!("EPIC {} already subscribed", epic);
139            return Ok(());
140        }
141
142        epics.insert(epic.clone());
143        info!("Added EPIC {} to subscription list", epic);
144        drop(epics); // Release lock
145
146        // If already connected, reconnect with new list
147        let is_connected = *self.is_connected.read().await;
148        if is_connected {
149            self.reconnect().await?;
150        }
151
152        Ok(())
153    }
154
155    /// Removes a market EPIC from the subscription list.
156    ///
157    /// If the streamer is already connected, this will reconnect with the updated list.
158    ///
159    /// # Arguments
160    ///
161    /// * `epic` - The market EPIC to remove
162    ///
163    /// # Returns
164    ///
165    /// Returns `Ok(())` if the EPIC was removed successfully.
166    ///
167    /// # Examples
168    ///
169    /// ```ignore
170    /// streamer.remove("IX.D.DAX.DAILY.IP".to_string()).await?;
171    /// ```
172    pub async fn remove(&self, epic: String) -> Result<(), AppError> {
173        let mut epics = self.epics.write().await;
174
175        let was_removed = epics.remove(&epic);
176        if was_removed {
177            info!("Removed EPIC {} from subscription list", epic);
178        } else {
179            debug!("EPIC {} was not in subscription list", epic);
180        }
181        drop(epics); // Release lock
182
183        // If already connected and something was removed, reconnect
184        if was_removed {
185            let is_connected = *self.is_connected.read().await;
186            if is_connected {
187                self.reconnect().await?;
188            }
189        }
190
191        Ok(())
192    }
193
194    /// Clears all market EPICs from the subscription list.
195    ///
196    /// Note: Due to Lightstreamer limitations, this does not unsubscribe from
197    /// the server immediately. All EPICs will be removed from the internal list.
198    ///
199    /// # Returns
200    ///
201    /// Returns `Ok(())` when all EPICs have been cleared.
202    ///
203    /// # Examples
204    ///
205    /// ```ignore
206    /// streamer.clear().await?;
207    /// ```
208    pub async fn clear(&self) -> Result<(), AppError> {
209        let mut epics = self.epics.write().await;
210        let count = epics.len();
211        epics.clear();
212        info!("Cleared {} EPICs from subscription list", count);
213        Ok(())
214    }
215
216    /// Gets the current list of subscribed EPICs.
217    ///
218    /// # Returns
219    ///
220    /// Returns a vector containing all currently subscribed EPICs.
221    ///
222    /// # Examples
223    ///
224    /// ```ignore
225    /// let epics = streamer.get_epics().await;
226    /// println!("Subscribed to {} markets", epics.len());
227    /// ```
228    pub async fn get_epics(&self) -> Vec<String> {
229        let epics = self.epics.read().await;
230        epics.iter().cloned().collect()
231    }
232
233    /// Gets the receiver for price updates.
234    ///
235    /// This method can only be called once. Subsequent calls will return an error.
236    ///
237    /// # Returns
238    ///
239    /// Returns a receiver channel for `PriceData` updates, or an error if the
240    /// receiver has already been taken.
241    ///
242    /// # Examples
243    ///
244    /// ```ignore
245    /// let mut receiver = streamer.get_receiver().await?;
246    /// tokio::spawn(async move {
247    ///     while let Some(price_data) = receiver.recv().await {
248    ///         println!("Price update: {}", price_data);
249    ///     }
250    /// });
251    /// ```
252    pub async fn get_receiver(&self) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
253        let mut rx_lock = self.price_rx.write().await;
254        rx_lock
255            .take()
256            .ok_or_else(|| AppError::InvalidInput("Receiver already taken".to_string()))
257    }
258
259    /// Reconnects the streamer with the current list of EPICs.
260    ///
261    /// This method disconnects the current client and creates a new one with
262    /// the updated EPIC list.
263    async fn reconnect(&self) -> Result<(), AppError> {
264        info!("Reconnecting with updated EPIC list...");
265
266        // Signal shutdown to current client
267        {
268            let shutdown_lock = self.shutdown_signal.read().await;
269            if let Some(signal) = shutdown_lock.as_ref() {
270                signal.notify_one();
271            }
272        }
273
274        // Wait a bit for graceful shutdown
275        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
276
277        // Start new connection
278        let epics = self.get_epics().await;
279        if !epics.is_empty() {
280            self.start_internal().await?;
281        }
282
283        Ok(())
284    }
285
286    /// Internal method to start connection.
287    async fn start_internal(&self) -> Result<(), AppError> {
288        let epics = self.get_epics().await;
289
290        if epics.is_empty() {
291            warn!("No EPICs to subscribe to");
292            return Ok(());
293        }
294
295        info!("Starting connection with {} EPICs", epics.len());
296
297        // Create new client
298        let mut new_client = StreamerClient::new().await?;
299
300        // Subscribe to all EPICs
301        let fields = self.fields.clone();
302        let mut receiver = new_client.market_subscribe(epics.clone(), fields).await?;
303
304        // Forward updates to the main channel
305        let price_tx = self.price_tx.read().await;
306        if let Some(tx) = price_tx.as_ref() {
307            let tx = tx.clone();
308            tokio::spawn(async move {
309                while let Some(price_data) = receiver.recv().await {
310                    if tx.send(price_data).is_err() {
311                        warn!("Failed to send price update: receiver dropped");
312                        break;
313                    }
314                }
315                debug!("Subscription forwarding task ended");
316            });
317        }
318
319        // Store the new client
320        *self.client.write().await = Some(new_client);
321
322        // Create new shutdown signal
323        let signal = Arc::new(Notify::new());
324        *self.shutdown_signal.write().await = Some(Arc::clone(&signal));
325
326        // Mark as connected
327        *self.is_connected.write().await = true;
328
329        // Spawn connection task in background
330        let client = Arc::clone(&self.client);
331        let is_connected = Arc::clone(&self.is_connected);
332
333        tokio::spawn(async move {
334            let result = {
335                let mut client_guard = client.write().await;
336                if let Some(ref mut c) = *client_guard {
337                    c.connect(Some(signal)).await
338                } else {
339                    Ok(())
340                }
341            };
342
343            // Mark as disconnected
344            *is_connected.write().await = false;
345
346            match result {
347                Ok(_) => info!("Connection task completed successfully"),
348                Err(e) => tracing::error!("Connection task failed: {:?}", e),
349            }
350        });
351
352        info!("Connection task started in background");
353        Ok(())
354    }
355
356    /// Starts the connection to the Lightstreamer server and subscribes to all initial EPICs.
357    ///
358    /// This method subscribes to all EPICs in the subscription list and then spawns a background
359    /// task to maintain the connection. This allows dynamic subscription management while connected.
360    ///
361    /// # Returns
362    ///
363    /// Returns `Ok(())` immediately after starting the connection task.
364    ///
365    /// # Examples
366    ///
367    /// ```ignore
368    /// // Start connection
369    /// streamer.start().await?;
370    ///
371    /// // Keep main thread alive
372    /// tokio::signal::ctrl_c().await?;
373    /// ```
374    pub async fn start(&mut self) -> Result<(), AppError> {
375        self.start_internal().await
376    }
377
378    /// Connects to the Lightstreamer server and blocks until shutdown.
379    ///
380    /// This is a convenience method that calls `start()` and then waits for a shutdown signal.
381    /// Use `start()` if you need non-blocking behavior.
382    ///
383    /// # Returns
384    ///
385    /// Returns `Ok(())` when the connection is closed gracefully.
386    ///
387    /// # Examples
388    ///
389    /// ```ignore
390    /// // Connect and block until shutdown
391    /// streamer.connect().await?;
392    /// ```
393    pub async fn connect(&mut self) -> Result<(), AppError> {
394        self.start().await?;
395
396        // Wait for SIGINT/SIGTERM
397        use lightstreamer_rs::utils::setup_signal_hook;
398        let signal = Arc::new(Notify::new());
399        setup_signal_hook(Arc::clone(&signal)).await;
400        signal.notified().await;
401
402        // Disconnect
403        self.disconnect().await?;
404
405        Ok(())
406    }
407
408    /// Disconnects from the Lightstreamer server.
409    ///
410    /// This method gracefully closes the connection to the server.
411    ///
412    /// # Returns
413    ///
414    /// Returns `Ok(())` if the disconnection was successful.
415    ///
416    /// # Examples
417    ///
418    /// ```ignore
419    /// streamer.disconnect().await?;
420    /// ```
421    pub async fn disconnect(&mut self) -> Result<(), AppError> {
422        // Signal shutdown
423        {
424            let shutdown_lock = self.shutdown_signal.read().await;
425            if let Some(signal) = shutdown_lock.as_ref() {
426                signal.notify_one();
427            }
428        }
429
430        // Disconnect client
431        let mut client_lock = self.client.write().await;
432        if let Some(ref mut client) = *client_lock {
433            client.disconnect().await?;
434        }
435        *client_lock = None;
436
437        *self.is_connected.write().await = false;
438        info!("Disconnected from Lightstreamer server");
439        Ok(())
440    }
441}
442
443impl Clone for DynamicMarketStreamer {
444    fn clone(&self) -> Self {
445        Self {
446            client: Arc::clone(&self.client),
447            epics: Arc::clone(&self.epics),
448            fields: self.fields.clone(),
449            price_tx: Arc::clone(&self.price_tx),
450            price_rx: Arc::clone(&self.price_rx),
451            is_connected: Arc::clone(&self.is_connected),
452            shutdown_signal: Arc::clone(&self.shutdown_signal),
453        }
454    }
455}