1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}};
3use std::time::{Duration, Instant};
4use chrono::{DateTime, FixedOffset, Utc};
5use log::{debug, info, warn, error};
6use thiserror::Error;
7use tokio::sync::mpsc::{Receiver, Sender};
8use tokio::task::JoinHandle;
9use uuid::Uuid;
10use tracing::instrument;
11
12use hyperliquid_rust_sdk::{InfoClient, BaseUrl};
13use hyperliquid_rust_sdk::Error as SdkError;
16
17#[derive(Debug, Clone)]
19pub struct LocalWallet {
20 pub address: String,
21}
22
23#[derive(Debug, Clone, Default)]
25pub struct MockUserState {
26 pub margin_summary: MockMarginSummary,
27 pub asset_positions: Vec<MockAssetPosition>,
28}
29
30#[derive(Debug, Clone, Default)]
31pub struct MockMarginSummary {
32 pub account_value: Option<String>,
33}
34
35#[derive(Debug, Clone)]
36pub struct MockAssetPosition {
37 pub position: MockPosition,
38}
39
40#[derive(Debug, Clone)]
41pub struct MockPosition {
42 pub coin: String,
43 pub szi: Option<String>,
44 pub entry_px: Option<String>,
45}
46
47impl LocalWallet {
50 pub fn new(private_key: &str) -> Result<Self, String> {
51 Ok(Self {
52 address: "placeholder_address".to_string(),
53 })
54 }
55
56 pub fn address(&self) -> String {
57 self.address.clone()
58 }
59}
60
61#[derive(Debug)]
62pub struct ExchangeClient {
63 }
65
66#[derive(Debug)]
68pub struct OrderResponse {
69 pub status: String,
70 pub order_id: String,
71 pub error: Option<String>,
72}
73
74#[derive(Debug)]
76pub struct CancelResponse {
77 pub status: String,
78 pub error: Option<String>,
79}
80
81impl ExchangeClient {
82 pub async fn order(&self, _client_order: ClientOrderRequest, _options: Option<()>) -> Result<OrderResponse, SdkError> {
84 Ok(OrderResponse {
86 status: "ok".to_string(),
87 order_id: "order_id_placeholder".to_string(),
88 error: None,
89 })
90 }
91
92 pub async fn cancel(&self, _cancel_request: String, _options: Option<()>) -> Result<CancelResponse, SdkError> {
94 Ok(CancelResponse {
96 status: "ok".to_string(),
97 error: None,
98 })
99 }
100}
101
102impl ExchangeClient {
103 pub fn new(
104 _http_client: Option<reqwest::Client>,
105 _wallet: LocalWallet,
106 _base_url: Option<BaseUrl>,
107 _timeout: Option<std::time::Duration>,
108 _retry_config: Option<()>,
109 ) -> impl std::future::Future<Output = Result<Self, SdkError>> {
110 async move {
111 Ok(Self {})
112 }
113 }
114}
115
116#[derive(Debug, Clone)]
117pub struct ClientOrderRequest {
118 pub symbol: String,
119 pub side: String,
120 pub order_type: String,
121 pub quantity: String,
122 pub price: Option<String>,
123}
124
125use crate::trading_mode::{ApiConfig, RiskConfig};
126use crate::unified_data::{
127 Position, OrderRequest, OrderResult, MarketData,
128 OrderSide, OrderType, TimeInForce, OrderStatus,
129 TradingStrategy
130};
131use crate::real_time_data_stream::{RealTimeDataStream, RealTimeDataError};
132use crate::risk_manager::{RiskManager, RiskError};
133
134#[derive(Debug, Error)]
136pub enum LiveTradingError {
137 #[error("Market data not available for {0}")]
139 MarketDataNotAvailable(String),
140
141 #[error("Order execution failed: {0}")]
143 OrderExecutionFailed(String),
144
145 #[error("Position not found for {0}")]
147 PositionNotFound(String),
148
149 #[error("Insufficient balance: required {required}, available {available}")]
151 InsufficientBalance {
152 required: f64,
153 available: f64,
154 },
155
156 #[error("Real-time data stream error: {0}")]
158 RealTimeDataError(#[from] RealTimeDataError),
159
160 #[error("Strategy execution error: {0}")]
162 StrategyError(String),
163
164 #[error("Risk management error: {0}")]
166 RiskError(#[from] RiskError),
167
168 #[error("Hyperliquid SDK error: {0}")]
170 SdkError(String),
171
172 #[error("Connection error: {0}")]
174 ConnectionError(String),
175
176 #[error("Emergency stop is active")]
178 EmergencyStop,
179
180 #[error("Wallet not configured")]
182 WalletNotConfigured,
183
184 #[error("Invalid API configuration: {0}")]
186 InvalidApiConfig(String),
187
188 #[error("Order retry limit reached after {attempts} attempts: {reason}")]
190 RetryLimitReached {
191 attempts: u32,
192 reason: String,
193 },
194
195 #[error("Monitoring system error: {0}")]
197 MonitoringError(String),
198
199 #[error("Safety circuit breaker triggered: {0}")]
201 SafetyCircuitBreaker(String),
202
203 #[error("Order cancellation failed: {0}")]
205 OrderCancellationFailed(String),
206}
207
208#[derive(Debug, Clone)]
210pub struct LiveOrder {
211 pub order_id: String,
213
214 pub request: OrderRequest,
216
217 pub result: OrderResult,
219
220 pub created_at: DateTime<FixedOffset>,
222
223 pub updated_at: DateTime<FixedOffset>,
225
226 pub status: OrderStatus,
228
229 pub error: Option<String>,
231}
232
233#[derive(Debug, Clone, Copy, PartialEq, Eq)]
235pub enum AlertLevel {
236 Info,
238
239 Warning,
241
242 Error,
244
245 Critical,
247}
248
249impl std::fmt::Display for AlertLevel {
250 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251 match self {
252 AlertLevel::Info => write!(f, "Info"),
253 AlertLevel::Warning => write!(f, "Warning"),
254 AlertLevel::Error => write!(f, "Error"),
255 AlertLevel::Critical => write!(f, "Critical"),
256 }
257 }
258}
259
260#[derive(Debug, Clone)]
262pub struct AlertMessage {
263 pub level: AlertLevel,
265
266 pub message: String,
268
269 pub timestamp: DateTime<FixedOffset>,
271
272 pub symbol: Option<String>,
274
275 pub order_id: Option<String>,
277}
278
279#[derive(Debug, Clone)]
281pub struct RetryPolicy {
282 pub max_attempts: u32,
284
285 pub initial_delay_ms: u64,
287
288 pub backoff_factor: f64,
290
291 pub max_delay_ms: u64,
293}
294
295impl Default for RetryPolicy {
296 fn default() -> Self {
297 Self {
298 max_attempts: 3,
299 initial_delay_ms: 500,
300 backoff_factor: 2.0,
301 max_delay_ms: 10000,
302 }
303 }
304}
305
306#[derive(Debug, Clone)]
308pub struct OrderRetryState {
309 pub order_request: OrderRequest,
311
312 pub attempts: u32,
314
315 pub last_attempt: DateTime<FixedOffset>,
317
318 pub last_error: String,
320
321 pub next_retry: DateTime<FixedOffset>,
323}
324
325#[derive(Debug, Clone)]
327pub struct SafetyCircuitBreakerConfig {
328 pub max_consecutive_failed_orders: u32,
330
331 pub max_order_failure_rate: f64,
333
334 pub order_failure_rate_window: usize,
336
337 pub max_position_drawdown_pct: f64,
339
340 pub max_account_drawdown_pct: f64,
342
343 pub max_price_deviation_pct: f64,
345
346 pub price_deviation_window_sec: u64,
348
349 pub max_critical_alerts: u32,
351
352 pub critical_alerts_window: usize,
354}
355
356impl Default for SafetyCircuitBreakerConfig {
357 fn default() -> Self {
358 Self {
359 max_consecutive_failed_orders: 3,
360 max_order_failure_rate: 0.5,
361 order_failure_rate_window: 10,
362 max_position_drawdown_pct: 0.15,
363 max_account_drawdown_pct: 0.10,
364 max_price_deviation_pct: 0.05,
365 price_deviation_window_sec: 60,
366 max_critical_alerts: 3,
367 critical_alerts_window: 10,
368 }
369 }
370}
371
372pub struct LiveTradingEngine {
374 exchange_client: Option<ExchangeClient>,
376
377 info_client: InfoClient,
379
380 wallet: Option<LocalWallet>,
382
383 risk_manager: RiskManager,
385
386 real_time_data: Option<Arc<Mutex<RealTimeDataStream>>>,
388
389 market_data_cache: HashMap<String, MarketData>,
391
392 pub positions: HashMap<String, Position>,
394
395 order_history: Vec<LiveOrder>,
397
398 active_orders: HashMap<String, LiveOrder>,
400
401 pub emergency_stop: Arc<AtomicBool>,
403
404 api_config: ApiConfig,
406
407 pub account_balance: f64,
409
410 is_connected: bool,
412
413 last_connection_attempt: Instant,
415
416 connection_retry_count: u32,
418
419 max_connection_retries: u32,
421
422 connection_check_task: Option<JoinHandle<()>>,
424
425 order_update_task: Option<JoinHandle<()>>,
427
428 position_update_task: Option<JoinHandle<()>>,
430
431 is_running: bool,
433
434 pub retry_policy: RetryPolicy,
436
437 pub pending_retries: HashMap<String, OrderRetryState>,
439
440 pub retry_task: Option<JoinHandle<()>>,
442
443 alerts: VecDeque<AlertMessage>,
445
446 pub alert_sender: Option<Sender<AlertMessage>>,
448
449 pub alert_receiver: Option<Receiver<AlertMessage>>,
451
452 pub alert_task: Option<JoinHandle<()>>,
454
455 pub safety_circuit_breaker_config: SafetyCircuitBreakerConfig,
457
458 pub consecutive_failed_orders: u32,
460
461 pub order_result_history: VecDeque<bool>,
463
464 price_history: HashMap<String, VecDeque<(DateTime<FixedOffset>, f64)>>,
466
467 initial_account_value: f64,
469
470 pub highest_account_value: f64,
472
473 recent_critical_alerts: VecDeque<AlertMessage>,
475
476 pub monitoring_task: Option<JoinHandle<()>>,
478
479 pub detailed_logging: bool,
481
482 pub monitoring_manager: Option<crate::real_time_monitoring::MonitoringManager>,
484}
485
486impl LiveTradingEngine {
487 pub async fn new(wallet: LocalWallet, risk_config: RiskConfig, api_config: ApiConfig) -> Result<Self, LiveTradingError> {
489 let base_url = if api_config.use_testnet {
491 BaseUrl::Testnet
492 } else {
493 BaseUrl::Mainnet
494 };
495
496 let info_client = InfoClient::new(None, Some(base_url)).await
497 .map_err(|e| LiveTradingError::SdkError(e.to_string()))?;
498
499 let user_state = MockUserState::default();
502
503 let account_balance = user_state.margin_summary.account_value
504 .as_ref()
505 .and_then(|s| s.parse::<f64>().ok())
506 .unwrap_or(0.0);
507
508 let risk_manager = RiskManager::new(risk_config, account_balance);
510
511 let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
512
513 Ok(Self {
514 exchange_client: None,
515 info_client,
516 wallet: Some(wallet),
517 risk_manager,
518 real_time_data: None,
519 market_data_cache: HashMap::new(),
520 positions: HashMap::new(),
521 order_history: Vec::new(),
522 active_orders: HashMap::new(),
523 emergency_stop: Arc::new(AtomicBool::new(false)),
524 api_config,
525 account_balance,
526 is_connected: false,
527 last_connection_attempt: Instant::now(),
528 connection_retry_count: 0,
529 max_connection_retries: 5,
530 connection_check_task: None,
531 order_update_task: None,
532 position_update_task: None,
533 is_running: false,
534 retry_policy: RetryPolicy::default(),
535 pending_retries: HashMap::new(),
536 retry_task: None,
537 alerts: VecDeque::new(),
538 alert_sender: None,
539 alert_receiver: None,
540 alert_task: None,
541 safety_circuit_breaker_config: SafetyCircuitBreakerConfig::default(),
542 consecutive_failed_orders: 0,
543 order_result_history: VecDeque::new(),
544 price_history: HashMap::new(),
545 initial_account_value: account_balance,
546 highest_account_value: account_balance,
547 recent_critical_alerts: VecDeque::new(),
548 monitoring_task: None,
549 detailed_logging: false,
550 monitoring_manager: None,
551 })
552 }
553
554 pub async fn connect(&mut self) -> Result<(), LiveTradingError> {
556 if self.is_connected {
557 return Ok(());
558 }
559
560 info!("Connecting to Hyperliquid exchange...");
561
562 let wallet = self.wallet.as_ref().ok_or(LiveTradingError::WalletNotConfigured)?;
564
565 let base_url = if self.api_config.use_testnet {
567 BaseUrl::Testnet
568 } else {
569 BaseUrl::Mainnet
570 };
571
572 let exchange_client = ExchangeClient::new(
573 None,
574 wallet.clone(),
575 Some(base_url),
576 None,
577 None,
578 )
579 .await
580 .map_err(|e| LiveTradingError::SdkError(e.to_string()))?;
581
582 self.exchange_client = Some(exchange_client);
583
584 if self.real_time_data.is_none() {
586 let data_stream = RealTimeDataStream::new()
587 .await
588 .map_err(LiveTradingError::RealTimeDataError)?;
589
590 self.real_time_data = Some(Arc::new(Mutex::new(data_stream)));
591 }
592
593 if let Some(data_stream) = &self.real_time_data {
595 let mut stream = data_stream.lock().unwrap();
596 stream.connect().await.map_err(LiveTradingError::RealTimeDataError)?;
597 }
598
599 self.is_connected = true;
601 self.connection_retry_count = 0;
602 self.last_connection_attempt = Instant::now();
603
604 self.start_connection_check_task();
606
607 self.start_order_update_task();
609
610 self.start_position_update_task();
612
613 self.update_positions().await?;
615
616 info!("Connected to Hyperliquid exchange");
617
618 Ok(())
619 }
620
621 pub async fn disconnect(&mut self) -> Result<(), LiveTradingError> {
623 if !self.is_connected {
624 return Ok(());
625 }
626
627 info!("Disconnecting from Hyperliquid exchange...");
628
629 if let Some(task) = &self.connection_check_task {
631 task.abort();
632 }
633
634 if let Some(task) = &self.order_update_task {
635 task.abort();
636 }
637
638 if let Some(task) = &self.position_update_task {
639 task.abort();
640 }
641
642 if let Some(data_stream) = &self.real_time_data {
644 let mut stream = data_stream.lock().unwrap();
645 stream.disconnect().await.map_err(LiveTradingError::RealTimeDataError)?;
646 }
647
648 self.exchange_client = None;
650
651 self.is_connected = false;
653
654 info!("Disconnected from Hyperliquid exchange");
655
656 Ok(())
657 }
658
659 #[instrument(level = "info", skip(self, order), fields(symbol = %order.symbol, side = ?order.side, quantity = %order.quantity))]
661 pub async fn execute_order(&mut self, order: OrderRequest) -> Result<OrderResult, LiveTradingError> {
662 if !self.is_connected {
664 let error_msg = "Not connected to exchange";
665 self.send_alert(AlertLevel::Error, error_msg, Some(&order.symbol), None);
666 return Err(LiveTradingError::ConnectionError(error_msg.to_string()));
667 }
668
669 if self.emergency_stop.load(Ordering::SeqCst) {
671 self.send_alert(AlertLevel::Warning, "Order rejected: Emergency stop is active", Some(&order.symbol), None);
672 return Err(LiveTradingError::EmergencyStop);
673 }
674
675 if let Err(e) = self.check_safety_circuit_breakers() {
677 return Err(e);
678 }
679
680 if let Err(err) = order.validate() {
682 let error_msg = format!("Order validation failed: {}", err);
683 self.send_alert(AlertLevel::Warning, &error_msg, Some(&order.symbol), None);
684 return Err(LiveTradingError::OrderExecutionFailed(err));
685 }
686
687 let market_data = match self.get_market_data(&order.symbol) {
689 Ok(data) => data,
690 Err(e) => {
691 let error_msg = format!("Failed to get market data: {}", e);
692 self.send_alert(AlertLevel::Warning, &error_msg, Some(&order.symbol), None);
693 return Err(e);
694 }
695 };
696
697 if let Err(e) = self.risk_manager.validate_order(&order, &self.positions) {
699 let error_msg = format!("Risk validation failed: {}", e);
700 self.send_alert(AlertLevel::Warning, &error_msg, Some(&order.symbol), None);
701 return Err(LiveTradingError::RiskError(e));
702 }
703
704 let order_id = Uuid::new_v4().to_string();
706
707 let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
709 let mut order_result = OrderResult::new(
710 &order_id,
711 &order.symbol,
712 order.side,
713 order.order_type,
714 order.quantity,
715 now,
716 );
717 order_result.status = OrderStatus::Submitted;
718
719 if self.detailed_logging {
721 info!("Submitting order: {} {} {} @ {:?} ({})",
722 order.side, order.quantity, order.symbol, order.price, order.order_type);
723 }
724
725 self.send_alert(AlertLevel::Info,
726 &format!("Submitting order: {} {} {} @ {:?}",
727 order.side, order.quantity, order.symbol, order.price),
728 Some(&order.symbol), Some(&order_id));
729
730 let client_order = match self.convert_to_client_order(&order) {
732 Ok(order) => order,
733 Err(e) => {
734 let error_msg = format!("Failed to convert order: {}", e);
735 self.send_alert(AlertLevel::Error, &error_msg, Some(&order.symbol), Some(&order_id));
736
737 order_result.status = OrderStatus::Rejected;
739 order_result.error = Some(error_msg.clone());
740
741 let live_order = LiveOrder {
743 order_id: order_id.clone(),
744 request: order.clone(),
745 result: order_result.clone(),
746 created_at: now,
747 updated_at: now,
748 status: OrderStatus::Rejected,
749 error: Some(error_msg.clone()),
750 };
751 self.order_history.push(live_order);
752
753 self.update_order_result(false);
755
756 return Err(e);
757 }
758 };
759
760 let exchange_client = self.exchange_client.as_ref()
762 .ok_or(LiveTradingError::ConnectionError("Exchange client not initialized".to_string()))?;
763
764 let response = match exchange_client.order(client_order, None).await {
765 Ok(response) => response,
766 Err(e) => {
767 let error_msg = format!("API error: {}", e);
768 self.send_alert(AlertLevel::Error, &error_msg, Some(&order.symbol), Some(&order_id));
769
770 order_result.status = OrderStatus::Rejected;
772 order_result.error = Some(error_msg.clone());
773
774 let live_order = LiveOrder {
776 order_id: order_id.clone(),
777 request: order.clone(),
778 result: order_result.clone(),
779 created_at: now,
780 updated_at: now,
781 status: OrderStatus::Rejected,
782 error: Some(error_msg.clone()),
783 };
784 self.order_history.push(live_order);
785
786 if self.should_retry_order(&e.to_string()) {
788 self.schedule_retry(order.clone(), &e.to_string())?;
789 }
790
791 self.update_order_result(false);
793
794 return Err(LiveTradingError::SdkError(e.to_string()));
795 }
796 };
797
798 if response.status != "ok" {
800 let error_msg = response.error.unwrap_or_else(|| "Unknown error".to_string());
801 self.send_alert(AlertLevel::Error, &format!("Order rejected: {}", error_msg),
802 Some(&order.symbol), Some(&order_id));
803
804 order_result.status = OrderStatus::Rejected;
806 order_result.error = Some(error_msg.clone());
807
808 let live_order = LiveOrder {
810 order_id: order_id.clone(),
811 request: order.clone(),
812 result: order_result.clone(),
813 created_at: now,
814 updated_at: now,
815 status: OrderStatus::Rejected,
816 error: Some(error_msg.clone()),
817 };
818 self.order_history.push(live_order);
819
820 if self.should_retry_order(&error_msg) {
822 self.schedule_retry(order.clone(), &error_msg)?;
823 }
824
825 self.update_order_result(false);
827
828 return Err(LiveTradingError::OrderExecutionFailed(error_msg));
829 }
830
831 order_result.status = OrderStatus::Filled; order_result.filled_quantity = order.quantity;
834 order_result.average_price = Some(market_data.price); let fee_rate = match order.order_type {
838 OrderType::Market => 0.0005, OrderType::Limit => 0.0002, _ => 0.0005, };
842 let fee_amount = order.quantity * market_data.price * fee_rate;
843 order_result.fees = Some(fee_amount);
844
845 let live_order = LiveOrder {
847 order_id: order_id.clone(),
848 request: order.clone(),
849 result: order_result.clone(),
850 created_at: now,
851 updated_at: now,
852 status: OrderStatus::Filled,
853 error: None,
854 };
855 self.order_history.push(live_order);
856
857 self.send_alert(AlertLevel::Info,
859 &format!("Order executed successfully: {} {} {} @ {:.2}",
860 order.side, order.quantity, order.symbol,
861 order_result.average_price.unwrap_or(0.0)),
862 Some(&order.symbol), Some(&order_id));
863
864 self.log_order_details(&order, &order_result);
866
867 if let Err(e) = self.update_positions().await {
869 self.send_alert(AlertLevel::Warning,
870 &format!("Failed to update positions after order: {}", e),
871 Some(&order.symbol), Some(&order_id));
872 }
873
874 if let Some(position) = self.positions.get(&order.symbol) {
876 if let Some(stop_loss) = self.risk_manager.generate_stop_loss(position, &order_id) {
878 let trigger_price = stop_loss.trigger_price;
879 self.risk_manager.register_stop_loss(stop_loss);
880 self.send_alert(AlertLevel::Info,
881 &format!("Stop-loss registered at {:.2}", trigger_price),
882 Some(&order.symbol), Some(&order_id));
883 }
884
885 if let Some(take_profit) = self.risk_manager.generate_take_profit(position, &order_id) {
887 let trigger_price = take_profit.trigger_price;
888 self.risk_manager.register_take_profit(take_profit);
889 self.send_alert(AlertLevel::Info,
890 &format!("Take-profit registered at {:.2}", trigger_price),
891 Some(&order.symbol), Some(&order_id));
892 }
893 }
894
895 self.update_order_result(true);
897
898 Ok(order_result)
899 }
900
901 fn should_retry_order(&self, error_msg: &str) -> bool {
903 error_msg.contains("connection") ||
905 error_msg.contains("timeout") ||
906 error_msg.contains("rate limit") ||
907 error_msg.contains("try again") ||
908 error_msg.contains("temporary") ||
909 error_msg.contains("overloaded")
910 }
911
912pub async fn cancel_order(&mut self, order_id: &str) -> Result<OrderResult, LiveTradingError> {
914 if !self.is_connected {
916 return Err(LiveTradingError::ConnectionError("Not connected to exchange".to_string()));
917 }
918
919 if let Some(order) = self.active_orders.get(order_id) {
921 let client_cancel = self.convert_to_client_cancel(&order.request, order_id)?;
923
924 let exchange_client = self.exchange_client.as_ref()
926 .ok_or(LiveTradingError::ConnectionError("Exchange client not initialized".to_string()))?;
927
928 let response = exchange_client.cancel(client_cancel, None)
929 .await
930 .map_err(|e| LiveTradingError::SdkError(e.to_string()))?;
931
932 if response.status != "ok" {
934 let error_msg = response.error.unwrap_or_else(|| "Unknown error".to_string());
935 return Err(LiveTradingError::OrderExecutionFailed(error_msg));
936 }
937
938 let mut updated_order = order.clone();
940 updated_order.status = OrderStatus::Cancelled;
941 updated_order.updated_at = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
942
943 self.active_orders.remove(order_id);
945 self.order_history.push(updated_order.clone());
946
947 return Ok(updated_order.result);
948 } else {
949 return Err(LiveTradingError::OrderExecutionFailed(
950 format!("Order not found: {}", order_id)
951 ));
952 }
953 }
954
955 pub fn get_positions(&self) -> &HashMap<String, Position> {
957 &self.positions
958 }
959
960 pub fn get_order_history(&self) -> &Vec<LiveOrder> {
962 &self.order_history
963 }
964
965 pub fn get_active_orders(&self) -> &HashMap<String, LiveOrder> {
967 &self.active_orders
968 }
969
970 pub fn get_account_balance(&self) -> f64 {
972 self.account_balance
973 }
974
975 pub fn get_portfolio_value(&self) -> f64 {
977 let position_value = self.positions.values()
978 .map(|p| p.size.abs() * p.current_price)
979 .sum::<f64>();
980
981 self.account_balance + position_value
982 }
983
984 #[instrument(level = "warn", skip(self))]
986 pub async fn emergency_stop(&mut self) -> Result<(), LiveTradingError> {
987 warn!("EMERGENCY STOP ACTIVATED");
988 self.emergency_stop.store(true, Ordering::SeqCst);
989
990 self.send_alert(AlertLevel::Critical, "Emergency stop activated", None, None);
992
993 match self.cancel_all_orders().await {
995 Ok(_) => {
996 info!("Successfully cancelled all orders during emergency stop");
997 },
998 Err(e) => {
999 error!("Failed to cancel all orders during emergency stop: {}", e);
1000 }
1002 }
1003
1004 info!("Positions at emergency stop:");
1006 for (symbol, position) in &self.positions {
1007 info!(" {}: {} @ {:.2} (PnL: {:.2})",
1008 symbol, position.size, position.current_price, position.unrealized_pnl);
1009 }
1010
1011 Ok(())
1012 }
1013
1014 pub fn deactivate_emergency_stop(&self) {
1016 info!("Emergency stop deactivated");
1017 self.emergency_stop.store(false, Ordering::SeqCst);
1018
1019 self.send_alert(AlertLevel::Warning, "Emergency stop deactivated", None, None);
1021 }
1022
1023 pub fn is_emergency_stop_active(&self) -> bool {
1025 self.emergency_stop.load(Ordering::SeqCst)
1026 }
1027
1028 pub async fn start_trading(&mut self, strategy: Box<dyn TradingStrategy>) -> Result<(), LiveTradingError> {
1030 if !self.is_connected {
1032 self.connect().await?;
1033 }
1034
1035 if self.is_running {
1037 return Ok(());
1038 }
1039
1040 info!("Starting live trading with strategy: {}", strategy.name());
1041
1042 self.is_running = true;
1043
1044 while self.is_running {
1046 if self.emergency_stop.load(Ordering::SeqCst) {
1048 warn!("Emergency stop is active, pausing trading");
1049 tokio::time::sleep(Duration::from_secs(5)).await;
1050 continue;
1051 }
1052
1053 if !self.is_connected {
1055 warn!("Not connected to exchange, attempting to reconnect...");
1056 match self.connect().await {
1057 Ok(_) => info!("Reconnected to exchange"),
1058 Err(e) => {
1059 error!("Failed to reconnect: {}", e);
1060 tokio::time::sleep(Duration::from_secs(5)).await;
1061 continue;
1062 }
1063 }
1064 }
1065
1066 self.process_market_data_updates(strategy.as_ref()).await?;
1068
1069 self.check_risk_orders().await?;
1071
1072 tokio::time::sleep(Duration::from_millis(100)).await;
1074 }
1075
1076 info!("Live trading stopped");
1077 Ok(())
1078 }
1079
1080 pub fn stop_trading(&mut self) {
1082 info!("Stopping live trading");
1083 self.is_running = false;
1084 }
1085
1086 pub fn active_orders_count(&self) -> usize {
1088 self.active_orders.len()
1089 }
1090
1091 pub fn active_order_ids(&self) -> Vec<String> {
1093 self.active_orders.keys().cloned().collect()
1094 }
1095
1096 pub fn monitoring_manager(&mut self) -> Option<&mut crate::real_time_monitoring::MonitoringManager> {
1098 self.monitoring_manager.as_mut()
1099 }
1100
1101 pub async fn update_positions(&mut self) -> Result<(), LiveTradingError> {
1103 if !self.is_connected {
1105 return Err(LiveTradingError::ConnectionError("Not connected to exchange".to_string()));
1106 }
1107
1108 let wallet = self.wallet.as_ref().ok_or(LiveTradingError::WalletNotConfigured)?;
1110
1111 let user_state = MockUserState::default();
1114
1115 if let Some(account_value) = user_state.margin_summary.account_value {
1117 self.account_balance = account_value.parse::<f64>().unwrap_or(self.account_balance);
1118 }
1119
1120 self.risk_manager.update_portfolio_value(self.account_balance, 0.0)?;
1122
1123 let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
1125 let mut updated_positions = HashMap::new();
1126
1127 for asset_position in user_state.asset_positions {
1128 let symbol = asset_position.position.coin;
1129 let size = asset_position.position.szi.as_ref()
1130 .and_then(|s| s.parse::<f64>().ok())
1131 .unwrap_or(0.0);
1132
1133 if size == 0.0 {
1135 continue;
1136 }
1137
1138 let entry_price = asset_position.position.entry_px.as_ref()
1139 .and_then(|s| s.parse::<f64>().ok())
1140 .unwrap_or(0.0);
1141
1142 let current_price = if let Some(data) = self.market_data_cache.get(&symbol) {
1144 data.price
1145 } else {
1146 let all_mids = self.info_client.all_mids()
1148 .await
1149 .map_err(|e| LiveTradingError::SdkError(e.to_string()))?;
1150
1151 all_mids.get(&symbol)
1152 .ok_or(LiveTradingError::MarketDataNotAvailable(symbol.clone()))?
1153 .parse::<f64>()
1154 .map_err(|_| LiveTradingError::SdkError("Failed to parse price".to_string()))?
1155 };
1156
1157 let unrealized_pnl = (current_price - entry_price) * size;
1159
1160 let position = Position {
1162 symbol: symbol.clone(),
1163 size,
1164 entry_price,
1165 current_price,
1166 unrealized_pnl,
1167 realized_pnl: 0.0, funding_pnl: 0.0, timestamp: now,
1170 leverage: 1.0, liquidation_price: None, margin: None, metadata: std::collections::HashMap::new(),
1174 };
1175
1176 updated_positions.insert(symbol, position);
1177 }
1178
1179 self.positions = updated_positions;
1180
1181 Ok(())
1182 }
1183
1184 fn get_market_data(&self, symbol: &str) -> Result<MarketData, LiveTradingError> {
1186 if let Some(data) = self.market_data_cache.get(symbol) {
1187 Ok(data.clone())
1188 } else {
1189 Err(LiveTradingError::MarketDataNotAvailable(symbol.to_string()))
1190 }
1191 }
1192
1193 fn convert_to_client_order(&self, order: &OrderRequest) -> Result<ClientOrderRequest, LiveTradingError> {
1195 let order_type_str = match order.order_type {
1197 OrderType::Market => "market",
1198 OrderType::Limit => "limit",
1199 _ => return Err(LiveTradingError::OrderExecutionFailed(
1200 format!("Unsupported order type: {:?}", order.order_type)
1201 )),
1202 };
1203
1204 let is_buy = match order.side {
1206 OrderSide::Buy => true,
1207 OrderSide::Sell => false,
1208 };
1209
1210 let client_order = ClientOrderRequest {
1212 symbol: order.symbol.clone(),
1213 side: if is_buy { "buy".to_string() } else { "sell".to_string() },
1214 order_type: order_type_str.to_string(),
1215 quantity: order.quantity.to_string(),
1216 price: order.price.map(|p| p.to_string()),
1217 };
1218
1219 Ok(client_order)
1220 }
1221
1222 fn convert_to_client_cancel(&self, order: &OrderRequest, order_id: &str) -> Result<String, LiveTradingError> {
1224 Ok(order_id.to_string())
1226 }
1227
1228 fn start_connection_check_task(&mut self) {
1230 let emergency_stop = self.emergency_stop.clone();
1231
1232 self.connection_check_task = Some(tokio::spawn(async move {
1233 loop {
1234 if emergency_stop.load(Ordering::SeqCst) {
1236 tokio::time::sleep(Duration::from_secs(5)).await;
1237 continue;
1238 }
1239
1240 tokio::time::sleep(Duration::from_secs(30)).await;
1242
1243 debug!("Connection check: OK");
1246 }
1247 }));
1248 }
1249
1250 fn start_order_update_task(&mut self) {
1252 let emergency_stop = self.emergency_stop.clone();
1253
1254 self.order_update_task = Some(tokio::spawn(async move {
1255 loop {
1256 if emergency_stop.load(Ordering::SeqCst) {
1258 tokio::time::sleep(Duration::from_secs(5)).await;
1259 continue;
1260 }
1261
1262 tokio::time::sleep(Duration::from_secs(5)).await;
1264
1265 debug!("Order update check: OK");
1268 }
1269 }));
1270 }
1271
1272 fn start_position_update_task(&mut self) {
1274 let emergency_stop = self.emergency_stop.clone();
1275
1276 self.position_update_task = Some(tokio::spawn(async move {
1277 loop {
1278 if emergency_stop.load(Ordering::SeqCst) {
1280 tokio::time::sleep(Duration::from_secs(5)).await;
1281 continue;
1282 }
1283
1284 tokio::time::sleep(Duration::from_secs(10)).await;
1286
1287 debug!("Position update check: OK");
1290 }
1291 }));
1292 }
1293
1294 async fn process_market_data_updates(&mut self, strategy: &dyn TradingStrategy) -> Result<(), LiveTradingError> {
1296 if let Some(data_stream) = &self.real_time_data {
1298 let stream = data_stream.lock().unwrap();
1299
1300 let symbols = stream.get_subscribed_symbols();
1302
1303 for symbol in symbols {
1305 if let Some(data) = stream.get_market_data(&symbol) {
1306 self.market_data_cache.insert(symbol, data);
1307 }
1308 }
1309 }
1310
1311 let market_data_vec: Vec<_> = self.market_data_cache.values().cloned().collect();
1313
1314 for market_data in market_data_vec {
1316 match Ok(vec![]) as Result<Vec<OrderRequest>, String> {
1319 Ok(order_requests) => {
1320 for order_request in order_requests {
1322 match self.execute_order(order_request).await {
1323 Ok(_) => {},
1324 Err(err) => {
1325 warn!("Failed to execute order: {}", err);
1326 }
1327 }
1328 }
1329 },
1330 Err(err) => {
1331 return Err(LiveTradingError::StrategyError(err));
1332 }
1333 }
1334 }
1335
1336 Ok(())
1337 }
1338
1339 async fn check_risk_orders(&mut self) -> Result<(), LiveTradingError> {
1341 let current_prices: HashMap<String, f64> = self.market_data_cache.iter()
1343 .map(|(symbol, data)| (symbol.clone(), data.price))
1344 .collect();
1345
1346 let triggered_orders = self.risk_manager.check_risk_orders(¤t_prices);
1348
1349 for risk_order in triggered_orders {
1351 info!("Executing {} order for {}: {} {} @ {}",
1352 if risk_order.is_stop_loss { "stop-loss" } else { "take-profit" },
1353 risk_order.symbol,
1354 risk_order.side,
1355 risk_order.quantity,
1356 risk_order.trigger_price
1357 );
1358
1359 let order_request = OrderRequest {
1361 symbol: risk_order.symbol.clone(),
1362 side: risk_order.side,
1363 order_type: OrderType::Market,
1364 quantity: risk_order.quantity,
1365 price: None,
1366 reduce_only: true,
1367 time_in_force: TimeInForce::ImmediateOrCancel,
1368 stop_price: None,
1369 client_order_id: None,
1370 parameters: std::collections::HashMap::new(),
1371 };
1372
1373 match self.execute_order(order_request).await {
1375 Ok(_) => {
1376 info!("{} order executed successfully",
1377 if risk_order.is_stop_loss { "Stop-loss" } else { "Take-profit" }
1378 );
1379 },
1380 Err(err) => {
1381 error!("Failed to execute {} order: {}",
1382 if risk_order.is_stop_loss { "stop-loss" } else { "take-profit" },
1383 err
1384 );
1385 }
1386 }
1387 }
1388
1389 Ok(())
1390 }
1391
1392 pub fn generate_account_summary(&self) -> crate::mode_reporting::AccountSummary {
1394 crate::mode_reporting::AccountSummary {
1395 balance: self.account_balance,
1396 equity: self.account_balance, margin_used: 0.0, margin_available: self.account_balance, }
1400 }
1401
1402 pub fn generate_position_summary(&self) -> crate::mode_reporting::PositionSummary {
1404 let total_pnl: f64 = self.positions.values()
1405 .map(|p| p.unrealized_pnl)
1406 .sum();
1407 let long_positions = self.positions.values().filter(|p| p.size > 0.0).count();
1408 let short_positions = self.positions.values().filter(|p| p.size < 0.0).count();
1409
1410 crate::mode_reporting::PositionSummary {
1411 total_positions: self.positions.len(),
1412 total_pnl,
1413 long_positions,
1414 short_positions,
1415 }
1416 }
1417
1418 pub fn generate_order_summary(&self) -> crate::mode_reporting::OrderSummary {
1420 crate::mode_reporting::OrderSummary {
1421 active_orders: self.active_orders.len(),
1422 filled_orders: self.order_history.iter().filter(|o| o.status == OrderStatus::Filled).count(),
1423 cancelled_orders: self.order_history.iter().filter(|o| o.status == OrderStatus::Cancelled).count(),
1424 total_volume: 0.0, }
1426 }
1427
1428 pub fn generate_risk_summary(&self) -> crate::mode_reporting::RiskSummary {
1430 let drawdown = if self.highest_account_value > 0.0 {
1431 (self.highest_account_value - self.account_balance) / self.highest_account_value
1432 } else {
1433 0.0
1434 };
1435
1436 crate::mode_reporting::RiskSummary {
1437 risk_level: if self.emergency_stop.load(Ordering::Relaxed) { "HIGH".to_string() } else { "NORMAL".to_string() },
1438 max_drawdown: drawdown,
1439 var_95: 0.0, leverage: 1.0, }
1442 }
1443
1444 pub fn generate_system_status(&self) -> crate::mode_reporting::SystemStatus {
1446 crate::mode_reporting::SystemStatus {
1447 is_connected: self.is_connected,
1448 is_running: self.is_running,
1449 uptime_seconds: 0, last_heartbeat: Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
1451 }
1452 }
1453
1454 pub fn get_recent_alerts(&self, limit: usize) -> Vec<crate::mode_reporting::AlertEntry> {
1456 self.alerts.iter()
1457 .rev()
1458 .take(limit)
1459 .map(|alert| crate::mode_reporting::AlertEntry {
1460 level: alert.level.to_string(),
1461 message: alert.message.clone(),
1462 timestamp: alert.timestamp,
1463 symbol: alert.symbol.clone(),
1464 order_id: None, })
1466 .collect()
1467 }
1468
1469 pub fn generate_performance_snapshot(&self) -> crate::mode_reporting::PerformanceSnapshot {
1471 let total_pnl: f64 = self.positions.values()
1472 .map(|p| p.unrealized_pnl)
1473 .sum();
1474 let drawdown = if self.highest_account_value > 0.0 {
1475 (self.highest_account_value - self.account_balance) / self.highest_account_value
1476 } else {
1477 0.0
1478 };
1479
1480 crate::mode_reporting::PerformanceSnapshot {
1481 total_pnl,
1482 daily_pnl: 0.0, win_rate: 0.0, sharpe_ratio: 0.0, max_drawdown: drawdown,
1486 }
1487 }
1488
1489 pub fn get_monitoring_manager(&mut self) -> &mut Option<crate::real_time_monitoring::MonitoringManager> {
1491 &mut self.monitoring_manager
1492 }
1493
1494 pub fn get_emergency_stop(&self) -> Arc<AtomicBool> {
1496 self.emergency_stop.clone()
1497 }
1498}
1499
1500#[cfg(test)]
1501mod tests {
1502 use super::*;
1503 use std::str::FromStr;
1504 use ethers::signers::LocalWallet;
1505
1506 fn create_test_wallet() -> LocalWallet {
1508 let private_key = "0000000000000000000000000000000000000000000000000000000000000001";
1510 LocalWallet::from_str(private_key).unwrap()
1511 }
1512
1513 fn create_test_api_config() -> ApiConfig {
1515 ApiConfig {
1516 api_key: "test_key".to_string(),
1517 api_secret: "test_secret".to_string(),
1518 endpoint: "https://api.hyperliquid-testnet.xyz".to_string(),
1519 use_testnet: true,
1520 timeout_ms: 5000,
1521 }
1522 }
1523
1524 fn create_test_risk_config() -> RiskConfig {
1526 RiskConfig::default()
1527 }
1528
1529 struct MockStrategy;
1531
1532 impl TradingStrategy for MockStrategy {
1533 fn name(&self) -> &str {
1534 "MockStrategy"
1535 }
1536
1537 fn on_market_data(&mut self, _data: &MarketData) -> std::result::Result<Vec<OrderRequest>, String> {
1538 Ok(Vec::new())
1540 }
1541
1542 fn on_order_fill(&mut self, _fill: &crate::unified_data::OrderFill) -> std::result::Result<(), String> {
1543 Ok(())
1544 }
1545
1546 fn on_funding_payment(&mut self, _payment: &crate::unified_data::FundingPayment) -> std::result::Result<(), String> {
1547 Ok(())
1548 }
1549
1550 fn get_current_signals(&self) -> HashMap<String, crate::unified_data::Signal> {
1551 HashMap::new()
1552 }
1553 }
1554
1555 }