1use std::net::SocketAddr;
2use std::sync::atomic::{AtomicI64, Ordering};
3use std::sync::Arc;
4
5use anyhow::Result;
6use tokio::sync::Mutex;
7use tokio::task::JoinHandle;
8use tonic::transport::Server;
9use tonic::{Request, Response, Status};
10use tracing::{info, warn};
11
12use tesser_execution::OrderOrchestrator;
13use tesser_portfolio::{LiveState, Portfolio};
14use tesser_rpc::conversions::to_decimal_proto;
15use tesser_rpc::proto::control_service_server::{ControlService, ControlServiceServer};
16use tesser_rpc::proto::{
17 CancelAllRequest, CancelAllResponse, GetOpenOrdersRequest, GetOpenOrdersResponse,
18 GetPortfolioRequest, GetPortfolioResponse, GetStatusRequest, GetStatusResponse, OrderSnapshot,
19 PortfolioSnapshot,
20};
21
22use crate::live::ShutdownSignal;
23
24pub fn spawn_control_plane(
26 addr: SocketAddr,
27 portfolio: Arc<Mutex<Portfolio>>,
28 orchestrator: Arc<OrderOrchestrator>,
29 persisted: Arc<Mutex<LiveState>>,
30 last_data_timestamp: Arc<AtomicI64>,
31 shutdown: ShutdownSignal,
32) -> JoinHandle<()> {
33 let service = ControlGrpcService::new(
34 portfolio,
35 orchestrator,
36 persisted,
37 last_data_timestamp,
38 shutdown.clone(),
39 );
40 info!(%addr, "starting control plane gRPC server");
41 tokio::spawn(async move {
42 if let Err(err) = Server::builder()
43 .add_service(ControlServiceServer::new(service))
44 .serve_with_shutdown(addr, async move { shutdown.wait().await })
45 .await
46 {
47 warn!(error = %err, "control plane server exited with error");
48 }
49 })
50}
51
52struct ControlGrpcService {
53 portfolio: Arc<Mutex<Portfolio>>,
54 orchestrator: Arc<OrderOrchestrator>,
55 persisted: Arc<Mutex<LiveState>>,
56 last_data_timestamp: Arc<AtomicI64>,
57 shutdown: ShutdownSignal,
58}
59
60impl ControlGrpcService {
61 fn new(
62 portfolio: Arc<Mutex<Portfolio>>,
63 orchestrator: Arc<OrderOrchestrator>,
64 persisted: Arc<Mutex<LiveState>>,
65 last_data_timestamp: Arc<AtomicI64>,
66 shutdown: ShutdownSignal,
67 ) -> Self {
68 Self {
69 portfolio,
70 orchestrator,
71 persisted,
72 last_data_timestamp,
73 shutdown,
74 }
75 }
76
77 fn last_data_timestamp(&self) -> Option<prost_types::Timestamp> {
78 let secs = self.last_data_timestamp.load(Ordering::SeqCst);
79 if secs <= 0 {
80 return None;
81 }
82 Some(prost_types::Timestamp {
83 seconds: secs,
84 nanos: 0,
85 })
86 }
87
88 async fn cancel_all_impl(&self) -> Result<(u32, u32)> {
89 let algo_ids: Vec<_> = self
90 .orchestrator
91 .algorithm_statuses()
92 .keys()
93 .copied()
94 .collect();
95 let mut cancelled_algorithms = 0u32;
96 for algo_id in algo_ids {
97 match self.orchestrator.cancel_algo(&algo_id).await {
98 Ok(_) => cancelled_algorithms += 1,
99 Err(err) => warn!(algo = %algo_id, error = %err, "failed to cancel algorithm"),
100 }
101 }
102
103 let open_orders = {
104 let state = self.persisted.lock().await;
105 state.open_orders.clone()
106 };
107 let client = self.orchestrator.execution_engine().client();
108 let mut cancelled_orders = 0u32;
109 for order in open_orders {
110 match client
111 .cancel_order(order.id.clone(), &order.request.symbol)
112 .await
113 {
114 Ok(_) => cancelled_orders += 1,
115 Err(err) => warn!(order_id = %order.id, error = %err, "failed to cancel order"),
116 }
117 }
118 Ok((cancelled_orders, cancelled_algorithms))
119 }
120}
121
122#[tonic::async_trait]
123impl ControlService for ControlGrpcService {
124 async fn get_portfolio(
125 &self,
126 _request: Request<GetPortfolioRequest>,
127 ) -> Result<Response<GetPortfolioResponse>, Status> {
128 let snapshot: PortfolioSnapshot = {
129 let guard = self.portfolio.lock().await;
130 PortfolioSnapshot::from(&*guard)
131 };
132 Ok(Response::new(GetPortfolioResponse {
133 portfolio: Some(snapshot),
134 }))
135 }
136
137 async fn get_open_orders(
138 &self,
139 _request: Request<GetOpenOrdersRequest>,
140 ) -> Result<Response<GetOpenOrdersResponse>, Status> {
141 let orders = {
142 let state = self.persisted.lock().await;
143 state.open_orders.clone()
144 };
145 let proto_orders: Vec<OrderSnapshot> = orders.iter().map(OrderSnapshot::from).collect();
146 Ok(Response::new(GetOpenOrdersResponse {
147 orders: proto_orders,
148 }))
149 }
150
151 async fn get_status(
152 &self,
153 _request: Request<GetStatusRequest>,
154 ) -> Result<Response<GetStatusResponse>, Status> {
155 let (equity, liquidate_only) = {
156 let guard = self.portfolio.lock().await;
157 (guard.equity(), guard.liquidate_only())
158 };
159 let response = GetStatusResponse {
160 shutdown: self.shutdown.triggered(),
161 liquidate_only,
162 active_algorithms: self.orchestrator.active_algorithms_count() as u32,
163 last_data_timestamp: self.last_data_timestamp(),
164 equity: Some(to_decimal_proto(equity)),
165 };
166 Ok(Response::new(response))
167 }
168
169 async fn cancel_all(
170 &self,
171 _request: Request<CancelAllRequest>,
172 ) -> Result<Response<CancelAllResponse>, Status> {
173 match self.cancel_all_impl().await {
174 Ok((orders, algos)) => Ok(Response::new(CancelAllResponse {
175 cancelled_orders: orders,
176 cancelled_algorithms: algos,
177 })),
178 Err(err) => Err(Status::internal(err.to_string())),
179 }
180 }
181}