tesser_cli/
control.rs

1use std::net::SocketAddr;
2use std::sync::atomic::{AtomicI64, Ordering};
3use std::sync::Arc;
4
5use anyhow::Result;
6use tokio::sync::Mutex;
7use tokio::task::JoinHandle;
8use tonic::transport::Server;
9use tonic::{Request, Response, Status};
10use tracing::{info, warn};
11
12use tesser_execution::OrderOrchestrator;
13use tesser_portfolio::{LiveState, Portfolio};
14use tesser_rpc::conversions::to_decimal_proto;
15use tesser_rpc::proto::control_service_server::{ControlService, ControlServiceServer};
16use tesser_rpc::proto::{
17    CancelAllRequest, CancelAllResponse, GetOpenOrdersRequest, GetOpenOrdersResponse,
18    GetPortfolioRequest, GetPortfolioResponse, GetStatusRequest, GetStatusResponse, OrderSnapshot,
19    PortfolioSnapshot,
20};
21
22use crate::live::ShutdownSignal;
23
24/// Launch the Control Plane gRPC server alongside the live runtime.
25pub fn spawn_control_plane(
26    addr: SocketAddr,
27    portfolio: Arc<Mutex<Portfolio>>,
28    orchestrator: Arc<OrderOrchestrator>,
29    persisted: Arc<Mutex<LiveState>>,
30    last_data_timestamp: Arc<AtomicI64>,
31    shutdown: ShutdownSignal,
32) -> JoinHandle<()> {
33    let service = ControlGrpcService::new(
34        portfolio,
35        orchestrator,
36        persisted,
37        last_data_timestamp,
38        shutdown.clone(),
39    );
40    info!(%addr, "starting control plane gRPC server");
41    tokio::spawn(async move {
42        if let Err(err) = Server::builder()
43            .add_service(ControlServiceServer::new(service))
44            .serve_with_shutdown(addr, async move { shutdown.wait().await })
45            .await
46        {
47            warn!(error = %err, "control plane server exited with error");
48        }
49    })
50}
51
52struct ControlGrpcService {
53    portfolio: Arc<Mutex<Portfolio>>,
54    orchestrator: Arc<OrderOrchestrator>,
55    persisted: Arc<Mutex<LiveState>>,
56    last_data_timestamp: Arc<AtomicI64>,
57    shutdown: ShutdownSignal,
58}
59
60impl ControlGrpcService {
61    fn new(
62        portfolio: Arc<Mutex<Portfolio>>,
63        orchestrator: Arc<OrderOrchestrator>,
64        persisted: Arc<Mutex<LiveState>>,
65        last_data_timestamp: Arc<AtomicI64>,
66        shutdown: ShutdownSignal,
67    ) -> Self {
68        Self {
69            portfolio,
70            orchestrator,
71            persisted,
72            last_data_timestamp,
73            shutdown,
74        }
75    }
76
77    fn last_data_timestamp(&self) -> Option<prost_types::Timestamp> {
78        let secs = self.last_data_timestamp.load(Ordering::SeqCst);
79        if secs <= 0 {
80            return None;
81        }
82        Some(prost_types::Timestamp {
83            seconds: secs,
84            nanos: 0,
85        })
86    }
87
88    async fn cancel_all_impl(&self) -> Result<(u32, u32)> {
89        let algo_ids: Vec<_> = self
90            .orchestrator
91            .algorithm_statuses()
92            .keys()
93            .copied()
94            .collect();
95        let mut cancelled_algorithms = 0u32;
96        for algo_id in algo_ids {
97            match self.orchestrator.cancel_algo(&algo_id).await {
98                Ok(_) => cancelled_algorithms += 1,
99                Err(err) => warn!(algo = %algo_id, error = %err, "failed to cancel algorithm"),
100            }
101        }
102
103        let open_orders = {
104            let state = self.persisted.lock().await;
105            state.open_orders.clone()
106        };
107        let client = self.orchestrator.execution_engine().client();
108        let mut cancelled_orders = 0u32;
109        for order in open_orders {
110            match client
111                .cancel_order(order.id.clone(), &order.request.symbol)
112                .await
113            {
114                Ok(_) => cancelled_orders += 1,
115                Err(err) => warn!(order_id = %order.id, error = %err, "failed to cancel order"),
116            }
117        }
118        Ok((cancelled_orders, cancelled_algorithms))
119    }
120}
121
122#[tonic::async_trait]
123impl ControlService for ControlGrpcService {
124    async fn get_portfolio(
125        &self,
126        _request: Request<GetPortfolioRequest>,
127    ) -> Result<Response<GetPortfolioResponse>, Status> {
128        let snapshot: PortfolioSnapshot = {
129            let guard = self.portfolio.lock().await;
130            PortfolioSnapshot::from(&*guard)
131        };
132        Ok(Response::new(GetPortfolioResponse {
133            portfolio: Some(snapshot),
134        }))
135    }
136
137    async fn get_open_orders(
138        &self,
139        _request: Request<GetOpenOrdersRequest>,
140    ) -> Result<Response<GetOpenOrdersResponse>, Status> {
141        let orders = {
142            let state = self.persisted.lock().await;
143            state.open_orders.clone()
144        };
145        let proto_orders: Vec<OrderSnapshot> = orders.iter().map(OrderSnapshot::from).collect();
146        Ok(Response::new(GetOpenOrdersResponse {
147            orders: proto_orders,
148        }))
149    }
150
151    async fn get_status(
152        &self,
153        _request: Request<GetStatusRequest>,
154    ) -> Result<Response<GetStatusResponse>, Status> {
155        let (equity, liquidate_only) = {
156            let guard = self.portfolio.lock().await;
157            (guard.equity(), guard.liquidate_only())
158        };
159        let response = GetStatusResponse {
160            shutdown: self.shutdown.triggered(),
161            liquidate_only,
162            active_algorithms: self.orchestrator.active_algorithms_count() as u32,
163            last_data_timestamp: self.last_data_timestamp(),
164            equity: Some(to_decimal_proto(equity)),
165        };
166        Ok(Response::new(response))
167    }
168
169    async fn cancel_all(
170        &self,
171        _request: Request<CancelAllRequest>,
172    ) -> Result<Response<CancelAllResponse>, Status> {
173        match self.cancel_all_impl().await {
174            Ok((orders, algos)) => Ok(Response::new(CancelAllResponse {
175                cancelled_orders: orders,
176                cancelled_algorithms: algos,
177            })),
178            Err(err) => Err(Status::internal(err.to_string())),
179        }
180    }
181}