use marketdata_core::{
websocket::{ConnectionConfig, ConnectionEvent, StockSubscription},
AuthRequest, Channel, WebSocketClient,
};
use std::sync::Arc;
use std::time::Duration;
/// Streams stock market data over a WebSocket for a short demo window.
///
/// Steps, in order:
/// 1. Read the API key from the `FUGLE_API_KEY` environment variable.
/// 2. Connect, then subscribe to trades, books and aggregates for `2330`
///    and to trades for `2317` and `2454`.
/// 3. Spawn a background task that prints connection state events.
/// 4. Print incoming messages for up to 10 seconds, stopping early when the
///    client reports it is no longer connected or the message channel
///    returns an error.
/// 5. Print a summary, disconnect, and abort the background task.
///
/// # Errors
///
/// Propagates the `MarketDataError` returned by connecting, subscribing or
/// disconnecting.
///
/// # Panics
///
/// Panics if `FUGLE_API_KEY` cannot be read from the environment.
#[tokio::main]
async fn main() -> Result<(), marketdata_core::MarketDataError> {
    let api_key =
        std::env::var("FUGLE_API_KEY").expect("FUGLE_API_KEY environment variable not set");

    println!("=== WebSocket Streaming Example ===\n");

    let auth = AuthRequest::with_api_key(&api_key);
    let client = WebSocketClient::new(ConnectionConfig::fugle_stock(auth));

    println!("Connecting to WebSocket server...");
    client.connect().await?;
    println!("Connected and authenticated!\n");

    // One subscription per channel for symbol 2330.
    println!("Subscribing to channels...");
    client
        .subscribe_channel(StockSubscription::new(Channel::Trades, "2330"))
        .await?;
    println!(" Subscribed to 2330 trades");
    client
        .subscribe_channel(StockSubscription::new(Channel::Books, "2330"))
        .await?;
    println!(" Subscribed to 2330 books");
    client
        .subscribe_channel(StockSubscription::new(Channel::Aggregates, "2330"))
        .await?;
    println!(" Subscribed to 2330 aggregates");

    // Several symbols on one channel in a single call.
    // NOTE(review): the meaning of the trailing `false` is defined by
    // `subscribe_symbols` and is not visible from this file.
    client
        .subscribe_symbols(Channel::Trades, &["2317", "2454"], false)
        .await?;
    println!(" Subscribed to 2317, 2454 trades");

    println!("\nListening for messages (10 seconds)...\n");
    let inbox = client.messages();

    // Background task: poll the state-event receiver in 100 ms slices on the
    // blocking pool and print whatever arrives.
    let state_events = Arc::clone(client.state_events());
    let event_task = tokio::spawn(async move {
        loop {
            let shared = Arc::clone(&state_events);
            let polled = tokio::task::spawn_blocking(move || {
                let receiver = shared.blocking_lock();
                receiver.recv_timeout(Duration::from_millis(100))
            })
            .await;

            match polled {
                // The blocking task could not be joined: stop polling.
                Err(_) => break,
                // NOTE(review): presumably the 100 ms timeout elapsing; every
                // receive error is ignored here and polling continues.
                Ok(Err(_)) => {}
                Ok(Ok(ConnectionEvent::Disconnected { code, reason })) => {
                    println!("Event: Disconnected (code: {:?}, reason: {})", code, reason);
                    break;
                }
                Ok(Ok(other)) => println!("Event: {:?}", other),
            }
        }
    });

    let started = std::time::Instant::now();
    let listen_window = Duration::from_secs(10);
    let mut received = 0;

    while started.elapsed() < listen_window {
        // Wait up to one second for the next message.
        let msg = match inbox.receive_timeout(Duration::from_secs(1)) {
            Ok(Some(msg)) => msg,
            Ok(None) => {
                // Nothing arrived in this slice; only give up if the client
                // reports it is no longer connected.
                if client.is_connected().await {
                    continue;
                }
                println!("Connection lost");
                break;
            }
            Err(e) => {
                println!("Channel error: {}", e);
                break;
            }
        };
        received += 1;

        if msg.is_data() {
            let channel = msg.channel.as_deref();
            println!(
                "[{}] Data: channel={:?}, symbol={:?}",
                received,
                channel.unwrap_or("?"),
                msg.symbol.as_deref().unwrap_or("?")
            );

            // Print a few payload fields for the channels this example knows.
            if let Some(data) = msg.data.as_ref() {
                match channel {
                    Some("trades") => {
                        let price = data.get("price").and_then(serde_json::Value::as_f64);
                        let size = data.get("size").and_then(serde_json::Value::as_i64);
                        // Only printed when both fields are present.
                        if let Some((price, size)) = price.zip(size) {
                            println!(" Trade: price={}, size={}", price, size);
                        }
                    }
                    Some("aggregates") => {
                        if let Some(close) =
                            data.get("closePrice").and_then(serde_json::Value::as_f64)
                        {
                            println!(" Aggregate: closePrice={}", close);
                        }
                    }
                    _ => {}
                }
            }
        } else if msg.is_subscribed() {
            println!(
                "[{}] Subscribed: id={:?}",
                received,
                msg.id.as_deref().unwrap_or("?")
            );
        } else if msg.is_error() {
            println!(
                "[{}] Error: {:?}",
                received,
                msg.error_message().unwrap_or_else(|| "Unknown".to_string())
            );
        } else {
            println!("[{}] Event: {}", received, msg.event);
        }
    }

    println!("\n=== Summary ===");
    println!("Total messages received: {}", received);
    println!("Active subscriptions: {:?}", client.subscription_keys());

    println!("\nDisconnecting...");
    client.disconnect().await?;
    println!("Disconnected gracefully");

    // The poller may still be looping if it never saw a disconnect event.
    event_task.abort();

    if client.is_closed().await {
        println!("Client is now closed (cannot be reused)");
    }

    println!("\n=== Complete ===");
    println!("WebSocket example finished successfully.");
    Ok(())
}