use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tracing::{debug, info, warn};
use tokio::sync::mpsc;
use crate::stream::client::{
strategy_config_from_optional, validate_strategy_thresholds, IntoPositionSelector,
PositionSelector, StreamClient, StreamClientError, StreamConfigure, StreamConnection,
StreamSender,
};
use crate::stream::proto::{ServerMessage, StrategyConfigMsg};
#[derive(Debug)]
enum SessionReceiver {
Direct(StreamConnection),
Lanes {
sender: StreamSender,
high: mpsc::UnboundedReceiver<ServerMessage>,
low: mpsc::Receiver<ServerMessage>,
},
Transitioning,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PositionHandle {
pub position_id: u64,
pub token_account: String,
pub wallet_pubkey: String,
pub mint: String,
pub token_program: Option<String>,
pub tokens: u64,
pub entry_quote_units: u64,
pub token_name: Option<String>,
pub token_symbol: Option<String>,
pub token_decimals: Option<u8>,
pub market_type: Option<String>,
pub launch_platform: Option<String>,
pub token_price_quote: Option<u64>,
pub market_cap_quote: Option<u64>,
pub pool_liquidity_quote: Option<u64>,
pub opened_at_ms: Option<u64>,
}
impl IntoPositionSelector for PositionHandle {
fn into_position_selector(self) -> PositionSelector {
PositionSelector::TokenAccount(self.token_account)
}
}
impl IntoPositionSelector for &PositionHandle {
fn into_position_selector(self) -> PositionSelector {
PositionSelector::TokenAccount(self.token_account.clone())
}
}
#[derive(Clone, Debug)]
pub enum StreamEvent {
Message(ServerMessage),
PositionOpened {
handle: PositionHandle,
message: ServerMessage,
},
PositionClosed {
handle: Option<PositionHandle>,
message: ServerMessage,
},
ExitSignalWithTx {
handle: Option<PositionHandle>,
message: ServerMessage,
},
PnlUpdate {
handle: Option<PositionHandle>,
message: ServerMessage,
},
LiquiditySnapshot {
handle: Option<PositionHandle>,
message: ServerMessage,
},
TradeTick {
handle: Option<PositionHandle>,
message: ServerMessage,
},
MirrorBuySignal {
message: ServerMessage,
},
MirrorBuyFailed {
message: ServerMessage,
},
MirrorWalletAutoDisabled {
message: ServerMessage,
},
}
#[derive(Debug)]
pub struct StreamSession {
receiver: SessionReceiver,
positions: HashMap<u64, PositionHandle>,
liquidity_cache: HashMap<u64, ServerMessage>,
strategy: StrategyConfigMsg,
deadline_timeout_sec: u64,
deadline_tasks: HashMap<String, JoinHandle<()>>,
opened_at: HashMap<String, Instant>,
open_tokens: Arc<RwLock<HashSet<String>>>,
}
impl StreamSession {
pub async fn connect(
client: &StreamClient,
configure: StreamConfigure,
) -> Result<Self, StreamClientError> {
let deadline_timeout_sec = configure.deadline_timeout_sec;
let strategy = configure.strategy.clone();
let connection = client.connect(configure).await?;
debug!(event = "session_connected", deadline_timeout_sec);
Ok(Self::from_connection_with_strategy(
connection,
strategy,
deadline_timeout_sec,
))
}
pub fn from_connection(connection: StreamConnection) -> Self {
Self::from_connection_with_strategy(connection, default_strategy(), 0)
}
pub fn from_connection_with_strategy(
connection: StreamConnection,
strategy: StrategyConfigMsg,
deadline_timeout_sec: u64,
) -> Self {
Self {
receiver: SessionReceiver::Direct(connection),
positions: HashMap::new(),
liquidity_cache: HashMap::new(),
strategy,
deadline_timeout_sec,
deadline_tasks: HashMap::new(),
opened_at: HashMap::new(),
open_tokens: Arc::new(RwLock::new(HashSet::new())),
}
}
pub fn enable_lanes(&mut self, low_capacity: usize) {
let prev = std::mem::replace(&mut self.receiver, SessionReceiver::Transitioning);
let SessionReceiver::Direct(connection) = prev else {
self.receiver = prev;
return;
};
let lanes = connection.into_lanes(low_capacity);
let (sender, high, low) = lanes.split();
self.receiver = SessionReceiver::Lanes { sender, high, low };
}
pub fn sender(&self) -> StreamSender {
match &self.receiver {
SessionReceiver::Direct(conn) => conn.sender(),
SessionReceiver::Lanes { sender, .. } => sender.clone(),
SessionReceiver::Transitioning => unreachable!("sender() called during transition"),
}
}
pub fn positions(&self) -> Vec<PositionHandle> {
self.positions.values().cloned().collect()
}
pub fn positions_for_wallet_mint(&self, wallet: &str, mint: &str) -> Vec<PositionHandle> {
self.positions
.values()
.filter(|handle| handle.wallet_pubkey == wallet && handle.mint == mint)
.cloned()
.collect()
}
pub fn get_slippage_bands(&self, position_id: u64) -> Option<&Vec<lasersell_stream_proto::SlippageBandMsg>> {
match self.liquidity_cache.get(&position_id) {
Some(ServerMessage::LiquiditySnapshot { bands, .. }) => Some(bands),
_ => None,
}
}
pub fn get_max_sell_at_slippage(&self, position_id: u64, slippage_bps: u16) -> Option<u64> {
let bands = self.get_slippage_bands(position_id)?;
bands.iter().find(|b| b.slippage_bps == slippage_bps).map(|b| b.max_tokens)
}
pub fn get_liquidity_trend(&self, position_id: u64) -> Option<&str> {
match self.liquidity_cache.get(&position_id) {
Some(ServerMessage::LiquiditySnapshot { liquidity_trend, .. }) => Some(liquidity_trend.as_str()),
_ => None,
}
}
pub fn close_position(&self, handle: &PositionHandle) -> Result<(), StreamClientError> {
self.sender().close_position(handle)
}
pub fn close(mut self) {
self.cancel_all_deadlines();
}
pub fn request_exit_signal(
&self,
handle: &PositionHandle,
slippage_bps: Option<u16>,
) -> Result<(), StreamClientError> {
self.sender().request_exit_signal(handle, slippage_bps)
}
pub fn take_status_channel(
&mut self,
) -> Option<tokio::sync::mpsc::UnboundedReceiver<crate::stream::client::StreamConnectionStatus>>
{
match &mut self.receiver {
SessionReceiver::Direct(conn) => conn.take_status(),
SessionReceiver::Lanes { .. } | SessionReceiver::Transitioning => None,
}
}
pub fn update_strategy(
&mut self,
strategy: StrategyConfigMsg,
) -> Result<(), StreamClientError> {
validate_strategy_thresholds(&strategy, self.deadline_timeout_sec)?;
self.sender().update_strategy(strategy.clone())?;
self.strategy = strategy;
self.reschedule_all_deadlines();
Ok(())
}
pub fn update_strategy_with_deadline(
&mut self,
strategy: StrategyConfigMsg,
deadline_timeout_sec: u64,
) -> Result<(), StreamClientError> {
validate_strategy_thresholds(&strategy, deadline_timeout_sec)?;
self.sender().update_strategy(strategy.clone())?;
self.strategy = strategy;
self.deadline_timeout_sec = deadline_timeout_sec;
self.reschedule_all_deadlines();
Ok(())
}
pub fn update_strategy_optional(
&mut self,
target_profit_pct: Option<f64>,
stop_loss_pct: Option<f64>,
deadline_timeout_sec: Option<u64>,
) -> Result<(), StreamClientError> {
let strategy = strategy_config_from_optional(target_profit_pct, stop_loss_pct, None, None);
match deadline_timeout_sec {
Some(deadline) => self.update_strategy_with_deadline(strategy, deadline),
None => self.update_strategy(strategy),
}
}
pub async fn recv(&mut self) -> Option<StreamEvent> {
let message = match &mut self.receiver {
SessionReceiver::Direct(conn) => conn.recv().await?,
SessionReceiver::Lanes { high, low, .. } => {
tokio::select! {
biased;
msg = high.recv() => msg?,
msg = low.recv() => msg?,
}
}
SessionReceiver::Transitioning => return None,
};
Some(self.apply_message(message))
}
fn apply_message(&mut self, message: ServerMessage) -> StreamEvent {
match &message {
ServerMessage::PositionOpened {
position_id,
wallet_pubkey,
mint,
token_account,
token_program,
tokens,
entry_quote_units,
market_context,
token_name,
token_symbol,
token_decimals,
token_price_quote,
market_cap_quote,
pool_liquidity_quote,
opened_at_ms,
..
} => {
let handle = PositionHandle {
position_id: *position_id,
token_account: token_account.clone(),
wallet_pubkey: wallet_pubkey.clone(),
mint: mint.clone(),
token_program: token_program.clone(),
tokens: *tokens,
entry_quote_units: *entry_quote_units,
token_name: token_name.clone(),
token_symbol: token_symbol.clone(),
token_decimals: *token_decimals,
market_type: market_context.as_ref().map(|c| format!("{:?}", c.market_type).to_lowercase()),
launch_platform: None,
token_price_quote: *token_price_quote,
market_cap_quote: *market_cap_quote,
pool_liquidity_quote: *pool_liquidity_quote,
opened_at_ms: *opened_at_ms,
};
info!(event = "session_position_opened", position_id, mint = %mint, tokens);
self.positions.insert(*position_id, handle.clone());
self.opened_at
.insert(handle.token_account.clone(), Instant::now());
self.sync_open_tokens();
self.arm_deadline_for(&handle.token_account);
StreamEvent::PositionOpened { handle, message }
}
ServerMessage::PositionClosed {
position_id,
token_account,
..
} => {
info!(event = "session_position_closed", position_id);
let handle = self.remove_position(*position_id, token_account.as_deref());
let token = handle
.as_ref()
.map(|position| position.token_account.clone())
.or_else(|| token_account.clone());
if let Some(token_account) = token {
self.cancel_deadline_for(&token_account);
}
self.sync_open_tokens();
StreamEvent::PositionClosed { handle, message }
}
ServerMessage::ExitSignalWithTx {
position_id,
token_account,
..
} => {
info!(event = "session_exit_signal_received", position_id);
let handle = self.find_position(*position_id, token_account.as_deref());
StreamEvent::ExitSignalWithTx { handle, message }
}
ServerMessage::PnlUpdate { position_id, .. } => {
let handle = self.find_position(*position_id, None);
StreamEvent::PnlUpdate { handle, message }
}
ServerMessage::LiquiditySnapshot { position_id, .. } => {
self.liquidity_cache.insert(*position_id, message.clone());
let handle = self.find_position(*position_id, None);
StreamEvent::LiquiditySnapshot {
handle,
message,
}
}
ServerMessage::TradeTick { position_id, .. } => {
let handle = self.find_position(*position_id, None);
StreamEvent::TradeTick { handle, message }
}
ServerMessage::MirrorBuySignal { .. } => {
info!(event = "session_mirror_buy_signal_received");
StreamEvent::MirrorBuySignal { message }
}
ServerMessage::MirrorBuyFailed { .. } => {
warn!(event = "session_mirror_buy_failed");
StreamEvent::MirrorBuyFailed { message }
}
ServerMessage::MirrorWalletAutoDisabled { .. } => {
warn!(event = "session_mirror_wallet_auto_disabled");
StreamEvent::MirrorWalletAutoDisabled { message }
}
_ => StreamEvent::Message(message),
}
}
fn find_position(
&self,
position_id: u64,
token_account: Option<&str>,
) -> Option<PositionHandle> {
self.positions.get(&position_id).cloned().or_else(|| {
token_account.and_then(|account| {
self.positions
.values()
.find(|handle| handle.token_account == account)
.cloned()
})
})
}
fn remove_position(
&mut self,
position_id: u64,
token_account: Option<&str>,
) -> Option<PositionHandle> {
if let Some(handle) = self.positions.remove(&position_id) {
return Some(handle);
}
let account = token_account?;
let removed_id = self
.positions
.iter()
.find_map(|(id, handle)| (handle.token_account == account).then_some(*id))?;
self.positions.remove(&removed_id)
}
fn deadline_duration(&self) -> Option<Duration> {
(self.deadline_timeout_sec > 0).then_some(Duration::from_secs(self.deadline_timeout_sec))
}
fn arm_deadline_for(&mut self, token_account: &str) {
self.cancel_deadline_task_for(token_account);
let Some(deadline) = self.deadline_duration() else {
return;
};
let opened_at = *self
.opened_at
.entry(token_account.to_string())
.or_insert_with(Instant::now);
let now = Instant::now();
let remaining = opened_at
.checked_add(deadline)
.and_then(|deadline_at| deadline_at.checked_duration_since(now))
.unwrap_or_default();
if remaining.is_zero() {
info!(event = "session_deadline_fired", token_account);
self.try_request_exit_signal(token_account);
return;
}
debug!(event = "session_deadline_armed", token_account, remaining_ms = remaining.as_millis() as u64);
self.schedule_deadline_task(token_account.to_string(), remaining);
}
fn reschedule_all_deadlines(&mut self) {
self.cancel_all_deadline_tasks();
let Some(deadline) = self.deadline_duration() else {
return;
};
let now = Instant::now();
let mut token_accounts = HashSet::new();
for handle in self.positions.values() {
token_accounts.insert(handle.token_account.clone());
}
for token_account in token_accounts {
let opened_at = *self.opened_at.entry(token_account.clone()).or_insert(now);
let remaining = opened_at
.checked_add(deadline)
.and_then(|deadline_at| deadline_at.checked_duration_since(now))
.unwrap_or_default();
if remaining.is_zero() {
self.try_request_exit_signal(&token_account);
continue;
}
self.schedule_deadline_task(token_account, remaining);
}
}
fn schedule_deadline_task(&mut self, token_account: String, remaining: Duration) {
let sender = self.sender();
let open_tokens = Arc::clone(&self.open_tokens);
let token_for_map = token_account.clone();
let token_for_check = token_account.clone();
let task = tokio::spawn(async move {
tokio::time::sleep(remaining).await;
let is_open = open_tokens
.read()
.map(|tokens| tokens.contains(&token_for_check))
.unwrap_or(false);
if is_open {
info!(event = "session_deadline_fired", token_account = %token_account);
let _ = sender.request_exit_signal(token_account, None);
}
});
self.deadline_tasks.insert(token_for_map, task);
}
fn try_request_exit_signal(&self, token_account: &str) {
if !self.has_open_position_for_token(token_account) {
return;
}
let _ = self
.sender()
.request_exit_signal(token_account.to_string(), None);
}
fn has_open_position_for_token(&self, token_account: &str) -> bool {
self.positions
.values()
.any(|handle| handle.token_account == token_account)
}
fn cancel_deadline_for(&mut self, token_account: &str) {
self.cancel_deadline_task_for(token_account);
self.opened_at.remove(token_account);
}
fn cancel_deadline_task_for(&mut self, token_account: &str) {
if let Some(handle) = self.deadline_tasks.remove(token_account) {
handle.abort();
}
}
fn cancel_all_deadline_tasks(&mut self) {
for (_, handle) in self.deadline_tasks.drain() {
handle.abort();
}
}
fn cancel_all_deadlines(&mut self) {
self.cancel_all_deadline_tasks();
self.opened_at.clear();
self.sync_open_tokens();
}
fn sync_open_tokens(&self) {
if let Ok(mut guard) = self.open_tokens.write() {
guard.clear();
for handle in self.positions.values() {
guard.insert(handle.token_account.clone());
}
}
}
}
impl Drop for StreamSession {
fn drop(&mut self) {
self.cancel_all_deadline_tasks();
}
}
fn default_strategy() -> StrategyConfigMsg {
StrategyConfigMsg {
target_profit_pct: 0.0,
stop_loss_pct: 0.0,
trailing_stop_pct: 0.0,
sell_on_graduation: false,
take_profit_levels: Vec::new(),
liquidity_guard: false,
breakeven_trail_pct: 0.0,
}
}