ig_client/application/dynamic_streamer.rs
1/******************************************************************************
2 Author: Joaquín Béjar García
3 Email: jb@taunais.com
4 Date: 30/10/25
5******************************************************************************/
6
7//! Dynamic market streaming with thread-safe subscription management.
8//!
9//! This module provides a wrapper around `StreamerClient` that allows dynamic
10//! addition and removal of market subscriptions from multiple threads.
11
12use crate::application::client::StreamerClient;
13use crate::error::AppError;
14use crate::model::streaming::StreamingMarketField;
15use crate::presentation::price::PriceData;
16use std::collections::HashSet;
17use std::sync::Arc;
18use tokio::sync::{Notify, RwLock, mpsc};
19use tracing::{debug, info, warn};
20
21/// Dynamic market streamer with thread-safe subscription management.
22///
23/// This struct wraps a `StreamerClient` and provides methods to dynamically
24/// add, remove, and clear market subscriptions. All operations are thread-safe
25/// and can be called from multiple threads concurrently.
26///
27/// # Examples
28///
29/// ```ignore
30/// use ig_client::application::dynamic_streamer::DynamicMarketStreamer;
31/// use ig_client::model::streaming::StreamingMarketField;
32/// use std::collections::HashSet;
33///
34/// #[tokio::main]
35/// async fn main() -> Result<(), ig_client::error::AppError> {
36/// // Create fields to subscribe to
37/// let fields = HashSet::from([
38/// StreamingMarketField::Bid,
39/// StreamingMarketField::Offer,
40/// ]);
41///
42/// // Create the dynamic streamer
43/// let mut streamer = DynamicMarketStreamer::new(fields).await?;
44///
45/// // Get the receiver for price updates
46/// let mut receiver = streamer.get_receiver().await?;
47///
48/// // Add markets from different threads
49/// let streamer_clone = streamer.clone();
50/// tokio::spawn(async move {
51/// if let Err(e) = streamer_clone.add("IX.D.DAX.DAILY.IP".to_string()).await {
52/// tracing::error!("Failed to add market: {}", e);
53/// }
54/// });
55///
56/// // Start receiving updates
57/// tokio::spawn(async move {
58/// while let Some(price_data) = receiver.recv().await {
59/// println!("Price update: {}", price_data);
60/// }
61/// });
62///
63/// // Connect and run
64/// streamer.connect(None).await?;
65/// Ok(())
66/// }
67/// ```
68pub struct DynamicMarketStreamer {
69 /// Internal streamer client (recreated on epic changes)
70 client: Arc<RwLock<Option<StreamerClient>>>,
71 /// Set of EPICs currently subscribed
72 epics: Arc<RwLock<HashSet<String>>>,
73 /// Market fields to subscribe to
74 fields: HashSet<StreamingMarketField>,
75 /// Channel sender for price updates
76 price_tx: Arc<RwLock<Option<mpsc::UnboundedSender<PriceData>>>>,
77 /// Channel receiver for price updates (taken on first get_receiver call)
78 price_rx: Arc<RwLock<Option<mpsc::UnboundedReceiver<PriceData>>>>,
79 /// Flag indicating if the streamer is connected
80 is_connected: Arc<RwLock<bool>>,
81 /// Shutdown signal for current connection
82 shutdown_signal: Arc<RwLock<Option<Arc<Notify>>>>,
83}
84
85impl DynamicMarketStreamer {
86 /// Creates a new dynamic market streamer.
87 ///
88 /// # Arguments
89 ///
90 /// * `fields` - Set of market data fields to receive (e.g., BID, OFFER, etc.)
91 ///
92 /// # Returns
93 ///
94 /// Returns a new `DynamicMarketStreamer` instance or an error if initialization fails.
95 ///
96 /// # Examples
97 ///
98 /// ```ignore
99 /// let fields = HashSet::from([
100 /// StreamingMarketField::Bid,
101 /// StreamingMarketField::Offer,
102 /// ]);
103 /// let streamer = DynamicMarketStreamer::new(fields).await?;
104 /// ```
105 pub async fn new(fields: HashSet<StreamingMarketField>) -> Result<Self, AppError> {
106 let (price_tx, price_rx) = mpsc::unbounded_channel();
107
108 Ok(Self {
109 client: Arc::new(RwLock::new(None)),
110 epics: Arc::new(RwLock::new(HashSet::new())),
111 fields,
112 price_tx: Arc::new(RwLock::new(Some(price_tx))),
113 price_rx: Arc::new(RwLock::new(Some(price_rx))),
114 is_connected: Arc::new(RwLock::new(false)),
115 shutdown_signal: Arc::new(RwLock::new(None)),
116 })
117 }
118
119 /// Adds a market EPIC to the subscription list.
120 ///
121 /// If the streamer is already connected, this will reconnect with the updated list.
122 ///
123 /// # Arguments
124 ///
125 /// * `epic` - The market EPIC to subscribe to
126 ///
127 /// # Returns
128 ///
129 /// Returns `Ok(())` if the EPIC was added successfully.
130 ///
131 /// # Examples
132 ///
133 /// ```ignore
134 /// streamer.add("IX.D.DAX.DAILY.IP".to_string()).await?;
135 /// ```
136 pub async fn add(&self, epic: String) -> Result<(), AppError> {
137 let mut epics = self.epics.write().await;
138
139 if epics.contains(&epic) {
140 debug!("EPIC {} already subscribed", epic);
141 return Ok(());
142 }
143
144 epics.insert(epic.clone());
145 info!("Added EPIC {} to subscription list", epic);
146 drop(epics); // Release lock
147
148 // If already connected, reconnect with new list
149 let is_connected = *self.is_connected.read().await;
150 if is_connected {
151 self.reconnect().await?;
152 }
153
154 Ok(())
155 }
156
157 /// Removes a market EPIC from the subscription list.
158 ///
159 /// If the streamer is already connected, this will reconnect with the updated list.
160 ///
161 /// # Arguments
162 ///
163 /// * `epic` - The market EPIC to remove
164 ///
165 /// # Returns
166 ///
167 /// Returns `Ok(())` if the EPIC was removed successfully.
168 ///
169 /// # Examples
170 ///
171 /// ```ignore
172 /// streamer.remove("IX.D.DAX.DAILY.IP".to_string()).await?;
173 /// ```
174 pub async fn remove(&self, epic: String) -> Result<(), AppError> {
175 let mut epics = self.epics.write().await;
176
177 let was_removed = epics.remove(&epic);
178 if was_removed {
179 info!("Removed EPIC {} from subscription list", epic);
180 } else {
181 debug!("EPIC {} was not in subscription list", epic);
182 }
183 drop(epics); // Release lock
184
185 // If already connected and something was removed, reconnect
186 if was_removed {
187 let is_connected = *self.is_connected.read().await;
188 if is_connected {
189 self.reconnect().await?;
190 }
191 }
192
193 Ok(())
194 }
195
196 /// Clears all market EPICs from the subscription list.
197 ///
198 /// Note: Due to Lightstreamer limitations, this does not unsubscribe from
199 /// the server immediately. All EPICs will be removed from the internal list.
200 ///
201 /// # Returns
202 ///
203 /// Returns `Ok(())` when all EPICs have been cleared.
204 ///
205 /// # Examples
206 ///
207 /// ```ignore
208 /// streamer.clear().await?;
209 /// ```
210 pub async fn clear(&self) -> Result<(), AppError> {
211 let mut epics = self.epics.write().await;
212 let count = epics.len();
213 epics.clear();
214 info!("Cleared {} EPICs from subscription list", count);
215 Ok(())
216 }
217
218 /// Gets the current list of subscribed EPICs.
219 ///
220 /// # Returns
221 ///
222 /// Returns a vector containing all currently subscribed EPICs.
223 ///
224 /// # Examples
225 ///
226 /// ```ignore
227 /// let epics = streamer.get_epics().await;
228 /// println!("Subscribed to {} markets", epics.len());
229 /// ```
230 pub async fn get_epics(&self) -> Vec<String> {
231 let epics = self.epics.read().await;
232 epics.iter().cloned().collect()
233 }
234
235 /// Gets the receiver for price updates.
236 ///
237 /// This method can only be called once. Subsequent calls will return an error.
238 ///
239 /// # Returns
240 ///
241 /// Returns a receiver channel for `PriceData` updates, or an error if the
242 /// receiver has already been taken.
243 ///
244 /// # Examples
245 ///
246 /// ```ignore
247 /// let mut receiver = streamer.get_receiver().await?;
248 /// tokio::spawn(async move {
249 /// while let Some(price_data) = receiver.recv().await {
250 /// println!("Price update: {}", price_data);
251 /// }
252 /// });
253 /// ```
254 pub async fn get_receiver(&self) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
255 let mut rx_lock = self.price_rx.write().await;
256 rx_lock
257 .take()
258 .ok_or_else(|| AppError::InvalidInput("Receiver already taken".to_string()))
259 }
260
261 /// Reconnects the streamer with the current list of EPICs.
262 ///
263 /// This method disconnects the current client and creates a new one with
264 /// the updated EPIC list.
265 async fn reconnect(&self) -> Result<(), AppError> {
266 info!("Reconnecting with updated EPIC list...");
267
268 // Signal shutdown to current client
269 {
270 let shutdown_lock = self.shutdown_signal.read().await;
271 if let Some(signal) = shutdown_lock.as_ref() {
272 signal.notify_one();
273 }
274 }
275
276 // Wait a bit for graceful shutdown
277 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
278
279 // Start new connection
280 let epics = self.get_epics().await;
281 if !epics.is_empty() {
282 self.start_internal().await?;
283 }
284
285 Ok(())
286 }
287
288 /// Internal method to start connection.
289 async fn start_internal(&self) -> Result<(), AppError> {
290 let epics = self.get_epics().await;
291
292 if epics.is_empty() {
293 warn!("No EPICs to subscribe to");
294 return Ok(());
295 }
296
297 info!("Starting connection with {} EPICs", epics.len());
298
299 // Create new client
300 let mut new_client = StreamerClient::new().await?;
301
302 // Subscribe to all EPICs
303 let fields = self.fields.clone();
304 let mut receiver = new_client.market_subscribe(epics.clone(), fields).await?;
305
306 // Forward updates to the main channel
307 let price_tx = self.price_tx.read().await;
308 if let Some(tx) = price_tx.as_ref() {
309 let tx = tx.clone();
310 tokio::spawn(async move {
311 while let Some(price_data) = receiver.recv().await {
312 if tx.send(price_data).is_err() {
313 warn!("Failed to send price update: receiver dropped");
314 break;
315 }
316 }
317 debug!("Subscription forwarding task ended");
318 });
319 }
320
321 // Store the new client
322 *self.client.write().await = Some(new_client);
323
324 // Create new shutdown signal
325 let signal = Arc::new(Notify::new());
326 *self.shutdown_signal.write().await = Some(Arc::clone(&signal));
327
328 // Mark as connected
329 *self.is_connected.write().await = true;
330
331 // Spawn connection task in background
332 let client = Arc::clone(&self.client);
333 let is_connected = Arc::clone(&self.is_connected);
334
335 tokio::spawn(async move {
336 let result = {
337 let mut client_guard = client.write().await;
338 if let Some(ref mut c) = *client_guard {
339 c.connect(Some(signal)).await
340 } else {
341 Ok(())
342 }
343 };
344
345 // Mark as disconnected
346 *is_connected.write().await = false;
347
348 match result {
349 Ok(_) => info!("Connection task completed successfully"),
350 Err(e) => tracing::error!("Connection task failed: {:?}", e),
351 }
352 });
353
354 info!("Connection task started in background");
355 Ok(())
356 }
357
358 /// Starts the connection to the Lightstreamer server and subscribes to all initial EPICs.
359 ///
360 /// This method subscribes to all EPICs in the subscription list and then spawns a background
361 /// task to maintain the connection. This allows dynamic subscription management while connected.
362 ///
363 /// # Returns
364 ///
365 /// Returns `Ok(())` immediately after starting the connection task.
366 ///
367 /// # Examples
368 ///
369 /// ```ignore
370 /// // Start connection
371 /// streamer.start().await?;
372 ///
373 /// // Keep main thread alive
374 /// tokio::signal::ctrl_c().await?;
375 /// ```
376 pub async fn start(&mut self) -> Result<(), AppError> {
377 self.start_internal().await
378 }
379
380 /// Connects to the Lightstreamer server and blocks until shutdown.
381 ///
382 /// This is a convenience method that calls `start()` and then waits for a shutdown signal.
383 /// Use `start()` if you need non-blocking behavior.
384 ///
385 /// # Returns
386 ///
387 /// Returns `Ok(())` when the connection is closed gracefully.
388 ///
389 /// # Examples
390 ///
391 /// ```ignore
392 /// // Connect and block until shutdown
393 /// streamer.connect().await?;
394 /// ```
395 pub async fn connect(&mut self) -> Result<(), AppError> {
396 self.start().await?;
397
398 // Wait for SIGINT/SIGTERM
399 use lightstreamer_rs::utils::setup_signal_hook;
400 let signal = Arc::new(Notify::new());
401 setup_signal_hook(Arc::clone(&signal)).await;
402 signal.notified().await;
403
404 // Disconnect
405 self.disconnect().await?;
406
407 Ok(())
408 }
409
410 /// Disconnects from the Lightstreamer server.
411 ///
412 /// This method gracefully closes the connection to the server.
413 ///
414 /// # Returns
415 ///
416 /// Returns `Ok(())` if the disconnection was successful.
417 ///
418 /// # Examples
419 ///
420 /// ```ignore
421 /// streamer.disconnect().await?;
422 /// ```
423 pub async fn disconnect(&mut self) -> Result<(), AppError> {
424 // Signal shutdown
425 {
426 let shutdown_lock = self.shutdown_signal.read().await;
427 if let Some(signal) = shutdown_lock.as_ref() {
428 signal.notify_one();
429 }
430 }
431
432 // Disconnect client
433 let mut client_lock = self.client.write().await;
434 if let Some(ref mut client) = *client_lock {
435 client.disconnect().await?;
436 }
437 *client_lock = None;
438
439 *self.is_connected.write().await = false;
440 info!("Disconnected from Lightstreamer server");
441 Ok(())
442 }
443}
444
445impl Clone for DynamicMarketStreamer {
446 fn clone(&self) -> Self {
447 Self {
448 client: Arc::clone(&self.client),
449 epics: Arc::clone(&self.epics),
450 fields: self.fields.clone(),
451 price_tx: Arc::clone(&self.price_tx),
452 price_rx: Arc::clone(&self.price_rx),
453 is_connected: Arc::clone(&self.is_connected),
454 shutdown_signal: Arc::clone(&self.shutdown_signal),
455 }
456 }
457}