1use std::collections::VecDeque;
2use std::time::Duration;
3
4use chrono::{DateTime, Utc};
5use rust_decimal::Decimal;
6use tesser_rpc::conversions::from_decimal_proto;
7use tesser_rpc::proto::{
8 self, CancelAllResponse, Event, GetStatusResponse, OrderSnapshot, PortfolioSnapshot,
9};
10
/// Maximum number of entries kept in `MonitorApp`'s log buffer.
///
/// The buffer is pre-allocated with this capacity, and `MonitorApp::push_log`
/// drops the oldest entry before appending once this many entries are stored.
const LOG_CAPACITY: usize = 200;
12
/// Settings handed to [`MonitorApp`] at construction time.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so the
/// configuration can be logged and compared in tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MonitorConfig {
    /// Address string returned by `MonitorApp::control_addr`.
    // NOTE(review): presumably the control service endpoint — confirm with the caller.
    pub control_addr: String,
    /// Interval returned by `MonitorApp::tick_rate`.
    // NOTE(review): presumably the UI refresh/poll period — confirm with the caller.
    pub tick_rate: Duration,
}
18
19impl MonitorConfig {
20 pub fn new(control_addr: String, tick_rate: Duration) -> Self {
21 Self {
22 control_addr,
23 tick_rate,
24 }
25 }
26}
27
/// Mutable state of the monitor: latest snapshots, a bounded log, connection
/// flags and the command overlay.
pub struct MonitorApp {
    /// Configuration supplied to `MonitorApp::new`.
    config: MonitorConfig,
    /// Most recent status response stored by `on_status`; `None` until the first one arrives.
    status: Option<GetStatusResponse>,
    /// Most recent portfolio snapshot stored by `on_portfolio`.
    portfolio: Option<PortfolioSnapshot>,
    /// Order list, replaced wholesale by `on_orders`.
    orders: Vec<OrderSnapshot>,
    /// Log entries, oldest at the front; `push_log` caps the length at `LOG_CAPACITY`.
    logs: VecDeque<LogEntry>,
    /// Message from the latest `set_error`; cleared by `clear_error` and by every
    /// `on_status` / `on_portfolio` / `on_orders` call.
    last_error: Option<String>,
    /// Local UTC time at which a status, portfolio or order snapshot was last stored.
    last_snapshot_at: Option<DateTime<Utc>>,
    /// Local UTC time at which `on_stream_event` was last called.
    last_event_at: Option<DateTime<Utc>>,
    /// Flag written by `set_stream_connected`.
    stream_connected: bool,
    /// Flag written by `set_cancel_in_progress` and reset by `record_cancel_result`.
    cancel_in_progress: bool,
    /// Set to `true` by `request_quit`; never reset.
    should_quit: bool,
    /// Current overlay state.
    overlay: CommandOverlay,
    /// Error message associated with the overlay; cleared whenever the overlay is
    /// opened, closed or switched to the confirmation prompt.
    overlay_error: Option<String>,
}
43
44impl MonitorApp {
45 pub fn new(config: MonitorConfig) -> Self {
46 Self {
47 config,
48 status: None,
49 portfolio: None,
50 orders: Vec::new(),
51 logs: VecDeque::with_capacity(LOG_CAPACITY),
52 last_error: None,
53 last_snapshot_at: None,
54 last_event_at: None,
55 stream_connected: false,
56 cancel_in_progress: false,
57 should_quit: false,
58 overlay: CommandOverlay::Hidden,
59 overlay_error: None,
60 }
61 }
62
63 pub fn control_addr(&self) -> &str {
64 &self.config.control_addr
65 }
66
67 pub fn tick_rate(&self) -> Duration {
68 self.config.tick_rate
69 }
70
71 pub fn should_quit(&self) -> bool {
72 self.should_quit
73 }
74
75 pub fn request_quit(&mut self) {
76 self.should_quit = true;
77 }
78
79 pub fn on_status(&mut self, status: GetStatusResponse) {
80 self.status = Some(status);
81 self.last_snapshot_at = Some(Utc::now());
82 self.clear_error();
83 }
84
85 pub fn on_portfolio(&mut self, snapshot: PortfolioSnapshot) {
86 self.portfolio = Some(snapshot);
87 self.last_snapshot_at = Some(Utc::now());
88 self.clear_error();
89 }
90
91 pub fn on_orders(&mut self, orders: Vec<OrderSnapshot>) {
92 self.orders = orders;
93 self.last_snapshot_at = Some(Utc::now());
94 self.clear_error();
95 }
96
97 pub fn on_stream_event(&mut self, event: Event) {
98 self.last_event_at = Some(Utc::now());
99 if let Some(entry) = LogEntry::from_event(event) {
100 self.push_log(entry);
101 }
102 }
103
104 pub fn log(&self) -> impl DoubleEndedIterator<Item = &LogEntry> {
105 self.logs.iter()
106 }
107
108 pub fn orders(&self) -> &[OrderSnapshot] {
109 &self.orders
110 }
111
112 pub fn status(&self) -> Option<&GetStatusResponse> {
113 self.status.as_ref()
114 }
115
116 pub fn portfolio(&self) -> Option<&PortfolioSnapshot> {
117 self.portfolio.as_ref()
118 }
119
120 pub fn sub_accounts(&self) -> Option<&[proto::SubAccountSnapshot]> {
121 self.portfolio.as_ref().map(|p| p.sub_accounts.as_slice())
122 }
123
124 pub fn balances(&self) -> Option<&[proto::CashBalance]> {
125 self.portfolio.as_ref().map(|p| p.balances.as_slice())
126 }
127
128 pub fn positions(&self) -> Option<&[proto::Position]> {
129 self.portfolio.as_ref().map(|p| p.positions.as_slice())
130 }
131
132 pub fn equity(&self) -> Option<Decimal> {
133 self.portfolio
134 .as_ref()
135 .and_then(|p| decimal_from_option(p.equity.as_ref()))
136 }
137
138 pub fn realized_pnl(&self) -> Option<Decimal> {
139 self.portfolio
140 .as_ref()
141 .and_then(|p| decimal_from_option(p.realized_pnl.as_ref()))
142 }
143
144 pub fn initial_equity(&self) -> Option<Decimal> {
145 self.portfolio
146 .as_ref()
147 .and_then(|p| decimal_from_option(p.initial_equity.as_ref()))
148 }
149
150 pub fn reporting_currency(&self) -> Option<&str> {
151 self.portfolio
152 .as_ref()
153 .map(|p| p.reporting_currency.as_str())
154 }
155
156 pub fn last_error(&self) -> Option<&str> {
157 self.last_error.as_deref()
158 }
159
160 pub fn set_error(&mut self, msg: impl Into<String>) {
161 let message = msg.into();
162 self.last_error = Some(message.clone());
163 self.push_log(LogEntry {
164 timestamp: Utc::now(),
165 category: LogCategory::Error,
166 message,
167 });
168 }
169
170 pub fn clear_error(&mut self) {
171 self.last_error = None;
172 }
173
174 pub fn last_snapshot_at(&self) -> Option<DateTime<Utc>> {
175 self.last_snapshot_at
176 }
177
178 pub fn last_event_at(&self) -> Option<DateTime<Utc>> {
179 self.last_event_at
180 }
181
182 pub fn set_stream_connected(&mut self, connected: bool) {
183 self.stream_connected = connected;
184 }
185
186 pub fn stream_connected(&self) -> bool {
187 self.stream_connected
188 }
189
190 pub fn set_cancel_in_progress(&mut self, active: bool) {
191 self.cancel_in_progress = active;
192 }
193
194 pub fn cancel_in_progress(&self) -> bool {
195 self.cancel_in_progress
196 }
197
198 pub fn overlay(&self) -> &CommandOverlay {
199 &self.overlay
200 }
201
202 pub fn overlay_error(&self) -> Option<&str> {
203 self.overlay_error.as_deref()
204 }
205
206 pub fn overlay_visible(&self) -> bool {
207 !matches!(self.overlay, CommandOverlay::Hidden)
208 }
209
210 pub fn open_command_palette(&mut self) {
211 self.overlay = CommandOverlay::Palette;
212 self.overlay_error = None;
213 }
214
215 pub fn close_overlay(&mut self) {
216 self.overlay = CommandOverlay::Hidden;
217 self.overlay_error = None;
218 }
219
220 pub fn begin_cancel_confirmation(&mut self) {
221 self.overlay = CommandOverlay::Confirm {
222 buffer: String::new(),
223 };
224 self.overlay_error = None;
225 }
226
227 pub fn append_confirmation_char(&mut self, ch: char) {
228 if let CommandOverlay::Confirm { buffer } = &mut self.overlay {
229 buffer.push(ch);
230 }
231 }
232
233 pub fn backspace_confirmation(&mut self) {
234 if let CommandOverlay::Confirm { buffer } = &mut self.overlay {
235 buffer.pop();
236 }
237 }
238
239 pub fn confirmation_buffer(&self) -> Option<&str> {
240 match &self.overlay {
241 CommandOverlay::Confirm { buffer } => Some(buffer.as_str()),
242 _ => None,
243 }
244 }
245
246 pub fn confirmation_matches(&self) -> bool {
247 self.confirmation_buffer()
248 .map(|buf| buf.trim().eq_ignore_ascii_case("cancel all"))
249 .unwrap_or(false)
250 }
251
252 pub fn set_overlay_error(&mut self, msg: impl Into<String>) {
253 self.overlay_error = Some(msg.into());
254 }
255
256 pub fn toggle_command_palette(&mut self) {
257 if matches!(self.overlay, CommandOverlay::Palette) {
258 self.close_overlay();
259 } else {
260 self.open_command_palette();
261 }
262 }
263
264 pub fn record_info(&mut self, msg: impl Into<String>) {
265 self.push_log(LogEntry::info(msg));
266 }
267
268 pub fn record_cancel_result(&mut self, response: CancelAllResponse) {
269 self.cancel_in_progress = false;
270 let message = format!(
271 "CancelAll completed: {} orders, {} algos",
272 response.cancelled_orders, response.cancelled_algorithms
273 );
274 self.push_log(LogEntry {
275 timestamp: Utc::now(),
276 category: LogCategory::Info,
277 message,
278 });
279 }
280
281 fn push_log(&mut self, entry: LogEntry) {
282 if self.logs.len() == LOG_CAPACITY {
283 self.logs.pop_front();
284 }
285 self.logs.push_back(entry);
286 }
287}
288
/// Classification attached to every [`LogEntry`].
///
/// Derives `PartialEq`, `Eq` and `Hash` so callers can compare categories
/// directly or use them as map/set keys instead of pattern-matching.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum LogCategory {
    /// Entry built from a signal event payload.
    Signal,
    /// Entry built from a fill event payload.
    Fill,
    /// Entry built from an order event payload.
    Order,
    /// Informational entry (see `LogEntry::info`).
    Info,
    /// Entry recorded by `MonitorApp::set_error`.
    Error,
}
297
/// A single line in the monitor's log buffer.
#[derive(Clone, Debug)]
pub struct LogEntry {
    /// UTC time at which the entry was created locally (every constructor in this
    /// file uses `Utc::now()`).
    pub timestamp: DateTime<Utc>,
    /// Classification of the entry.
    pub category: LogCategory,
    /// Human-readable text of the entry.
    pub message: String,
}
304
305impl LogEntry {
306 pub fn info(msg: impl Into<String>) -> Self {
307 Self {
308 timestamp: Utc::now(),
309 category: LogCategory::Info,
310 message: msg.into(),
311 }
312 }
313
314 pub fn timestamp_short(&self) -> String {
315 self.timestamp.format("%H:%M:%S").to_string()
316 }
317
318 pub fn from_event(event: Event) -> Option<Self> {
319 use tesser_rpc::proto::event::Payload;
320
321 let payload = event.payload?;
322 let timestamp = Utc::now();
323
324 match payload {
325 Payload::Signal(signal) => Some(Self {
326 timestamp,
327 category: LogCategory::Signal,
328 message: format!(
329 "Signal {} {} (conf {:.2})",
330 signal.symbol,
331 signal_kind(signal.kind),
332 signal.confidence
333 ),
334 }),
335 Payload::Fill(fill) => Some(Self {
336 timestamp,
337 category: LogCategory::Fill,
338 message: format!(
339 "Fill {} {} {} @ {} (fee {})",
340 fill.symbol,
341 side_label(fill.side),
342 decimal_to_string(fill.fill_quantity.as_ref()),
343 decimal_to_string(fill.fill_price.as_ref()),
344 decimal_to_string(fill.fee.as_ref())
345 ),
346 }),
347 Payload::Order(order) => Some(Self {
348 timestamp,
349 category: LogCategory::Order,
350 message: format!(
351 "Order {} {} {} {}/{} @ {}",
352 order.id,
353 order.symbol,
354 order_status(order.status),
355 decimal_to_string(order.filled_quantity.as_ref()),
356 decimal_to_string(order.quantity.as_ref()),
357 decimal_to_string(order.avg_fill_price.as_ref())
358 ),
359 }),
360 _ => None,
361 }
362 }
363}
364
365fn decimal_from_option(proto: Option<&proto::Decimal>) -> Option<Decimal> {
366 proto.map(|inner| from_decimal_proto(inner.clone()))
367}
368
369fn decimal_to_string(proto: Option<&proto::Decimal>) -> String {
370 decimal_from_option(proto)
371 .map(|d| d.normalize().to_string())
372 .unwrap_or_else(|| "-".to_string())
373}
374
375fn signal_kind(kind: i32) -> &'static str {
376 match proto::signal::Kind::try_from(kind).unwrap_or(proto::signal::Kind::Unspecified) {
377 proto::signal::Kind::EnterLong => "ENTER_LONG",
378 proto::signal::Kind::ExitLong => "EXIT_LONG",
379 proto::signal::Kind::EnterShort => "ENTER_SHORT",
380 proto::signal::Kind::ExitShort => "EXIT_SHORT",
381 proto::signal::Kind::Flatten => "FLATTEN",
382 proto::signal::Kind::Unspecified => "UNKNOWN",
383 }
384}
385
386fn side_label(side: i32) -> &'static str {
387 match proto::Side::try_from(side).unwrap_or(proto::Side::Unspecified) {
388 proto::Side::Buy => "BUY",
389 proto::Side::Sell => "SELL",
390 proto::Side::Unspecified => "NA",
391 }
392}
393
394fn order_status(status: i32) -> &'static str {
395 match proto::OrderStatus::try_from(status).unwrap_or(proto::OrderStatus::Unspecified) {
396 proto::OrderStatus::PendingNew => "PENDING",
397 proto::OrderStatus::Accepted => "ACCEPTED",
398 proto::OrderStatus::PartiallyFilled => "PARTIAL",
399 proto::OrderStatus::Filled => "FILLED",
400 proto::OrderStatus::Canceled => "CANCELLED",
401 proto::OrderStatus::Rejected => "REJECTED",
402 proto::OrderStatus::Unspecified => "UNKNOWN",
403 }
404}
405
/// State of the command overlay tracked by [`MonitorApp`].
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so the state
/// can be logged and asserted on directly.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CommandOverlay {
    /// No overlay is active.
    Hidden,
    /// The command palette is open.
    Palette,
    /// The cancel confirmation prompt is open; `buffer` holds the text typed so far.
    Confirm { buffer: String },
}