Crate lightstreamer_rs

Source
Expand description

§Lightstreamer Rust Client

This project is a Rust implementation of the Lightstreamer TLCP (Text-based Live Connections Protocol). It provides a robust client SDK to interact with Lightstreamer servers, enabling real-time data streaming for financial applications, IoT systems, and other use cases requiring live data updates. While it was initially developed to support the ig_trading_api project, it has evolved into a more comprehensive SDK with broader applicability.

§About Lightstreamer

Lightstreamer is a high-performance real-time messaging server that provides several key features:

  • Real-time data streaming with optimized bandwidth usage
  • Support for various transport mechanisms (WebSockets, HTTP streaming, etc.)
  • Different subscription modes for various data delivery patterns
  • Robust connection management with automatic recovery
  • Scalable architecture for high-volume applications

§Features

This Rust client SDK provides the following capabilities:

  • Connection Management:

    • Full-duplex WebSocket-based connection mode
    • Automatic reconnection with configurable retry policies
    • Session recovery after temporary disconnections
    • Connection status monitoring and event notifications
    • Proxy support for enterprise environments
  • Subscription Capabilities:

    • Support for multiple subscription modes (MERGE, DISTINCT, COMMAND, RAW)
    • Subscription to individual items or item groups
    • Field schema definition for structured data
    • Real-time item updates with change detection
    • Snapshot support for initial state recovery
  • Configuration Options:

    • Extensive connection options (polling intervals, timeouts, etc.)
    • Bandwidth control and throttling
    • Custom HTTP headers for authentication
    • Logging configuration for debugging
  • Event Handling:

    • Comprehensive event listener system
    • Subscription lifecycle events (subscription, unsubscription)
    • Item update notifications with field-level change detection
    • Connection status change notifications
    • Error handling and reporting

§Implementation Status

The current implementation supports most core features of the Lightstreamer protocol, with a focus on the WebSocket transport mechanism and the MERGE subscription mode. While initially developed for specific trading API requirements, the library has expanded to include:

  • All subscription modes (MERGE, DISTINCT, COMMAND, RAW)
  • Robust error handling and recovery mechanisms
  • Comprehensive configuration options
  • Thread-safe asynchronous operation using Tokio

Some advanced features that may be implemented in future versions include:

  • Message sending capabilities (MPN)
  • Client-side filtering and frequency limitations
  • Additional transport mechanisms beyond WebSockets
  • Enhanced security features

§Installation

To use this SDK in your Rust project, add the following dependency to your Cargo.toml:

[dependencies]
lightstreamer-rs = "0.1.3"

§Usage

Here’s a comprehensive example of how to use the Lightstreamer Rust Client SDK:

// This example shows how to use the Lightstreamer Rust client
use lightstreamer_rs::client::{LightstreamerClient, Transport};
use lightstreamer_rs::subscription::{Subscription, SubscriptionMode, SubscriptionListener, ItemUpdate};
use std::sync::Arc;
use tokio::sync::Notify;
use std::time::Duration;

// Define a custom subscription listener
struct MySubscriptionListener;

impl SubscriptionListener for MySubscriptionListener {
    fn on_subscription(&self) {
        info!("Subscription confirmed by the server");
    }
     
    fn on_item_update(&self, update: ItemUpdate) {
        info!("Received update for item: {}", update.get_item_name());
        for field in update.get_fields() {
            if let Some(value) = update.get_value(field) {
                info!("  {} = {}", field, value);
            }
        }
    }
}

async fn example() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Create a new LightstreamerClient instance
    let result = LightstreamerClient::new(
        Some("ws://your-lightstreamer-server.com"),  // Server address
        Some("YOUR_ADAPTER_SET"),                   // Adapter set
        None,                                       // User (optional)
        None,                                       // Password (optional)
    );
     
    let mut client = match result {
        Ok(client) => client,
        Err(e) => return Err(e),
    };
     
    // Configure the connection details if needed
    client.connection_details.set_user("YOUR_USERNAME");
    client.connection_details.set_password("YOUR_PASSWORD");
     
    // Configure connection options if needed
    client.connection_options.set_content_length(50000000);
    client.connection_options.set_keepalive_interval(5);
    client.connection_options.set_forced_transport(Some(Transport::WsStreaming));
     
    // Create a shutdown signal for graceful termination
    let shutdown_signal = Arc::new(Notify::new());
     
    // Connect to the Lightstreamer server
    if let Err(e) = client.connect(shutdown_signal.clone()).await {
        return Err(e);
    }
     
    // Create a subscription
    let subscription_result = Subscription::new(
        SubscriptionMode::Merge,
        Some(vec!["item1".to_string(), "item2".to_string()]),
        Some(vec!["field1".to_string(), "field2".to_string()]),
    );
     
    let subscription = match subscription_result {
        Ok(sub) => sub,
        Err(e) => return Err(e),
    };
     
    // Add a listener to the subscription (optional)
    let listener = Box::new(MySubscriptionListener);
    subscription.add_listener(listener);
     
    // Get the subscription sender from the client
    // Note: This method might not exist in the current API, check the documentation
    let subscription_sender = client.get_subscriptions()[0].clone();
     
    // Subscribe and get the subscription ID
    let subscription_id_result = LightstreamerClient::subscribe_get_id(
        subscription_sender.clone(),
        subscription
    ).await;
     
    let subscription_id = match subscription_id_result {
        Ok(id) => id,
        Err(e) => return Err(e),
    };
     
    info!("Subscribed with ID: {}", subscription_id);
     
    // Wait for some time (in a real application, you would wait for a shutdown signal)
    tokio::time::sleep(Duration::from_secs(5)).await;
     
    // Unsubscribe before disconnecting
    LightstreamerClient::unsubscribe(subscription_sender, subscription_id).await;
     
    Ok(())
}

§Handling Client Events

You can also add listeners to handle client events:

Here’s an example of implementing a client listener:

// Note: ClientListener is a private trait, this is just for illustration
use lightstreamer_rs::client::model::ClientStatus;

struct MyClientListener;

// This is just an example of what the ClientListener trait might look like
// The actual implementation is internal to the library
trait ClientListener {
    fn on_status_change(&self, status: &ClientStatus);
    fn on_server_error(&self, error_code: i32, error_message: &str);
    fn on_property_change(&self, property: &str);
}

impl ClientListener for MyClientListener {
    fn on_status_change(&self, status: &ClientStatus) {
        info!("Client status changed to: {:?}", status);
    }
     
    fn on_server_error(&self, error_code: i32, error_message: &str) {
        info!("Server error: {} - {}", error_code, error_message);
    }
     
    fn on_property_change(&self, property: &str) {
        info!("Property changed: {}", property);
    }
}

// Then add the listener to your client
// client.add_listener(Box::new(MyClientListener));

Modules§

client
Module containing client-related functionality.
subscription
Module containing subscription-related functionality.
utils
Module containing utility functions and error types.