// tesser_cli/control.rs

use std::net::SocketAddr;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

use anyhow::Result;
use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Server;
use tonic::{Request, Response, Status};
use tracing::{debug, info, warn};

use tesser_events::{Event as RuntimeEvent, EventBus};
use tesser_execution::OrderOrchestrator;
use tesser_portfolio::{LiveState, Portfolio};
use tesser_rpc::conversions::to_decimal_proto;
use tesser_rpc::proto::control_service_server::{ControlService, ControlServiceServer};
use tesser_rpc::proto::{
    self, CancelAllRequest, CancelAllResponse, Event, GetOpenOrdersRequest, GetOpenOrdersResponse,
    GetPortfolioRequest, GetPortfolioResponse, GetStatusRequest, GetStatusResponse, MonitorRequest,
    OrderSnapshot, PortfolioSnapshot,
};

use crate::live::ShutdownSignal;

/// Launch the Control Plane gRPC server alongside the live runtime.
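///
/// The returned [`JoinHandle`] resolves once the server has drained after the
/// shared [`ShutdownSignal`] fires. A minimal wiring sketch (marked `ignore`
/// because the surrounding runtime values are assumed to already exist, not
/// built here):
///
/// ```ignore
/// let handle = spawn_control_plane(
///     "127.0.0.1:50051".parse()?, // gRPC bind address (example value)
///     portfolio,                  // Arc<Mutex<Portfolio>> from the live runtime
///     orchestrator,               // Arc<OrderOrchestrator>
///     persisted,                  // Arc<Mutex<LiveState>>
///     last_data_timestamp,        // Arc<AtomicI64>, unix seconds of last market data
///     event_bus,                  // Arc<EventBus>
///     shutdown.clone(),           // ShutdownSignal shared with the runtime
/// );
/// // ... run the live loop ...
/// handle.await?;
/// ```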
pub fn spawn_control_plane(
    addr: SocketAddr,
    portfolio: Arc<Mutex<Portfolio>>,
    orchestrator: Arc<OrderOrchestrator>,
    persisted: Arc<Mutex<LiveState>>,
    last_data_timestamp: Arc<AtomicI64>,
    event_bus: Arc<EventBus>,
    shutdown: ShutdownSignal,
) -> JoinHandle<()> {
    let service = ControlGrpcService::new(
        portfolio,
        orchestrator,
        persisted,
        last_data_timestamp,
        event_bus,
        shutdown.clone(),
    );
    info!(%addr, "starting control plane gRPC server");
    tokio::spawn(async move {
        if let Err(err) = Server::builder()
            .add_service(ControlServiceServer::new(service))
            .serve_with_shutdown(addr, async move { shutdown.wait().await })
            .await
        {
            warn!(error = %err, "control plane server exited with error");
        }
    })
}
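// Shutdown interplay (sketch): `serve_with_shutdown` resolves once the shared
// `ShutdownSignal` fires, so the signal that stops the live runtime also drains
// the control plane. Assuming the runtime side exposes a way to fire the signal
// (a hypothetical `trigger()` here; see `crate::live` for the real API), a
// supervisor could do:
//
//     shutdown.trigger();     // hypothetical: ask runtime + server to stop
//     handle.await?;          // wait for the gRPC task to exit cleanly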

/// Shared-state gRPC service backing the control-plane endpoints.
struct ControlGrpcService {
    portfolio: Arc<Mutex<Portfolio>>,
    orchestrator: Arc<OrderOrchestrator>,
    persisted: Arc<Mutex<LiveState>>,
    last_data_timestamp: Arc<AtomicI64>,
    event_bus: Arc<EventBus>,
    shutdown: ShutdownSignal,
}

impl ControlGrpcService {
    fn new(
        portfolio: Arc<Mutex<Portfolio>>,
        orchestrator: Arc<OrderOrchestrator>,
        persisted: Arc<Mutex<LiveState>>,
        last_data_timestamp: Arc<AtomicI64>,
        event_bus: Arc<EventBus>,
        shutdown: ShutdownSignal,
    ) -> Self {
        Self {
            portfolio,
            orchestrator,
            persisted,
            last_data_timestamp,
            event_bus,
            shutdown,
        }
    }

    /// Timestamp of the most recent market data, or `None` if nothing has
    /// arrived yet (the atomic holds unix seconds; zero or negative is the
    /// "never updated" sentinel).
    fn last_data_timestamp(&self) -> Option<prost_types::Timestamp> {
        let secs = self.last_data_timestamp.load(Ordering::SeqCst);
        if secs <= 0 {
            return None;
        }
        Some(prost_types::Timestamp {
            seconds: secs,
            nanos: 0,
        })
    }

    /// Cancel every running algorithm first, then every open exchange order.
    /// Failures are logged and skipped so one rejected cancel cannot stall the
    /// sweep; returns `(cancelled_orders, cancelled_algorithms)`.
    async fn cancel_all_impl(&self) -> Result<(u32, u32)> {
        let algo_ids: Vec<_> = self
            .orchestrator
            .algorithm_statuses()
            .keys()
            .copied()
            .collect();
        let mut cancelled_algorithms = 0u32;
        for algo_id in algo_ids {
            match self.orchestrator.cancel_algo(&algo_id).await {
                Ok(_) => cancelled_algorithms += 1,
                Err(err) => warn!(algo = %algo_id, error = %err, "failed to cancel algorithm"),
            }
        }

        // Snapshot open orders under the lock, then release it before issuing
        // cancels so slow exchange round-trips don't block state persistence.
        let open_orders = {
            let state = self.persisted.lock().await;
            state.open_orders.clone()
        };
        let client = self.orchestrator.execution_engine().client();
        let mut cancelled_orders = 0u32;
        for order in open_orders {
            match client
                .cancel_order(order.id.clone(), &order.request.symbol)
                .await
            {
                Ok(_) => cancelled_orders += 1,
                Err(err) => warn!(order_id = %order.id, error = %err, "failed to cancel order"),
            }
        }
        Ok((cancelled_orders, cancelled_algorithms))
    }
}

#[tonic::async_trait]
impl ControlService for ControlGrpcService {
    type MonitorStream = ReceiverStream<Result<Event, Status>>;

    async fn get_portfolio(
        &self,
        _request: Request<GetPortfolioRequest>,
    ) -> Result<Response<GetPortfolioResponse>, Status> {
        let snapshot: PortfolioSnapshot = {
            let guard = self.portfolio.lock().await;
            PortfolioSnapshot::from(&*guard)
        };
        Ok(Response::new(GetPortfolioResponse {
            portfolio: Some(snapshot),
        }))
    }

    async fn get_open_orders(
        &self,
        _request: Request<GetOpenOrdersRequest>,
    ) -> Result<Response<GetOpenOrdersResponse>, Status> {
        let orders = {
            let state = self.persisted.lock().await;
            state.open_orders.clone()
        };
        let proto_orders: Vec<OrderSnapshot> = orders.iter().map(OrderSnapshot::from).collect();
        Ok(Response::new(GetOpenOrdersResponse {
            orders: proto_orders,
        }))
    }

    async fn get_status(
        &self,
        _request: Request<GetStatusRequest>,
    ) -> Result<Response<GetStatusResponse>, Status> {
        let (equity, liquidate_only) = {
            let guard = self.portfolio.lock().await;
            (guard.equity(), guard.liquidate_only())
        };
        let response = GetStatusResponse {
            shutdown: self.shutdown.triggered(),
            liquidate_only,
            active_algorithms: self.orchestrator.active_algorithms_count() as u32,
            last_data_timestamp: self.last_data_timestamp(),
            equity: Some(to_decimal_proto(equity)),
        };
        Ok(Response::new(response))
    }

    async fn cancel_all(
        &self,
        _request: Request<CancelAllRequest>,
    ) -> Result<Response<CancelAllResponse>, Status> {
        match self.cancel_all_impl().await {
            Ok((orders, algos)) => Ok(Response::new(CancelAllResponse {
                cancelled_orders: orders,
                cancelled_algorithms: algos,
            })),
            Err(err) => Err(Status::internal(err.to_string())),
        }
    }

    async fn monitor(
        &self,
        _request: Request<MonitorRequest>,
    ) -> Result<Response<Self::MonitorStream>, Status> {
        let mut stream = self.event_bus.subscribe();
        info!("monitor subscriber connected");
        // Bridge the broadcast bus onto a bounded mpsc channel whose receiver
        // half becomes the tonic response stream. The bound (256) applies
        // backpressure to this forwarder, which surfaces as broadcast lag
        // (and dropped events) when a client is slow.
        let (tx, rx) = mpsc::channel(256);
        tokio::spawn(async move {
            loop {
                match stream.recv().await {
                    Ok(event) => {
                        let label = event_label(&event);
                        debug!(kind = label, "monitor captured event");
                        if let Some(proto) = event_to_proto(event) {
                            if tx.send(Ok(proto)).await.is_err() {
                                // The client hung up; stop forwarding.
                                warn!(kind = label, "monitor stream receiver dropped during send");
                                break;
                            }
                            debug!(kind = label, "monitor event forwarded to client");
                        } else {
                            debug!(kind = label, "monitor event skipped (no proto mapping)");
                        }
                    }
                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                    Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
                        warn!(lag, "monitor stream lagged; dropping events");
                        continue;
                    }
                }
            }
        });
        Ok(Response::new(ReceiverStream::new(rx)))
    }
}
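
// Client-side sketch for consuming the monitor stream. This assumes tonic also
// generated `control_service_client::ControlServiceClient` from the same proto
// (its default behavior); the address is an example value:
//
//     use tesser_rpc::proto::control_service_client::ControlServiceClient;
//
//     let mut client = ControlServiceClient::connect("http://127.0.0.1:50051").await?;
//     let mut stream = client.monitor(MonitorRequest::default()).await?.into_inner();
//     while let Some(event) = stream.message().await? {
//         println!("event: {event:?}");
//     }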

/// Map a runtime event onto its protobuf counterpart, or `None` for event
/// kinds without a proto mapping (currently order books).
fn event_to_proto(event: RuntimeEvent) -> Option<proto::Event> {
    use tesser_rpc::proto::event::Payload;

    match event {
        RuntimeEvent::Tick(evt) => Some(proto::Event {
            payload: Some(Payload::Tick(evt.tick.into())),
        }),
        RuntimeEvent::Candle(evt) => Some(proto::Event {
            payload: Some(Payload::Candle(evt.candle.into())),
        }),
        RuntimeEvent::Signal(evt) => Some(proto::Event {
            payload: Some(Payload::Signal(evt.signal.into())),
        }),
        RuntimeEvent::Fill(evt) => Some(proto::Event {
            payload: Some(Payload::Fill(evt.fill.into())),
        }),
        RuntimeEvent::OrderUpdate(evt) => Some(proto::Event {
            payload: Some(Payload::Order(evt.order.into())),
        }),
        RuntimeEvent::OrderBook(book) => {
            debug!(symbol = %book.order_book.symbol, "monitor dropping order book event");
            None
        }
    }
}

/// Short static label for an event kind, used in monitor tracing fields.
fn event_label(event: &RuntimeEvent) -> &'static str {
    match event {
        RuntimeEvent::Tick(_) => "tick",
        RuntimeEvent::Candle(_) => "candle",
        RuntimeEvent::Signal(_) => "signal",
        RuntimeEvent::Fill(_) => "fill",
        RuntimeEvent::OrderUpdate(_) => "order",
        RuntimeEvent::OrderBook(_) => "order_book",
    }
}