DynamicMarketStreamer

Struct DynamicMarketStreamer 

Source
pub struct DynamicMarketStreamer { /* private fields */ }
Expand description

Dynamic market streamer with thread-safe subscription management.

This struct wraps a StreamerClient and provides methods to dynamically add, remove, and clear market subscriptions. All operations are thread-safe and can be called from multiple threads concurrently.

§Examples

use ig_client::application::dynamic_streamer::DynamicMarketStreamer;
use ig_client::model::streaming::StreamingMarketField;
use std::collections::HashSet;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create fields to subscribe to
    let fields = HashSet::from([
        StreamingMarketField::Bid,
        StreamingMarketField::Offer,
    ]);

    // Create the dynamic streamer
    let mut streamer = DynamicMarketStreamer::new(fields).await?;

    // Get the receiver for price updates
    let mut receiver = streamer.get_receiver().await?;

    // Add markets from different threads
    let streamer_clone = streamer.clone();
    tokio::spawn(async move {
        streamer_clone.add("IX.D.DAX.DAILY.IP".to_string()).await.unwrap();
    });

    // Start receiving updates
    tokio::spawn(async move {
        while let Some(price_data) = receiver.recv().await {
            println!("Price update: {}", price_data);
        }
    });

    // Connect and run
    streamer.connect(None).await?;
    Ok(())
}

Implementations§

Source§

impl DynamicMarketStreamer

Source

pub async fn new( fields: HashSet<StreamingMarketField>, ) -> Result<Self, AppError>

Creates a new dynamic market streamer.

§Arguments
  • fields - Set of market data fields to receive (e.g., BID, OFFER, etc.)
§Returns

Returns a new DynamicMarketStreamer instance or an error if initialization fails.

§Examples
let fields = HashSet::from([
    StreamingMarketField::Bid,
    StreamingMarketField::Offer,
]);
let streamer = DynamicMarketStreamer::new(fields).await?;
Source

pub async fn add(&self, epic: String) -> Result<(), AppError>

Adds a market EPIC to the subscription list.

If the streamer is already connected, this will reconnect with the updated list.

§Arguments
  • epic - The market EPIC to subscribe to
§Returns

Returns Ok(()) if the EPIC was added successfully.

§Examples
streamer.add("IX.D.DAX.DAILY.IP".to_string()).await?;
Source

pub async fn remove(&self, epic: String) -> Result<(), AppError>

Removes a market EPIC from the subscription list.

If the streamer is already connected, this will reconnect with the updated list.

§Arguments
  • epic - The market EPIC to remove
§Returns

Returns Ok(()) if the EPIC was removed successfully.

§Examples
streamer.remove("IX.D.DAX.DAILY.IP".to_string()).await?;
Source

pub async fn clear(&self) -> Result<(), AppError>

Clears all market EPICs from the subscription list.

Note: Due to Lightstreamer limitations, this does not unsubscribe from the server immediately. All EPICs will be removed from the internal list.

§Returns

Returns Ok(()) when all EPICs have been cleared.

§Examples
streamer.clear().await?;
Source

pub async fn get_epics(&self) -> Vec<String>

Gets the current list of subscribed EPICs.

§Returns

Returns a vector containing all currently subscribed EPICs.

§Examples
let epics = streamer.get_epics().await;
println!("Subscribed to {} markets", epics.len());
Source

pub async fn get_receiver( &self, ) -> Result<UnboundedReceiver<PriceData>, AppError>

Gets the receiver for price updates.

This method can only be called once. Subsequent calls will return an error.

§Returns

Returns a receiver channel for PriceData updates, or an error if the receiver has already been taken.

§Examples
let mut receiver = streamer.get_receiver().await?;
tokio::spawn(async move {
    while let Some(price_data) = receiver.recv().await {
        println!("Price update: {}", price_data);
    }
});
Source

pub async fn start(&mut self) -> Result<(), AppError>

Starts the connection to the Lightstreamer server and subscribes to all initial EPICs.

This method subscribes to all EPICs in the subscription list and then spawns a background task to maintain the connection. This allows dynamic subscription management while connected.

§Returns

Returns Ok(()) immediately after starting the connection task.

§Examples
// Start connection
streamer.start().await?;

// Keep main thread alive
tokio::signal::ctrl_c().await?;
Source

pub async fn connect(&mut self) -> Result<(), AppError>

Connects to the Lightstreamer server and blocks until shutdown.

This is a convenience method that calls start() and then waits for a shutdown signal. Use start() if you need non-blocking behavior.

§Returns

Returns Ok(()) when the connection is closed gracefully.

§Examples
// Connect and block until shutdown
streamer.connect().await?;
Source

pub async fn disconnect(&mut self) -> Result<(), AppError>

Disconnects from the Lightstreamer server.

This method gracefully closes the connection to the server.

§Returns

Returns Ok(()) if the disconnection was successful.

§Examples
streamer.disconnect().await?;

Trait Implementations§

Source§

impl Clone for DynamicMarketStreamer

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,