ig_client/application/dynamic_streamer.rs
1/******************************************************************************
2 Author: Joaquín Béjar García
3 Email: jb@taunais.com
4 Date: 30/10/25
5******************************************************************************/
6
7//! Dynamic market streaming with thread-safe subscription management.
8//!
9//! This module provides a wrapper around `StreamerClient` that allows dynamic
10//! addition and removal of market subscriptions from multiple threads.
11
12use crate::application::client::StreamerClient;
13use crate::error::AppError;
14use crate::model::streaming::StreamingMarketField;
15use crate::presentation::price::PriceData;
16use std::collections::HashSet;
17use std::sync::Arc;
18use tokio::sync::{Notify, RwLock, mpsc};
19use tracing::{debug, info, warn};
20
21/// Dynamic market streamer with thread-safe subscription management.
22///
23/// This struct wraps a `StreamerClient` and provides methods to dynamically
24/// add, remove, and clear market subscriptions. All operations are thread-safe
25/// and can be called from multiple threads concurrently.
26///
27/// # Examples
28///
29/// ```ignore
30/// use ig_client::application::dynamic_streamer::DynamicMarketStreamer;
31/// use ig_client::model::streaming::StreamingMarketField;
32/// use std::collections::HashSet;
33///
34/// #[tokio::main]
35/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
36/// // Create fields to subscribe to
37/// let fields = HashSet::from([
38/// StreamingMarketField::Bid,
39/// StreamingMarketField::Offer,
40/// ]);
41///
42/// // Create the dynamic streamer
43/// let mut streamer = DynamicMarketStreamer::new(fields).await?;
44///
45/// // Get the receiver for price updates
46/// let mut receiver = streamer.get_receiver().await?;
47///
48/// // Add markets from different threads
49/// let streamer_clone = streamer.clone();
50/// tokio::spawn(async move {
51/// streamer_clone.add("IX.D.DAX.DAILY.IP".to_string()).await.unwrap();
52/// });
53///
54/// // Start receiving updates
55/// tokio::spawn(async move {
56/// while let Some(price_data) = receiver.recv().await {
57/// println!("Price update: {}", price_data);
58/// }
59/// });
60///
61/// // Connect and run
62/// streamer.connect(None).await?;
63/// Ok(())
64/// }
65/// ```
66pub struct DynamicMarketStreamer {
67 /// Internal streamer client (recreated on epic changes)
68 client: Arc<RwLock<Option<StreamerClient>>>,
69 /// Set of EPICs currently subscribed
70 epics: Arc<RwLock<HashSet<String>>>,
71 /// Market fields to subscribe to
72 fields: HashSet<StreamingMarketField>,
73 /// Channel sender for price updates
74 price_tx: Arc<RwLock<Option<mpsc::UnboundedSender<PriceData>>>>,
75 /// Channel receiver for price updates (taken on first get_receiver call)
76 price_rx: Arc<RwLock<Option<mpsc::UnboundedReceiver<PriceData>>>>,
77 /// Flag indicating if the streamer is connected
78 is_connected: Arc<RwLock<bool>>,
79 /// Shutdown signal for current connection
80 shutdown_signal: Arc<RwLock<Option<Arc<Notify>>>>,
81}
82
83impl DynamicMarketStreamer {
84 /// Creates a new dynamic market streamer.
85 ///
86 /// # Arguments
87 ///
88 /// * `fields` - Set of market data fields to receive (e.g., BID, OFFER, etc.)
89 ///
90 /// # Returns
91 ///
92 /// Returns a new `DynamicMarketStreamer` instance or an error if initialization fails.
93 ///
94 /// # Examples
95 ///
96 /// ```ignore
97 /// let fields = HashSet::from([
98 /// StreamingMarketField::Bid,
99 /// StreamingMarketField::Offer,
100 /// ]);
101 /// let streamer = DynamicMarketStreamer::new(fields).await?;
102 /// ```
103 pub async fn new(fields: HashSet<StreamingMarketField>) -> Result<Self, AppError> {
104 let (price_tx, price_rx) = mpsc::unbounded_channel();
105
106 Ok(Self {
107 client: Arc::new(RwLock::new(None)),
108 epics: Arc::new(RwLock::new(HashSet::new())),
109 fields,
110 price_tx: Arc::new(RwLock::new(Some(price_tx))),
111 price_rx: Arc::new(RwLock::new(Some(price_rx))),
112 is_connected: Arc::new(RwLock::new(false)),
113 shutdown_signal: Arc::new(RwLock::new(None)),
114 })
115 }
116
117 /// Adds a market EPIC to the subscription list.
118 ///
119 /// If the streamer is already connected, this will reconnect with the updated list.
120 ///
121 /// # Arguments
122 ///
123 /// * `epic` - The market EPIC to subscribe to
124 ///
125 /// # Returns
126 ///
127 /// Returns `Ok(())` if the EPIC was added successfully.
128 ///
129 /// # Examples
130 ///
131 /// ```ignore
132 /// streamer.add("IX.D.DAX.DAILY.IP".to_string()).await?;
133 /// ```
134 pub async fn add(&self, epic: String) -> Result<(), AppError> {
135 let mut epics = self.epics.write().await;
136
137 if epics.contains(&epic) {
138 debug!("EPIC {} already subscribed", epic);
139 return Ok(());
140 }
141
142 epics.insert(epic.clone());
143 info!("Added EPIC {} to subscription list", epic);
144 drop(epics); // Release lock
145
146 // If already connected, reconnect with new list
147 let is_connected = *self.is_connected.read().await;
148 if is_connected {
149 self.reconnect().await?;
150 }
151
152 Ok(())
153 }
154
155 /// Removes a market EPIC from the subscription list.
156 ///
157 /// If the streamer is already connected, this will reconnect with the updated list.
158 ///
159 /// # Arguments
160 ///
161 /// * `epic` - The market EPIC to remove
162 ///
163 /// # Returns
164 ///
165 /// Returns `Ok(())` if the EPIC was removed successfully.
166 ///
167 /// # Examples
168 ///
169 /// ```ignore
170 /// streamer.remove("IX.D.DAX.DAILY.IP".to_string()).await?;
171 /// ```
172 pub async fn remove(&self, epic: String) -> Result<(), AppError> {
173 let mut epics = self.epics.write().await;
174
175 let was_removed = epics.remove(&epic);
176 if was_removed {
177 info!("Removed EPIC {} from subscription list", epic);
178 } else {
179 debug!("EPIC {} was not in subscription list", epic);
180 }
181 drop(epics); // Release lock
182
183 // If already connected and something was removed, reconnect
184 if was_removed {
185 let is_connected = *self.is_connected.read().await;
186 if is_connected {
187 self.reconnect().await?;
188 }
189 }
190
191 Ok(())
192 }
193
194 /// Clears all market EPICs from the subscription list.
195 ///
196 /// Note: Due to Lightstreamer limitations, this does not unsubscribe from
197 /// the server immediately. All EPICs will be removed from the internal list.
198 ///
199 /// # Returns
200 ///
201 /// Returns `Ok(())` when all EPICs have been cleared.
202 ///
203 /// # Examples
204 ///
205 /// ```ignore
206 /// streamer.clear().await?;
207 /// ```
208 pub async fn clear(&self) -> Result<(), AppError> {
209 let mut epics = self.epics.write().await;
210 let count = epics.len();
211 epics.clear();
212 info!("Cleared {} EPICs from subscription list", count);
213 Ok(())
214 }
215
216 /// Gets the current list of subscribed EPICs.
217 ///
218 /// # Returns
219 ///
220 /// Returns a vector containing all currently subscribed EPICs.
221 ///
222 /// # Examples
223 ///
224 /// ```ignore
225 /// let epics = streamer.get_epics().await;
226 /// println!("Subscribed to {} markets", epics.len());
227 /// ```
228 pub async fn get_epics(&self) -> Vec<String> {
229 let epics = self.epics.read().await;
230 epics.iter().cloned().collect()
231 }
232
233 /// Gets the receiver for price updates.
234 ///
235 /// This method can only be called once. Subsequent calls will return an error.
236 ///
237 /// # Returns
238 ///
239 /// Returns a receiver channel for `PriceData` updates, or an error if the
240 /// receiver has already been taken.
241 ///
242 /// # Examples
243 ///
244 /// ```ignore
245 /// let mut receiver = streamer.get_receiver().await?;
246 /// tokio::spawn(async move {
247 /// while let Some(price_data) = receiver.recv().await {
248 /// println!("Price update: {}", price_data);
249 /// }
250 /// });
251 /// ```
252 pub async fn get_receiver(&self) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
253 let mut rx_lock = self.price_rx.write().await;
254 rx_lock
255 .take()
256 .ok_or_else(|| AppError::InvalidInput("Receiver already taken".to_string()))
257 }
258
259 /// Reconnects the streamer with the current list of EPICs.
260 ///
261 /// This method disconnects the current client and creates a new one with
262 /// the updated EPIC list.
263 async fn reconnect(&self) -> Result<(), AppError> {
264 info!("Reconnecting with updated EPIC list...");
265
266 // Signal shutdown to current client
267 {
268 let shutdown_lock = self.shutdown_signal.read().await;
269 if let Some(signal) = shutdown_lock.as_ref() {
270 signal.notify_one();
271 }
272 }
273
274 // Wait a bit for graceful shutdown
275 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
276
277 // Start new connection
278 let epics = self.get_epics().await;
279 if !epics.is_empty() {
280 self.start_internal().await?;
281 }
282
283 Ok(())
284 }
285
286 /// Internal method to start connection.
287 async fn start_internal(&self) -> Result<(), AppError> {
288 let epics = self.get_epics().await;
289
290 if epics.is_empty() {
291 warn!("No EPICs to subscribe to");
292 return Ok(());
293 }
294
295 info!("Starting connection with {} EPICs", epics.len());
296
297 // Create new client
298 let mut new_client = StreamerClient::new().await?;
299
300 // Subscribe to all EPICs
301 let fields = self.fields.clone();
302 let mut receiver = new_client.market_subscribe(epics.clone(), fields).await?;
303
304 // Forward updates to the main channel
305 let price_tx = self.price_tx.read().await;
306 if let Some(tx) = price_tx.as_ref() {
307 let tx = tx.clone();
308 tokio::spawn(async move {
309 while let Some(price_data) = receiver.recv().await {
310 if tx.send(price_data).is_err() {
311 warn!("Failed to send price update: receiver dropped");
312 break;
313 }
314 }
315 debug!("Subscription forwarding task ended");
316 });
317 }
318
319 // Store the new client
320 *self.client.write().await = Some(new_client);
321
322 // Create new shutdown signal
323 let signal = Arc::new(Notify::new());
324 *self.shutdown_signal.write().await = Some(Arc::clone(&signal));
325
326 // Mark as connected
327 *self.is_connected.write().await = true;
328
329 // Spawn connection task in background
330 let client = Arc::clone(&self.client);
331 let is_connected = Arc::clone(&self.is_connected);
332
333 tokio::spawn(async move {
334 let result = {
335 let mut client_guard = client.write().await;
336 if let Some(ref mut c) = *client_guard {
337 c.connect(Some(signal)).await
338 } else {
339 Ok(())
340 }
341 };
342
343 // Mark as disconnected
344 *is_connected.write().await = false;
345
346 match result {
347 Ok(_) => info!("Connection task completed successfully"),
348 Err(e) => tracing::error!("Connection task failed: {:?}", e),
349 }
350 });
351
352 info!("Connection task started in background");
353 Ok(())
354 }
355
356 /// Starts the connection to the Lightstreamer server and subscribes to all initial EPICs.
357 ///
358 /// This method subscribes to all EPICs in the subscription list and then spawns a background
359 /// task to maintain the connection. This allows dynamic subscription management while connected.
360 ///
361 /// # Returns
362 ///
363 /// Returns `Ok(())` immediately after starting the connection task.
364 ///
365 /// # Examples
366 ///
367 /// ```ignore
368 /// // Start connection
369 /// streamer.start().await?;
370 ///
371 /// // Keep main thread alive
372 /// tokio::signal::ctrl_c().await?;
373 /// ```
374 pub async fn start(&mut self) -> Result<(), AppError> {
375 self.start_internal().await
376 }
377
378 /// Connects to the Lightstreamer server and blocks until shutdown.
379 ///
380 /// This is a convenience method that calls `start()` and then waits for a shutdown signal.
381 /// Use `start()` if you need non-blocking behavior.
382 ///
383 /// # Returns
384 ///
385 /// Returns `Ok(())` when the connection is closed gracefully.
386 ///
387 /// # Examples
388 ///
389 /// ```ignore
390 /// // Connect and block until shutdown
391 /// streamer.connect().await?;
392 /// ```
393 pub async fn connect(&mut self) -> Result<(), AppError> {
394 self.start().await?;
395
396 // Wait for SIGINT/SIGTERM
397 use lightstreamer_rs::utils::setup_signal_hook;
398 let signal = Arc::new(Notify::new());
399 setup_signal_hook(Arc::clone(&signal)).await;
400 signal.notified().await;
401
402 // Disconnect
403 self.disconnect().await?;
404
405 Ok(())
406 }
407
408 /// Disconnects from the Lightstreamer server.
409 ///
410 /// This method gracefully closes the connection to the server.
411 ///
412 /// # Returns
413 ///
414 /// Returns `Ok(())` if the disconnection was successful.
415 ///
416 /// # Examples
417 ///
418 /// ```ignore
419 /// streamer.disconnect().await?;
420 /// ```
421 pub async fn disconnect(&mut self) -> Result<(), AppError> {
422 // Signal shutdown
423 {
424 let shutdown_lock = self.shutdown_signal.read().await;
425 if let Some(signal) = shutdown_lock.as_ref() {
426 signal.notify_one();
427 }
428 }
429
430 // Disconnect client
431 let mut client_lock = self.client.write().await;
432 if let Some(ref mut client) = *client_lock {
433 client.disconnect().await?;
434 }
435 *client_lock = None;
436
437 *self.is_connected.write().await = false;
438 info!("Disconnected from Lightstreamer server");
439 Ok(())
440 }
441}
442
443impl Clone for DynamicMarketStreamer {
444 fn clone(&self) -> Self {
445 Self {
446 client: Arc::clone(&self.client),
447 epics: Arc::clone(&self.epics),
448 fields: self.fields.clone(),
449 price_tx: Arc::clone(&self.price_tx),
450 price_rx: Arc::clone(&self.price_rx),
451 is_connected: Arc::clone(&self.is_connected),
452 shutdown_signal: Arc::clone(&self.shutdown_signal),
453 }
454 }
455}