1use std::net::SocketAddr;
2use std::sync::atomic::{AtomicI64, Ordering};
3use std::sync::Arc;
4
5use anyhow::Result;
6use tokio::sync::{mpsc, Mutex};
7use tokio::task::JoinHandle;
8use tokio_stream::wrappers::ReceiverStream;
9use tonic::transport::Server;
10use tonic::{Request, Response, Status};
11use tracing::{debug, info, warn};
12
13use tesser_events::{Event as RuntimeEvent, EventBus};
14use tesser_execution::OrderOrchestrator;
15use tesser_portfolio::{LiveState, Portfolio};
16use tesser_rpc::conversions::to_decimal_proto;
17use tesser_rpc::proto::control_service_server::{ControlService, ControlServiceServer};
18use tesser_rpc::proto::{
19 self, CancelAllRequest, CancelAllResponse, Event, GetOpenOrdersRequest, GetOpenOrdersResponse,
20 GetPortfolioRequest, GetPortfolioResponse, GetStatusRequest, GetStatusResponse, MonitorRequest,
21 OrderSnapshot, PortfolioSnapshot,
22};
23
24use crate::live::ShutdownSignal;
25
26pub fn spawn_control_plane(
28 addr: SocketAddr,
29 portfolio: Arc<Mutex<Portfolio>>,
30 orchestrator: Arc<OrderOrchestrator>,
31 persisted: Arc<Mutex<LiveState>>,
32 last_data_timestamp: Arc<AtomicI64>,
33 event_bus: Arc<EventBus>,
34 shutdown: ShutdownSignal,
35) -> JoinHandle<()> {
36 let service = ControlGrpcService::new(
37 portfolio,
38 orchestrator,
39 persisted,
40 last_data_timestamp,
41 event_bus,
42 shutdown.clone(),
43 );
44 info!(%addr, "starting control plane gRPC server");
45 tokio::spawn(async move {
46 if let Err(err) = Server::builder()
47 .add_service(ControlServiceServer::new(service))
48 .serve_with_shutdown(addr, async move { shutdown.wait().await })
49 .await
50 {
51 warn!(error = %err, "control plane server exited with error");
52 }
53 })
54}
55
56struct ControlGrpcService {
57 portfolio: Arc<Mutex<Portfolio>>,
58 orchestrator: Arc<OrderOrchestrator>,
59 persisted: Arc<Mutex<LiveState>>,
60 last_data_timestamp: Arc<AtomicI64>,
61 event_bus: Arc<EventBus>,
62 shutdown: ShutdownSignal,
63}
64
65impl ControlGrpcService {
66 fn new(
67 portfolio: Arc<Mutex<Portfolio>>,
68 orchestrator: Arc<OrderOrchestrator>,
69 persisted: Arc<Mutex<LiveState>>,
70 last_data_timestamp: Arc<AtomicI64>,
71 event_bus: Arc<EventBus>,
72 shutdown: ShutdownSignal,
73 ) -> Self {
74 Self {
75 portfolio,
76 orchestrator,
77 persisted,
78 last_data_timestamp,
79 event_bus,
80 shutdown,
81 }
82 }
83
84 fn last_data_timestamp(&self) -> Option<prost_types::Timestamp> {
85 let secs = self.last_data_timestamp.load(Ordering::SeqCst);
86 if secs <= 0 {
87 return None;
88 }
89 Some(prost_types::Timestamp {
90 seconds: secs,
91 nanos: 0,
92 })
93 }
94
95 async fn cancel_all_impl(&self) -> Result<(u32, u32)> {
96 let algo_ids: Vec<_> = self
97 .orchestrator
98 .algorithm_statuses()
99 .keys()
100 .copied()
101 .collect();
102 let mut cancelled_algorithms = 0u32;
103 for algo_id in algo_ids {
104 match self.orchestrator.cancel_algo(&algo_id).await {
105 Ok(_) => cancelled_algorithms += 1,
106 Err(err) => warn!(algo = %algo_id, error = %err, "failed to cancel algorithm"),
107 }
108 }
109
110 let open_orders = {
111 let state = self.persisted.lock().await;
112 state.open_orders.clone()
113 };
114 let client = self.orchestrator.execution_engine().client();
115 let mut cancelled_orders = 0u32;
116 for order in open_orders {
117 match client
118 .cancel_order(order.id.clone(), &order.request.symbol)
119 .await
120 {
121 Ok(_) => cancelled_orders += 1,
122 Err(err) => warn!(order_id = %order.id, error = %err, "failed to cancel order"),
123 }
124 }
125 Ok((cancelled_orders, cancelled_algorithms))
126 }
127}
128
129#[tonic::async_trait]
130impl ControlService for ControlGrpcService {
131 type MonitorStream = ReceiverStream<Result<Event, Status>>;
132
133 async fn get_portfolio(
134 &self,
135 _request: Request<GetPortfolioRequest>,
136 ) -> Result<Response<GetPortfolioResponse>, Status> {
137 let snapshot: PortfolioSnapshot = {
138 let guard = self.portfolio.lock().await;
139 PortfolioSnapshot::from(&*guard)
140 };
141 Ok(Response::new(GetPortfolioResponse {
142 portfolio: Some(snapshot),
143 }))
144 }
145
146 async fn get_open_orders(
147 &self,
148 _request: Request<GetOpenOrdersRequest>,
149 ) -> Result<Response<GetOpenOrdersResponse>, Status> {
150 let orders = {
151 let state = self.persisted.lock().await;
152 state.open_orders.clone()
153 };
154 let proto_orders: Vec<OrderSnapshot> = orders.iter().map(OrderSnapshot::from).collect();
155 Ok(Response::new(GetOpenOrdersResponse {
156 orders: proto_orders,
157 }))
158 }
159
160 async fn get_status(
161 &self,
162 _request: Request<GetStatusRequest>,
163 ) -> Result<Response<GetStatusResponse>, Status> {
164 let (equity, liquidate_only) = {
165 let guard = self.portfolio.lock().await;
166 (guard.equity(), guard.liquidate_only())
167 };
168 let response = GetStatusResponse {
169 shutdown: self.shutdown.triggered(),
170 liquidate_only,
171 active_algorithms: self.orchestrator.active_algorithms_count() as u32,
172 last_data_timestamp: self.last_data_timestamp(),
173 equity: Some(to_decimal_proto(equity)),
174 };
175 Ok(Response::new(response))
176 }
177
178 async fn cancel_all(
179 &self,
180 _request: Request<CancelAllRequest>,
181 ) -> Result<Response<CancelAllResponse>, Status> {
182 match self.cancel_all_impl().await {
183 Ok((orders, algos)) => Ok(Response::new(CancelAllResponse {
184 cancelled_orders: orders,
185 cancelled_algorithms: algos,
186 })),
187 Err(err) => Err(Status::internal(err.to_string())),
188 }
189 }
190
191 async fn monitor(
192 &self,
193 _request: Request<MonitorRequest>,
194 ) -> Result<Response<Self::MonitorStream>, Status> {
195 let mut stream = self.event_bus.subscribe();
196 info!("monitor subscriber connected");
197 let (tx, rx) = mpsc::channel(256);
198 tokio::spawn(async move {
199 loop {
200 match stream.recv().await {
201 Ok(event) => {
202 let label = event_label(&event);
203 debug!(kind = label, "monitor captured event");
204 if let Some(proto) = event_to_proto(event) {
205 if tx.send(Ok(proto)).await.is_err() {
206 warn!(kind = label, "monitor stream receiver dropped during send");
207 break;
208 } else {
209 debug!(kind = label, "monitor event forwarded to client");
210 }
211 } else {
212 debug!(kind = label, "monitor event skipped (no proto mapping)");
213 }
214 }
215 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
216 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
217 warn!(lag, "monitor stream lagged; dropping events");
218 continue;
219 }
220 }
221 }
222 });
223 Ok(Response::new(ReceiverStream::new(rx)))
224 }
225}
226
227fn event_to_proto(event: RuntimeEvent) -> Option<proto::Event> {
228 use tesser_rpc::proto::event::Payload;
229
230 match event {
231 RuntimeEvent::Tick(evt) => Some(proto::Event {
232 payload: Some(Payload::Tick(evt.tick.into())),
233 }),
234 RuntimeEvent::Candle(evt) => Some(proto::Event {
235 payload: Some(Payload::Candle(evt.candle.into())),
236 }),
237 RuntimeEvent::Signal(evt) => Some(proto::Event {
238 payload: Some(Payload::Signal(evt.signal.into())),
239 }),
240 RuntimeEvent::Fill(evt) => Some(proto::Event {
241 payload: Some(Payload::Fill(evt.fill.into())),
242 }),
243 RuntimeEvent::OrderUpdate(evt) => Some(proto::Event {
244 payload: Some(Payload::Order(evt.order.into())),
245 }),
246 RuntimeEvent::OrderBook(book) => {
247 debug!(symbol = %book.order_book.symbol, "monitor dropping order book event");
248 None
249 }
250 }
251}
252
253fn event_label(event: &RuntimeEvent) -> &'static str {
254 match event {
255 RuntimeEvent::Tick(_) => "tick",
256 RuntimeEvent::Candle(_) => "candle",
257 RuntimeEvent::Signal(_) => "signal",
258 RuntimeEvent::Fill(_) => "fill",
259 RuntimeEvent::OrderUpdate(_) => "order",
260 RuntimeEvent::OrderBook(_) => "order_book",
261 }
262}