pub struct DynamicMarketStreamer { /* private fields */ }
Dynamic market streamer with thread-safe subscription management.
This struct wraps a StreamerClient and provides methods to dynamically
add, remove, and clear market subscriptions. All operations are thread-safe
and can be called from multiple threads concurrently.
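As a rough illustration of what thread-safe subscription management can look like, the sketch below shares a set of EPICs between threads using only the standard library. SharedEpics and its methods are hypothetical names chosen for this sketch; it is not the crate's actual implementation, which wraps a StreamerClient and is async.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the streamer's subscription list.
/// Cloning it yields another handle to the same underlying set.
#[derive(Clone, Default)]
pub struct SharedEpics {
    inner: Arc<Mutex<HashSet<String>>>,
}

impl SharedEpics {
    /// Adds an EPIC; returns true if it was not already present.
    pub fn add(&self, epic: &str) -> bool {
        self.inner.lock().unwrap().insert(epic.to_string())
    }

    /// Removes an EPIC; returns true if it was present.
    pub fn remove(&self, epic: &str) -> bool {
        self.inner.lock().unwrap().remove(epic)
    }

    /// Removes every EPIC from the set.
    pub fn clear(&self) {
        self.inner.lock().unwrap().clear();
    }

    /// Returns a sorted copy of the current EPICs.
    pub fn snapshot(&self) -> Vec<String> {
        let mut epics: Vec<String> = self.inner.lock().unwrap().iter().cloned().collect();
        epics.sort();
        epics
    }
}
```

Each clone can be moved into its own thread and call add, remove, or clear without further synchronization, because every operation takes the mutex for its duration.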
§Examples
use ig_client::application::dynamic_streamer::DynamicMarketStreamer;
use ig_client::model::streaming::StreamingMarketField;
use std::collections::HashSet;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create fields to subscribe to
    let fields = HashSet::from([
        StreamingMarketField::Bid,
        StreamingMarketField::Offer,
    ]);
    // Create the dynamic streamer
    let mut streamer = DynamicMarketStreamer::new(fields).await?;
    // Get the receiver for price updates
    let mut receiver = streamer.get_receiver().await?;
    // Add markets from a separate task
    let streamer_clone = streamer.clone();
    tokio::spawn(async move {
        streamer_clone.add("IX.D.DAX.DAILY.IP".to_string()).await.unwrap();
    });
    // Start receiving updates
    tokio::spawn(async move {
        while let Some(price_data) = receiver.recv().await {
            println!("Price update: {}", price_data);
        }
    });
    // Connect and block until shutdown
    streamer.connect().await?;
    Ok(())
}
Implementations§
impl DynamicMarketStreamer
pub async fn new(
    fields: HashSet<StreamingMarketField>,
) -> Result<Self, AppError>
Creates a new dynamic market streamer.
§Arguments
fields - Set of market data fields to receive (e.g., BID, OFFER)
§Returns
Returns a new DynamicMarketStreamer instance or an error if initialization fails.
§Examples
let fields = HashSet::from([
    StreamingMarketField::Bid,
    StreamingMarketField::Offer,
]);
let streamer = DynamicMarketStreamer::new(fields).await?;
pub async fn add(&self, epic: String) -> Result<(), AppError>
Adds a market EPIC to the subscription list.
If the streamer is already connected, this will reconnect with the updated list.
§Arguments
epic - The market EPIC to subscribe to
§Returns
Returns Ok(()) if the EPIC was added successfully.
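The add-then-reconnect behavior described above can be modeled with a small synchronous sketch. StreamerState, its fields, and the reconnect counter are hypothetical and exist only for illustration. Skipping the reconnect when the EPIC is already present is an assumption of this sketch, not documented behavior.

```rust
use std::collections::HashSet;

/// Hypothetical model of the streamer state; all names are illustrative.
#[derive(Default)]
pub struct StreamerState {
    pub epics: HashSet<String>,
    pub connected: bool,
    /// Counts how many times a reconnect was triggered.
    pub reconnects: u32,
}

impl StreamerState {
    /// Adds an EPIC and, if already connected, reconnects with the updated list.
    pub fn add(&mut self, epic: String) {
        // `insert` returns false when the EPIC was already subscribed.
        let changed = self.epics.insert(epic);
        // Assumption: an unchanged list does not need a reconnect.
        if changed && self.connected {
            self.reconnect();
        }
    }

    /// Stand-in for tearing down and re-establishing the subscription.
    fn reconnect(&mut self) {
        self.reconnects += 1;
    }
}
```

Adding EPICs before connecting only updates the list. Once connected, each new EPIC costs one reconnect in this model.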
§Examples
streamer.add("IX.D.DAX.DAILY.IP".to_string()).await?;
pub async fn remove(&self, epic: String) -> Result<(), AppError>
Removes a market EPIC from the subscription list.
If the streamer is already connected, this will reconnect with the updated list.
§Arguments
epic - The market EPIC to remove
§Returns
Returns Ok(()) if the EPIC was removed successfully.
§Examples
streamer.remove("IX.D.DAX.DAILY.IP".to_string()).await?;
pub async fn get_receiver(
    &self,
) -> Result<UnboundedReceiver<PriceData>, AppError>
Gets the receiver for price updates.
This method can only be called once. Subsequent calls will return an error.
§Returns
Returns a receiver channel for PriceData updates, or an error if the
receiver has already been taken.
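A common way to get "can only be called once" semantics is to store the receiver in an Option and take it on first use. The sketch below shows that pattern with the standard library's mpsc channel instead of tokio's UnboundedReceiver. ReceiverSlot, its error string, and the use of this pattern are assumptions made for illustration, since the crate's internals are not shown here.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Hypothetical holder for a receiver that can be handed out exactly once.
pub struct ReceiverSlot<T> {
    slot: Mutex<Option<Receiver<T>>>,
}

impl<T> ReceiverSlot<T> {
    /// Creates a channel and stores its receiving end in the slot.
    pub fn new() -> (Sender<T>, Self) {
        let (tx, rx) = channel();
        (tx, Self { slot: Mutex::new(Some(rx)) })
    }

    /// Returns the receiver on the first call and an error afterwards.
    pub fn take(&self) -> Result<Receiver<T>, String> {
        self.slot
            .lock()
            .unwrap()
            // `Option::take` leaves `None` behind, so later calls fail.
            .take()
            .ok_or_else(|| "receiver already taken".to_string())
    }
}
```

Because the slot is emptied on the first call, only one consumer can own the receiving end, which matches the single-consumer nature of the channel.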
§Examples
let mut receiver = streamer.get_receiver().await?;
tokio::spawn(async move {
    while let Some(price_data) = receiver.recv().await {
        println!("Price update: {}", price_data);
    }
});
pub async fn start(&mut self) -> Result<(), AppError>
Starts the connection to the Lightstreamer server and subscribes to all initial EPICs.
This method subscribes to all EPICs in the subscription list and then spawns a background task to maintain the connection. This allows dynamic subscription management while connected.
§Returns
Returns Ok(()) immediately after starting the connection task.
§Examples
// Start connection
streamer.start().await?;
// Keep main thread alive
tokio::signal::ctrl_c().await?;
pub async fn connect(&mut self) -> Result<(), AppError>
Connects to the Lightstreamer server and blocks until shutdown.
This is a convenience method that calls start() and then waits for a shutdown signal.
Use start() if you need non-blocking behavior.
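The difference between the non-blocking and blocking entry points can be sketched with plain threads. The Connection type, its shutdown and wait methods, and the exit code below are hypothetical. The real methods are async and manage a Lightstreamer connection, so this only illustrates the control flow.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical connection: a background thread that runs until told to stop.
pub struct Connection {
    shutdown: Sender<()>,
    task: JoinHandle<u32>,
}

/// Non-blocking: spawns the background task and returns immediately.
pub fn start() -> Connection {
    let (shutdown, signal) = channel::<()>();
    let task = thread::spawn(move || {
        // Block until a shutdown signal arrives (or the sender is dropped).
        let _ = signal.recv();
        0 // illustrative exit code for a graceful shutdown
    });
    Connection { shutdown, task }
}

impl Connection {
    /// Asks the background task to stop.
    pub fn shutdown(&self) {
        let _ = self.shutdown.send(());
    }

    /// Blocking: waits until the background task has finished.
    pub fn wait(self) -> u32 {
        self.task.join().unwrap()
    }
}
```

In this model, the blocking variant corresponds to calling start() and then wait(), while the non-blocking variant stops after start() and leaves the caller free to do other work.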
§Returns
Returns Ok(()) when the connection is closed gracefully.
§Examples
// Connect and block until shutdown
streamer.connect().await?;
Trait Implementations§
Auto Trait Implementations§
impl Freeze for DynamicMarketStreamer
impl !RefUnwindSafe for DynamicMarketStreamer
impl Send for DynamicMarketStreamer
impl Sync for DynamicMarketStreamer
impl Unpin for DynamicMarketStreamer
impl !UnwindSafe for DynamicMarketStreamer
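The Send and Sync entries above are what allow a value to be moved to, and shared between, threads. One way to check such bounds at compile time is a generic helper like the one below. assert_send_sync and SharedHandle are hypothetical names, and SharedHandle is only a stand-in. In a project that depends on the crate, DynamicMarketStreamer could be substituted for it.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Compiles only if `T` is both `Send` and `Sync`; always returns true.
pub fn assert_send_sync<T: Send + Sync>() -> bool {
    true
}

/// Hypothetical stand-in for a thread-safe handle type.
pub type SharedHandle = Arc<Mutex<HashSet<String>>>;
```

Calling assert_send_sync::<SharedHandle>() compiles, whereas a type such as std::rc::Rc<String> would be rejected by the compiler.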
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
unsafe fn clone_to_uninit(&self, dest: *mut u8)
This is a nightly-only experimental API (clone_to_uninit).
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.