use tokio::sync::{mpsc, watch};
use tracing::instrument;
use crate::IgClient;
use crate::error::Result;
use crate::session::{AuthTokens, SessionHandle};
use crate::streaming::connection::{CreateParams, LsConnection};
use crate::streaming::events::{
AccountUpdate, CandleScale, ChartCandleUpdate, ChartTickUpdate, MarketUpdate, TradeUpdate,
};
use crate::streaming::reconnect::{AutoReconnect, StreamingEvent};
use crate::streaming::subscription::{Registry, SubscriptionKind};
/// Capacity of the bounded `mpsc` channel created for each individual
/// subscription (market, chart tick, chart candle, account, trade).
const CHANNEL_CAP: usize = 256;
/// Capacity of the bounded `mpsc` channel that carries [`StreamingEvent`]s
/// back to the caller of `connect` / `connect_with`.
const EVENT_CHAN_CAP: usize = 64;
/// Borrowed view over an [`IgClient`] that exposes the streaming entry points.
///
/// The value only holds a reference to the client; the methods on it read the
/// client's session, transport and credentials to open a streaming connection.
#[derive(Debug)]
pub struct StreamingApi<'a> {
/// Client whose session, transport and credentials are used when connecting.
pub(crate) client: &'a IgClient,
}
impl StreamingApi<'_> {
#[instrument(skip_all, name = "streaming.connect")]
pub async fn connect(&self) -> Result<(StreamingClient, mpsc::Receiver<StreamingEvent>)> {
self.connect_with(AutoReconnect::default()).await
}
#[instrument(skip_all, name = "streaming.connect_with")]
pub async fn connect_with(
&self,
policy: AutoReconnect,
) -> Result<(StreamingClient, mpsc::Receiver<StreamingEvent>)> {
let state = self.client.session.require_authenticated().await?;
let account_id = state.account_id.ok_or_else(|| {
crate::error::Error::Auth(
"no account ID in session — call session().login() first".into(),
)
})?;
let endpoint = state.lightstreamer_endpoint.ok_or_else(|| {
crate::error::Error::Auth(
"no Lightstreamer endpoint in session — call session().login() first".into(),
)
})?;
let password = match state.tokens.as_ref() {
Some(AuthTokens::Cst {
cst,
x_security_token,
}) => format!("CST-{cst}|XST-{x_security_token}"),
Some(AuthTokens::OAuth { .. }) => {
return Err(crate::error::Error::Auth(
"OAuth session cannot be used directly for streaming. \
Call client.session().read(true).await? first to obtain \
CST/XST tokens, then call streaming().connect() again."
.into(),
));
}
None => return Err(crate::error::Error::Auth("no active session tokens".into())),
};
let registry = Registry::new();
let (shutdown_tx, _shutdown_rx) = watch::channel(false);
let (event_tx, event_rx) = mpsc::channel(EVENT_CHAN_CAP);
let session_handle = SessionHandle {
transport: self.client.transport.clone(),
session: self.client.session.clone(),
credentials: self.client.credentials.clone(),
};
let conn = LsConnection::create(CreateParams {
endpoint,
username: account_id,
password,
registry: registry.clone(),
shutdown_tx: shutdown_tx.clone(),
policy,
event_tx: Some(event_tx),
session_handle,
})
.await?;
let client = StreamingClient {
conn,
registry,
shutdown_tx,
};
Ok((client, event_rx))
}
}
/// Handle to an established streaming connection.
///
/// Returned by `StreamingApi::connect` / `connect_with`; used to add
/// subscriptions and to request shutdown.
#[derive(Debug)]
pub struct StreamingClient {
/// Connection used to send control requests; also holds the session id.
conn: LsConnection,
/// Subscription registry; a clone of it is given to the connection at creation.
registry: Registry,
/// Sender half of the shutdown `watch` channel; `disconnect` sends `true` on it.
shutdown_tx: watch::Sender<bool>,
}
impl StreamingClient {
#[instrument(skip(self), fields(%epic))]
pub async fn subscribe_market(&self, epic: &str) -> Result<mpsc::Receiver<MarketUpdate>> {
use crate::streaming::events::MARKET_FIELDS;
let (tx, rx) = mpsc::channel(CHANNEL_CAP);
let idx = self.registry.register(SubscriptionKind::Market {
epic: epic.to_owned(),
tx,
});
let item = format!("MARKET:{epic}");
let fields = MARKET_FIELDS.join(" ");
self.conn
.control("add", idx, &item, &fields, "MERGE")
.await?;
Ok(rx)
}
#[instrument(skip(self), fields(%epic))]
pub async fn subscribe_chart_tick(
&self,
epic: &str,
) -> Result<mpsc::Receiver<ChartTickUpdate>> {
use crate::streaming::events::CHART_TICK_FIELDS;
let (tx, rx) = mpsc::channel(CHANNEL_CAP);
let idx = self.registry.register(SubscriptionKind::ChartTick {
epic: epic.to_owned(),
tx,
});
let item = format!("CHART:{epic}:TICK");
let fields = CHART_TICK_FIELDS.join(" ");
self.conn
.control("add", idx, &item, &fields, "DISTINCT")
.await?;
Ok(rx)
}
#[instrument(skip(self), fields(%epic, scale = %scale))]
pub async fn subscribe_chart_candle(
&self,
epic: &str,
scale: CandleScale,
) -> Result<mpsc::Receiver<ChartCandleUpdate>> {
use crate::streaming::events::CHART_CANDLE_FIELDS;
let (tx, rx) = mpsc::channel(CHANNEL_CAP);
let idx = self.registry.register(SubscriptionKind::ChartCandle {
epic: epic.to_owned(),
scale,
tx,
});
let item = format!("CHART:{epic}:{}", scale.as_str());
let fields = CHART_CANDLE_FIELDS.join(" ");
self.conn
.control("add", idx, &item, &fields, "MERGE")
.await?;
Ok(rx)
}
#[instrument(skip(self), fields(%account_id))]
pub async fn subscribe_account(
&self,
account_id: &str,
) -> Result<mpsc::Receiver<AccountUpdate>> {
use crate::streaming::events::ACCOUNT_FIELDS;
let (tx, rx) = mpsc::channel(CHANNEL_CAP);
let idx = self.registry.register(SubscriptionKind::Account {
account_id: account_id.to_owned(),
tx,
});
let item = format!("ACCOUNT:{account_id}");
let fields = ACCOUNT_FIELDS.join(" ");
self.conn
.control("add", idx, &item, &fields, "MERGE")
.await?;
Ok(rx)
}
#[instrument(skip(self), fields(%account_id))]
pub async fn subscribe_trade(&self, account_id: &str) -> Result<mpsc::Receiver<TradeUpdate>> {
use crate::streaming::events::TRADE_FIELDS;
let (tx, rx) = mpsc::channel(CHANNEL_CAP);
let idx = self.registry.register(SubscriptionKind::Trade {
account_id: account_id.to_owned(),
tx,
});
let item = format!("TRADE:{account_id}");
let fields = TRADE_FIELDS.join(" ");
self.conn
.control("add", idx, &item, &fields, "DISTINCT")
.await?;
Ok(rx)
}
#[allow(clippy::unused_async)]
pub async fn disconnect(self) -> Result<()> {
let _ = self.shutdown_tx.send(true);
Ok(())
}
pub fn session_id(&self) -> &str {
&self.conn.session_id
}
}