tesser_cli/
control.rs

1use std::any::Any;
2use std::net::SocketAddr;
3use std::sync::atomic::{AtomicI64, Ordering};
4use std::sync::Arc;
5
6use anyhow::Result;
7use chrono::{DateTime, Utc};
8use tokio::sync::{mpsc, Mutex};
9use tokio::task::JoinHandle;
10use tokio_stream::wrappers::ReceiverStream;
11use tonic::transport::Server;
12use tonic::{Request, Response, Status};
13use tracing::{debug, info, warn};
14
15use tesser_core::ExitStrategy;
16use tesser_events::{Event as RuntimeEvent, EventBus};
17use tesser_execution::OrderOrchestrator;
18use tesser_portfolio::{LiveState, Portfolio};
19use tesser_rpc::conversions::to_decimal_proto;
20use tesser_rpc::proto::control_service_server::{ControlService, ControlServiceServer};
21use tesser_rpc::proto::{
22    self, CancelAllRequest, CancelAllResponse, Event, GetOpenOrdersRequest, GetOpenOrdersResponse,
23    GetPortfolioRequest, GetPortfolioResponse, GetStatusRequest, GetStatusResponse,
24    ListManagedTradesRequest, ListManagedTradesResponse, ManagedTradeInfo, MonitorRequest,
25    OrderSnapshot, PortfolioSnapshot, UpdateTradeExitStrategyRequest,
26    UpdateTradeExitStrategyResponse,
27};
28use tesser_strategy::{PairTradeSnapshot, PairsTradingArbitrage, Strategy, StrategyResult};
29use uuid::Uuid;
30
31use crate::live::ShutdownSignal;
32
/// Shared runtime handles that the control plane needs in order to serve requests.
///
/// `spawn_control_plane` consumes this bundle and hands every field to the
/// gRPC service it starts.
pub struct ControlPlaneComponents {
    /// Portfolio state; locked by the portfolio and status handlers.
    pub portfolio: Arc<Mutex<Portfolio>>,
    /// Order orchestrator; queried for algorithm statuses and used to cancel
    /// algorithms and reach the execution client.
    pub orchestrator: Arc<OrderOrchestrator>,
    /// Persisted live state; the open-order list is read from it.
    pub persisted: Arc<Mutex<LiveState>>,
    /// Last market-data time, reported as the `seconds` of a protobuf timestamp.
    /// NOTE(review): presumably Unix seconds written by the live runtime — confirm.
    pub last_data_timestamp: Arc<AtomicI64>,
    /// Event bus that monitor streams subscribe to.
    pub event_bus: Arc<EventBus>,
    /// Active strategy; managed-trade calls downcast it to `PairsTradingArbitrage`.
    pub strategy: Arc<Mutex<Box<dyn Strategy>>>,
    /// Shutdown signal; stops the gRPC server and is reported by the status call.
    pub shutdown: ShutdownSignal,
}
42
43/// Launch the Control Plane gRPC server alongside the live runtime.
44pub fn spawn_control_plane(addr: SocketAddr, components: ControlPlaneComponents) -> JoinHandle<()> {
45    let ControlPlaneComponents {
46        portfolio,
47        orchestrator,
48        persisted,
49        last_data_timestamp,
50        event_bus,
51        strategy,
52        shutdown,
53    } = components;
54    let service = ControlGrpcService::new(
55        portfolio,
56        orchestrator,
57        persisted,
58        last_data_timestamp,
59        event_bus,
60        strategy,
61        shutdown.clone(),
62    );
63    info!(%addr, "starting control plane gRPC server");
64    tokio::spawn(async move {
65        if let Err(err) = Server::builder()
66            .add_service(ControlServiceServer::new(service))
67            .serve_with_shutdown(addr, async move { shutdown.wait().await })
68            .await
69        {
70            warn!(error = %err, "control plane server exited with error");
71        }
72    })
73}
74
/// gRPC service state: the runtime handles each `ControlService` handler reads.
///
/// Populated one-to-one from `ControlPlaneComponents` by `spawn_control_plane`.
struct ControlGrpcService {
    /// Portfolio state; locked by the portfolio and status handlers.
    portfolio: Arc<Mutex<Portfolio>>,
    /// Order orchestrator used for algorithm listing/cancellation and for
    /// reaching the execution client.
    orchestrator: Arc<OrderOrchestrator>,
    /// Persisted live state; source of the open-order list.
    persisted: Arc<Mutex<LiveState>>,
    /// Last market-data time, reported as protobuf timestamp seconds.
    /// NOTE(review): presumably Unix seconds — confirm against the writer.
    last_data_timestamp: Arc<AtomicI64>,
    /// Event bus that monitor streams subscribe to.
    event_bus: Arc<EventBus>,
    /// Active strategy; downcast to `PairsTradingArbitrage` for managed trades.
    strategy: Arc<Mutex<Box<dyn Strategy>>>,
    /// Shutdown signal whose triggered state is reported by the status call.
    shutdown: ShutdownSignal,
}
84
85impl ControlGrpcService {
86    fn new(
87        portfolio: Arc<Mutex<Portfolio>>,
88        orchestrator: Arc<OrderOrchestrator>,
89        persisted: Arc<Mutex<LiveState>>,
90        last_data_timestamp: Arc<AtomicI64>,
91        event_bus: Arc<EventBus>,
92        strategy: Arc<Mutex<Box<dyn Strategy>>>,
93        shutdown: ShutdownSignal,
94    ) -> Self {
95        Self {
96            portfolio,
97            orchestrator,
98            persisted,
99            last_data_timestamp,
100            event_bus,
101            strategy,
102            shutdown,
103        }
104    }
105
106    fn last_data_timestamp(&self) -> Option<prost_types::Timestamp> {
107        let secs = self.last_data_timestamp.load(Ordering::SeqCst);
108        if secs <= 0 {
109            return None;
110        }
111        Some(prost_types::Timestamp {
112            seconds: secs,
113            nanos: 0,
114        })
115    }
116
117    async fn cancel_all_impl(&self) -> Result<(u32, u32)> {
118        let algo_ids: Vec<_> = self
119            .orchestrator
120            .algorithm_statuses()
121            .keys()
122            .copied()
123            .collect();
124        let mut cancelled_algorithms = 0u32;
125        for algo_id in algo_ids {
126            match self.orchestrator.cancel_algo(&algo_id).await {
127                Ok(_) => cancelled_algorithms += 1,
128                Err(err) => warn!(algo = %algo_id, error = %err, "failed to cancel algorithm"),
129            }
130        }
131
132        let open_orders = {
133            let state = self.persisted.lock().await;
134            state.open_orders.clone()
135        };
136        let client = self.orchestrator.execution_engine().client();
137        let mut cancelled_orders = 0u32;
138        for order in open_orders {
139            let symbol = order.request.symbol;
140            match client.cancel_order(order.id.clone(), symbol).await {
141                Ok(_) => cancelled_orders += 1,
142                Err(err) => warn!(order_id = %order.id, error = %err, "failed to cancel order"),
143            }
144        }
145        Ok((cancelled_orders, cancelled_algorithms))
146    }
147
148    async fn with_pairs_strategy<R>(
149        &self,
150        f: impl FnOnce(&mut PairsTradingArbitrage) -> StrategyResult<R>,
151    ) -> Result<R, Status> {
152        let mut guard = self.strategy.lock().await;
153        let any = (&mut **guard) as &mut dyn Any;
154        let Some(pairs) = any.downcast_mut::<PairsTradingArbitrage>() else {
155            return Err(Status::failed_precondition(
156                "active strategy does not expose managed trades",
157            ));
158        };
159        f(pairs).map_err(|err| Status::internal(err.to_string()))
160    }
161
162    #[allow(clippy::result_large_err)]
163    fn snapshot_to_proto(snapshot: PairTradeSnapshot) -> Result<ManagedTradeInfo, Status> {
164        let exit_strategy_json = serde_json::to_string(&snapshot.exit_strategy)
165            .map_err(|err| Status::internal(format!("failed to encode exit strategy: {err}")))?;
166        Ok(ManagedTradeInfo {
167            trade_id: snapshot.trade_id.to_string(),
168            symbol_a: snapshot.symbols[0].to_string(),
169            symbol_b: snapshot.symbols[1].to_string(),
170            direction: format!("{:?}", snapshot.direction),
171            entry_timestamp: Some(timestamp_from_datetime(snapshot.entry_timestamp)),
172            entry_z: Some(to_decimal_proto(snapshot.entry_z_score)),
173            candles_held: snapshot.candles_held,
174            exit_strategy_json,
175        })
176    }
177}
178
179#[tonic::async_trait]
180impl ControlService for ControlGrpcService {
181    type MonitorStream = ReceiverStream<Result<Event, Status>>;
182
183    async fn get_portfolio(
184        &self,
185        _request: Request<GetPortfolioRequest>,
186    ) -> Result<Response<GetPortfolioResponse>, Status> {
187        let snapshot: PortfolioSnapshot = {
188            let guard = self.portfolio.lock().await;
189            PortfolioSnapshot::from(&*guard)
190        };
191        Ok(Response::new(GetPortfolioResponse {
192            portfolio: Some(snapshot),
193        }))
194    }
195
196    async fn get_open_orders(
197        &self,
198        _request: Request<GetOpenOrdersRequest>,
199    ) -> Result<Response<GetOpenOrdersResponse>, Status> {
200        let orders = {
201            let state = self.persisted.lock().await;
202            state.open_orders.clone()
203        };
204        let proto_orders: Vec<OrderSnapshot> = orders.iter().map(OrderSnapshot::from).collect();
205        Ok(Response::new(GetOpenOrdersResponse {
206            orders: proto_orders,
207        }))
208    }
209
210    async fn get_status(
211        &self,
212        _request: Request<GetStatusRequest>,
213    ) -> Result<Response<GetStatusResponse>, Status> {
214        let (equity, liquidate_only) = {
215            let guard = self.portfolio.lock().await;
216            (guard.equity(), guard.liquidate_only())
217        };
218        let response = GetStatusResponse {
219            shutdown: self.shutdown.triggered(),
220            liquidate_only,
221            active_algorithms: self.orchestrator.active_algorithms_count() as u32,
222            last_data_timestamp: self.last_data_timestamp(),
223            equity: Some(to_decimal_proto(equity)),
224        };
225        Ok(Response::new(response))
226    }
227
228    async fn cancel_all(
229        &self,
230        _request: Request<CancelAllRequest>,
231    ) -> Result<Response<CancelAllResponse>, Status> {
232        match self.cancel_all_impl().await {
233            Ok((orders, algos)) => Ok(Response::new(CancelAllResponse {
234                cancelled_orders: orders,
235                cancelled_algorithms: algos,
236            })),
237            Err(err) => Err(Status::internal(err.to_string())),
238        }
239    }
240
241    async fn list_managed_trades(
242        &self,
243        _request: Request<ListManagedTradesRequest>,
244    ) -> Result<Response<ListManagedTradesResponse>, Status> {
245        let snapshots = self
246            .with_pairs_strategy(|strategy| Ok(strategy.managed_trades()))
247            .await?;
248        let mut trades = Vec::with_capacity(snapshots.len());
249        for snapshot in snapshots {
250            trades.push(Self::snapshot_to_proto(snapshot)?);
251        }
252        Ok(Response::new(ListManagedTradesResponse { trades }))
253    }
254
255    async fn update_trade_exit_strategy(
256        &self,
257        request: Request<UpdateTradeExitStrategyRequest>,
258    ) -> Result<Response<UpdateTradeExitStrategyResponse>, Status> {
259        let payload = request.into_inner();
260        let trade_id = Uuid::parse_str(&payload.trade_id)
261            .map_err(|err| Status::invalid_argument(format!("invalid trade_id: {err}")))?;
262        let new_strategy: ExitStrategy =
263            serde_json::from_str(&payload.new_strategy_json).map_err(|err| {
264                Status::invalid_argument(format!("invalid exit strategy json: {err}"))
265            })?;
266        self.with_pairs_strategy(|strategy| {
267            strategy
268                .update_trade_exit_strategy(trade_id, new_strategy.clone())
269                .map(|_| ())
270        })
271        .await?;
272        Ok(Response::new(UpdateTradeExitStrategyResponse {
273            success: true,
274            error_message: String::new(),
275        }))
276    }
277
278    async fn monitor(
279        &self,
280        _request: Request<MonitorRequest>,
281    ) -> Result<Response<Self::MonitorStream>, Status> {
282        let mut stream = self.event_bus.subscribe();
283        info!("monitor subscriber connected");
284        let (tx, rx) = mpsc::channel(256);
285        tokio::spawn(async move {
286            loop {
287                match stream.recv().await {
288                    Ok(event) => {
289                        let label = event_label(&event);
290                        debug!(kind = label, "monitor captured event");
291                        if let Some(proto) = event_to_proto(event) {
292                            if tx.send(Ok(proto)).await.is_err() {
293                                warn!(kind = label, "monitor stream receiver dropped during send");
294                                break;
295                            } else {
296                                debug!(kind = label, "monitor event forwarded to client");
297                            }
298                        } else {
299                            debug!(kind = label, "monitor event skipped (no proto mapping)");
300                        }
301                    }
302                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
303                    Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
304                        warn!(lag, "monitor stream lagged; dropping events");
305                        continue;
306                    }
307                }
308            }
309        });
310        Ok(Response::new(ReceiverStream::new(rx)))
311    }
312}
313
314fn event_to_proto(event: RuntimeEvent) -> Option<proto::Event> {
315    use tesser_rpc::proto::event::Payload;
316
317    match event {
318        RuntimeEvent::Tick(evt) => Some(proto::Event {
319            payload: Some(Payload::Tick(evt.tick.into())),
320        }),
321        RuntimeEvent::Candle(evt) => Some(proto::Event {
322            payload: Some(Payload::Candle(evt.candle.into())),
323        }),
324        RuntimeEvent::Signal(evt) => Some(proto::Event {
325            payload: Some(Payload::Signal(evt.signal.into())),
326        }),
327        RuntimeEvent::Fill(evt) => Some(proto::Event {
328            payload: Some(Payload::Fill(evt.fill.into())),
329        }),
330        RuntimeEvent::OrderUpdate(evt) => Some(proto::Event {
331            payload: Some(Payload::Order(evt.order.into())),
332        }),
333        RuntimeEvent::OrderBook(book) => {
334            debug!(symbol = %book.order_book.symbol, "monitor dropping order book event");
335            None
336        }
337    }
338}
339
340fn event_label(event: &RuntimeEvent) -> &'static str {
341    match event {
342        RuntimeEvent::Tick(_) => "tick",
343        RuntimeEvent::Candle(_) => "candle",
344        RuntimeEvent::Signal(_) => "signal",
345        RuntimeEvent::Fill(_) => "fill",
346        RuntimeEvent::OrderUpdate(_) => "order",
347        RuntimeEvent::OrderBook(_) => "order_book",
348    }
349}
350
351fn timestamp_from_datetime(ts: DateTime<Utc>) -> prost_types::Timestamp {
352    prost_types::Timestamp {
353        seconds: ts.timestamp(),
354        nanos: ts.timestamp_subsec_nanos() as i32,
355    }
356}