1use std::any::Any;
2use std::net::SocketAddr;
3use std::sync::atomic::{AtomicI64, Ordering};
4use std::sync::Arc;
5
6use anyhow::Result;
7use chrono::{DateTime, Utc};
8use tokio::sync::{mpsc, Mutex};
9use tokio::task::JoinHandle;
10use tokio_stream::wrappers::ReceiverStream;
11use tonic::transport::Server;
12use tonic::{Request, Response, Status};
13use tracing::{debug, info, warn};
14
15use tesser_core::ExitStrategy;
16use tesser_events::{Event as RuntimeEvent, EventBus};
17use tesser_execution::OrderOrchestrator;
18use tesser_portfolio::{LiveState, Portfolio};
19use tesser_rpc::conversions::to_decimal_proto;
20use tesser_rpc::proto::control_service_server::{ControlService, ControlServiceServer};
21use tesser_rpc::proto::{
22 self, CancelAllRequest, CancelAllResponse, Event, GetOpenOrdersRequest, GetOpenOrdersResponse,
23 GetPortfolioRequest, GetPortfolioResponse, GetStatusRequest, GetStatusResponse,
24 ListManagedTradesRequest, ListManagedTradesResponse, ManagedTradeInfo, MonitorRequest,
25 OrderSnapshot, PortfolioSnapshot, UpdateTradeExitStrategyRequest,
26 UpdateTradeExitStrategyResponse,
27};
28use tesser_strategy::{PairTradeSnapshot, PairsTradingArbitrage, Strategy, StrategyResult};
29use uuid::Uuid;
30
31use crate::live::ShutdownSignal;
32
33pub struct ControlPlaneComponents {
34 pub portfolio: Arc<Mutex<Portfolio>>,
35 pub orchestrator: Arc<OrderOrchestrator>,
36 pub persisted: Arc<Mutex<LiveState>>,
37 pub last_data_timestamp: Arc<AtomicI64>,
38 pub event_bus: Arc<EventBus>,
39 pub strategy: Arc<Mutex<Box<dyn Strategy>>>,
40 pub shutdown: ShutdownSignal,
41}
42
43pub fn spawn_control_plane(addr: SocketAddr, components: ControlPlaneComponents) -> JoinHandle<()> {
45 let ControlPlaneComponents {
46 portfolio,
47 orchestrator,
48 persisted,
49 last_data_timestamp,
50 event_bus,
51 strategy,
52 shutdown,
53 } = components;
54 let service = ControlGrpcService::new(
55 portfolio,
56 orchestrator,
57 persisted,
58 last_data_timestamp,
59 event_bus,
60 strategy,
61 shutdown.clone(),
62 );
63 info!(%addr, "starting control plane gRPC server");
64 tokio::spawn(async move {
65 if let Err(err) = Server::builder()
66 .add_service(ControlServiceServer::new(service))
67 .serve_with_shutdown(addr, async move { shutdown.wait().await })
68 .await
69 {
70 warn!(error = %err, "control plane server exited with error");
71 }
72 })
73}
74
75struct ControlGrpcService {
76 portfolio: Arc<Mutex<Portfolio>>,
77 orchestrator: Arc<OrderOrchestrator>,
78 persisted: Arc<Mutex<LiveState>>,
79 last_data_timestamp: Arc<AtomicI64>,
80 event_bus: Arc<EventBus>,
81 strategy: Arc<Mutex<Box<dyn Strategy>>>,
82 shutdown: ShutdownSignal,
83}
84
85impl ControlGrpcService {
86 fn new(
87 portfolio: Arc<Mutex<Portfolio>>,
88 orchestrator: Arc<OrderOrchestrator>,
89 persisted: Arc<Mutex<LiveState>>,
90 last_data_timestamp: Arc<AtomicI64>,
91 event_bus: Arc<EventBus>,
92 strategy: Arc<Mutex<Box<dyn Strategy>>>,
93 shutdown: ShutdownSignal,
94 ) -> Self {
95 Self {
96 portfolio,
97 orchestrator,
98 persisted,
99 last_data_timestamp,
100 event_bus,
101 strategy,
102 shutdown,
103 }
104 }
105
106 fn last_data_timestamp(&self) -> Option<prost_types::Timestamp> {
107 let secs = self.last_data_timestamp.load(Ordering::SeqCst);
108 if secs <= 0 {
109 return None;
110 }
111 Some(prost_types::Timestamp {
112 seconds: secs,
113 nanos: 0,
114 })
115 }
116
117 async fn cancel_all_impl(&self) -> Result<(u32, u32)> {
118 let algo_ids: Vec<_> = self
119 .orchestrator
120 .algorithm_statuses()
121 .keys()
122 .copied()
123 .collect();
124 let mut cancelled_algorithms = 0u32;
125 for algo_id in algo_ids {
126 match self.orchestrator.cancel_algo(&algo_id).await {
127 Ok(_) => cancelled_algorithms += 1,
128 Err(err) => warn!(algo = %algo_id, error = %err, "failed to cancel algorithm"),
129 }
130 }
131
132 let open_orders = {
133 let state = self.persisted.lock().await;
134 state.open_orders.clone()
135 };
136 let client = self.orchestrator.execution_engine().client();
137 let mut cancelled_orders = 0u32;
138 for order in open_orders {
139 let symbol = order.request.symbol;
140 match client.cancel_order(order.id.clone(), symbol).await {
141 Ok(_) => cancelled_orders += 1,
142 Err(err) => warn!(order_id = %order.id, error = %err, "failed to cancel order"),
143 }
144 }
145 Ok((cancelled_orders, cancelled_algorithms))
146 }
147
148 async fn with_pairs_strategy<R>(
149 &self,
150 f: impl FnOnce(&mut PairsTradingArbitrage) -> StrategyResult<R>,
151 ) -> Result<R, Status> {
152 let mut guard = self.strategy.lock().await;
153 let any = (&mut **guard) as &mut dyn Any;
154 let Some(pairs) = any.downcast_mut::<PairsTradingArbitrage>() else {
155 return Err(Status::failed_precondition(
156 "active strategy does not expose managed trades",
157 ));
158 };
159 f(pairs).map_err(|err| Status::internal(err.to_string()))
160 }
161
162 #[allow(clippy::result_large_err)]
163 fn snapshot_to_proto(snapshot: PairTradeSnapshot) -> Result<ManagedTradeInfo, Status> {
164 let exit_strategy_json = serde_json::to_string(&snapshot.exit_strategy)
165 .map_err(|err| Status::internal(format!("failed to encode exit strategy: {err}")))?;
166 Ok(ManagedTradeInfo {
167 trade_id: snapshot.trade_id.to_string(),
168 symbol_a: snapshot.symbols[0].to_string(),
169 symbol_b: snapshot.symbols[1].to_string(),
170 direction: format!("{:?}", snapshot.direction),
171 entry_timestamp: Some(timestamp_from_datetime(snapshot.entry_timestamp)),
172 entry_z: Some(to_decimal_proto(snapshot.entry_z_score)),
173 candles_held: snapshot.candles_held,
174 exit_strategy_json,
175 })
176 }
177}
178
179#[tonic::async_trait]
180impl ControlService for ControlGrpcService {
181 type MonitorStream = ReceiverStream<Result<Event, Status>>;
182
183 async fn get_portfolio(
184 &self,
185 _request: Request<GetPortfolioRequest>,
186 ) -> Result<Response<GetPortfolioResponse>, Status> {
187 let snapshot: PortfolioSnapshot = {
188 let guard = self.portfolio.lock().await;
189 PortfolioSnapshot::from(&*guard)
190 };
191 Ok(Response::new(GetPortfolioResponse {
192 portfolio: Some(snapshot),
193 }))
194 }
195
196 async fn get_open_orders(
197 &self,
198 _request: Request<GetOpenOrdersRequest>,
199 ) -> Result<Response<GetOpenOrdersResponse>, Status> {
200 let orders = {
201 let state = self.persisted.lock().await;
202 state.open_orders.clone()
203 };
204 let proto_orders: Vec<OrderSnapshot> = orders.iter().map(OrderSnapshot::from).collect();
205 Ok(Response::new(GetOpenOrdersResponse {
206 orders: proto_orders,
207 }))
208 }
209
210 async fn get_status(
211 &self,
212 _request: Request<GetStatusRequest>,
213 ) -> Result<Response<GetStatusResponse>, Status> {
214 let (equity, liquidate_only) = {
215 let guard = self.portfolio.lock().await;
216 (guard.equity(), guard.liquidate_only())
217 };
218 let response = GetStatusResponse {
219 shutdown: self.shutdown.triggered(),
220 liquidate_only,
221 active_algorithms: self.orchestrator.active_algorithms_count() as u32,
222 last_data_timestamp: self.last_data_timestamp(),
223 equity: Some(to_decimal_proto(equity)),
224 };
225 Ok(Response::new(response))
226 }
227
228 async fn cancel_all(
229 &self,
230 _request: Request<CancelAllRequest>,
231 ) -> Result<Response<CancelAllResponse>, Status> {
232 match self.cancel_all_impl().await {
233 Ok((orders, algos)) => Ok(Response::new(CancelAllResponse {
234 cancelled_orders: orders,
235 cancelled_algorithms: algos,
236 })),
237 Err(err) => Err(Status::internal(err.to_string())),
238 }
239 }
240
241 async fn list_managed_trades(
242 &self,
243 _request: Request<ListManagedTradesRequest>,
244 ) -> Result<Response<ListManagedTradesResponse>, Status> {
245 let snapshots = self
246 .with_pairs_strategy(|strategy| Ok(strategy.managed_trades()))
247 .await?;
248 let mut trades = Vec::with_capacity(snapshots.len());
249 for snapshot in snapshots {
250 trades.push(Self::snapshot_to_proto(snapshot)?);
251 }
252 Ok(Response::new(ListManagedTradesResponse { trades }))
253 }
254
255 async fn update_trade_exit_strategy(
256 &self,
257 request: Request<UpdateTradeExitStrategyRequest>,
258 ) -> Result<Response<UpdateTradeExitStrategyResponse>, Status> {
259 let payload = request.into_inner();
260 let trade_id = Uuid::parse_str(&payload.trade_id)
261 .map_err(|err| Status::invalid_argument(format!("invalid trade_id: {err}")))?;
262 let new_strategy: ExitStrategy =
263 serde_json::from_str(&payload.new_strategy_json).map_err(|err| {
264 Status::invalid_argument(format!("invalid exit strategy json: {err}"))
265 })?;
266 self.with_pairs_strategy(|strategy| {
267 strategy
268 .update_trade_exit_strategy(trade_id, new_strategy.clone())
269 .map(|_| ())
270 })
271 .await?;
272 Ok(Response::new(UpdateTradeExitStrategyResponse {
273 success: true,
274 error_message: String::new(),
275 }))
276 }
277
278 async fn monitor(
279 &self,
280 _request: Request<MonitorRequest>,
281 ) -> Result<Response<Self::MonitorStream>, Status> {
282 let mut stream = self.event_bus.subscribe();
283 info!("monitor subscriber connected");
284 let (tx, rx) = mpsc::channel(256);
285 tokio::spawn(async move {
286 loop {
287 match stream.recv().await {
288 Ok(event) => {
289 let label = event_label(&event);
290 debug!(kind = label, "monitor captured event");
291 if let Some(proto) = event_to_proto(event) {
292 if tx.send(Ok(proto)).await.is_err() {
293 warn!(kind = label, "monitor stream receiver dropped during send");
294 break;
295 } else {
296 debug!(kind = label, "monitor event forwarded to client");
297 }
298 } else {
299 debug!(kind = label, "monitor event skipped (no proto mapping)");
300 }
301 }
302 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
303 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
304 warn!(lag, "monitor stream lagged; dropping events");
305 continue;
306 }
307 }
308 }
309 });
310 Ok(Response::new(ReceiverStream::new(rx)))
311 }
312}
313
314fn event_to_proto(event: RuntimeEvent) -> Option<proto::Event> {
315 use tesser_rpc::proto::event::Payload;
316
317 match event {
318 RuntimeEvent::Tick(evt) => Some(proto::Event {
319 payload: Some(Payload::Tick(evt.tick.into())),
320 }),
321 RuntimeEvent::Candle(evt) => Some(proto::Event {
322 payload: Some(Payload::Candle(evt.candle.into())),
323 }),
324 RuntimeEvent::Signal(evt) => Some(proto::Event {
325 payload: Some(Payload::Signal(evt.signal.into())),
326 }),
327 RuntimeEvent::Fill(evt) => Some(proto::Event {
328 payload: Some(Payload::Fill(evt.fill.into())),
329 }),
330 RuntimeEvent::OrderUpdate(evt) => Some(proto::Event {
331 payload: Some(Payload::Order(evt.order.into())),
332 }),
333 RuntimeEvent::OrderBook(book) => {
334 debug!(symbol = %book.order_book.symbol, "monitor dropping order book event");
335 None
336 }
337 }
338}
339
340fn event_label(event: &RuntimeEvent) -> &'static str {
341 match event {
342 RuntimeEvent::Tick(_) => "tick",
343 RuntimeEvent::Candle(_) => "candle",
344 RuntimeEvent::Signal(_) => "signal",
345 RuntimeEvent::Fill(_) => "fill",
346 RuntimeEvent::OrderUpdate(_) => "order",
347 RuntimeEvent::OrderBook(_) => "order_book",
348 }
349}
350
351fn timestamp_from_datetime(ts: DateTime<Utc>) -> prost_types::Timestamp {
352 prost_types::Timestamp {
353 seconds: ts.timestamp(),
354 nanos: ts.timestamp_subsec_nanos() as i32,
355 }
356}