1use std::collections::VecDeque;
2use std::time::Duration;
3
4use chrono::{DateTime, Utc};
5use rust_decimal::Decimal;
6use tesser_rpc::conversions::from_decimal_proto;
7use tesser_rpc::proto::{
8 self, CancelAllResponse, Event, GetStatusResponse, OrderSnapshot, PortfolioSnapshot,
9};
10
/// Maximum number of entries retained in the monitor's in-memory log.
///
/// `MonitorApp` pre-allocates its log buffer with this capacity and evicts the
/// oldest entry before appending once the buffer holds this many items.
const LOG_CAPACITY: usize = 200;
12
/// Settings used to build a [`MonitorApp`].
///
/// Derives `Debug`, `PartialEq`, and `Eq` in addition to `Clone` so the
/// configuration can be logged and compared.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MonitorConfig {
    /// Address of the control endpoint, stored exactly as supplied by the caller.
    pub control_addr: String,
    /// Tick interval, stored exactly as supplied by the caller.
    pub tick_rate: Duration,
}

impl MonitorConfig {
    /// Creates a configuration from a control address and a tick interval.
    ///
    /// Both values are stored as-is; no validation is performed.
    pub fn new(control_addr: String, tick_rate: Duration) -> Self {
        Self {
            control_addr,
            tick_rate,
        }
    }
}
27
28pub struct MonitorApp {
29 config: MonitorConfig,
30 status: Option<GetStatusResponse>,
31 portfolio: Option<PortfolioSnapshot>,
32 orders: Vec<OrderSnapshot>,
33 logs: VecDeque<LogEntry>,
34 last_error: Option<String>,
35 last_snapshot_at: Option<DateTime<Utc>>,
36 last_event_at: Option<DateTime<Utc>>,
37 stream_connected: bool,
38 cancel_in_progress: bool,
39 should_quit: bool,
40 overlay: CommandOverlay,
41 overlay_error: Option<String>,
42}
43
44impl MonitorApp {
45 pub fn new(config: MonitorConfig) -> Self {
46 Self {
47 config,
48 status: None,
49 portfolio: None,
50 orders: Vec::new(),
51 logs: VecDeque::with_capacity(LOG_CAPACITY),
52 last_error: None,
53 last_snapshot_at: None,
54 last_event_at: None,
55 stream_connected: false,
56 cancel_in_progress: false,
57 should_quit: false,
58 overlay: CommandOverlay::Hidden,
59 overlay_error: None,
60 }
61 }
62
63 pub fn control_addr(&self) -> &str {
64 &self.config.control_addr
65 }
66
67 pub fn tick_rate(&self) -> Duration {
68 self.config.tick_rate
69 }
70
71 pub fn should_quit(&self) -> bool {
72 self.should_quit
73 }
74
75 pub fn request_quit(&mut self) {
76 self.should_quit = true;
77 }
78
79 pub fn on_status(&mut self, status: GetStatusResponse) {
80 self.status = Some(status);
81 self.last_snapshot_at = Some(Utc::now());
82 self.clear_error();
83 }
84
85 pub fn on_portfolio(&mut self, snapshot: PortfolioSnapshot) {
86 self.portfolio = Some(snapshot);
87 self.last_snapshot_at = Some(Utc::now());
88 self.clear_error();
89 }
90
91 pub fn on_orders(&mut self, orders: Vec<OrderSnapshot>) {
92 self.orders = orders;
93 self.last_snapshot_at = Some(Utc::now());
94 self.clear_error();
95 }
96
97 pub fn on_stream_event(&mut self, event: Event) {
98 self.last_event_at = Some(Utc::now());
99 if let Some(entry) = LogEntry::from_event(event) {
100 self.push_log(entry);
101 }
102 }
103
104 pub fn log(&self) -> impl DoubleEndedIterator<Item = &LogEntry> {
105 self.logs.iter()
106 }
107
108 pub fn orders(&self) -> &[OrderSnapshot] {
109 &self.orders
110 }
111
112 pub fn status(&self) -> Option<&GetStatusResponse> {
113 self.status.as_ref()
114 }
115
116 pub fn portfolio(&self) -> Option<&PortfolioSnapshot> {
117 self.portfolio.as_ref()
118 }
119
120 pub fn positions(&self) -> Option<&[proto::Position]> {
121 self.portfolio.as_ref().map(|p| p.positions.as_slice())
122 }
123
124 pub fn equity(&self) -> Option<Decimal> {
125 self.portfolio
126 .as_ref()
127 .and_then(|p| decimal_from_option(p.equity.as_ref()))
128 }
129
130 pub fn realized_pnl(&self) -> Option<Decimal> {
131 self.portfolio
132 .as_ref()
133 .and_then(|p| decimal_from_option(p.realized_pnl.as_ref()))
134 }
135
136 pub fn initial_equity(&self) -> Option<Decimal> {
137 self.portfolio
138 .as_ref()
139 .and_then(|p| decimal_from_option(p.initial_equity.as_ref()))
140 }
141
142 pub fn reporting_currency(&self) -> Option<&str> {
143 self.portfolio
144 .as_ref()
145 .map(|p| p.reporting_currency.as_str())
146 }
147
148 pub fn last_error(&self) -> Option<&str> {
149 self.last_error.as_deref()
150 }
151
152 pub fn set_error(&mut self, msg: impl Into<String>) {
153 let message = msg.into();
154 self.last_error = Some(message.clone());
155 self.push_log(LogEntry {
156 timestamp: Utc::now(),
157 category: LogCategory::Error,
158 message,
159 });
160 }
161
162 pub fn clear_error(&mut self) {
163 self.last_error = None;
164 }
165
166 pub fn last_snapshot_at(&self) -> Option<DateTime<Utc>> {
167 self.last_snapshot_at
168 }
169
170 pub fn last_event_at(&self) -> Option<DateTime<Utc>> {
171 self.last_event_at
172 }
173
174 pub fn set_stream_connected(&mut self, connected: bool) {
175 self.stream_connected = connected;
176 }
177
178 pub fn stream_connected(&self) -> bool {
179 self.stream_connected
180 }
181
182 pub fn set_cancel_in_progress(&mut self, active: bool) {
183 self.cancel_in_progress = active;
184 }
185
186 pub fn cancel_in_progress(&self) -> bool {
187 self.cancel_in_progress
188 }
189
190 pub fn overlay(&self) -> &CommandOverlay {
191 &self.overlay
192 }
193
194 pub fn overlay_error(&self) -> Option<&str> {
195 self.overlay_error.as_deref()
196 }
197
198 pub fn overlay_visible(&self) -> bool {
199 !matches!(self.overlay, CommandOverlay::Hidden)
200 }
201
202 pub fn open_command_palette(&mut self) {
203 self.overlay = CommandOverlay::Palette;
204 self.overlay_error = None;
205 }
206
207 pub fn close_overlay(&mut self) {
208 self.overlay = CommandOverlay::Hidden;
209 self.overlay_error = None;
210 }
211
212 pub fn begin_cancel_confirmation(&mut self) {
213 self.overlay = CommandOverlay::Confirm {
214 buffer: String::new(),
215 };
216 self.overlay_error = None;
217 }
218
219 pub fn append_confirmation_char(&mut self, ch: char) {
220 if let CommandOverlay::Confirm { buffer } = &mut self.overlay {
221 buffer.push(ch);
222 }
223 }
224
225 pub fn backspace_confirmation(&mut self) {
226 if let CommandOverlay::Confirm { buffer } = &mut self.overlay {
227 buffer.pop();
228 }
229 }
230
231 pub fn confirmation_buffer(&self) -> Option<&str> {
232 match &self.overlay {
233 CommandOverlay::Confirm { buffer } => Some(buffer.as_str()),
234 _ => None,
235 }
236 }
237
238 pub fn confirmation_matches(&self) -> bool {
239 self.confirmation_buffer()
240 .map(|buf| buf.trim().eq_ignore_ascii_case("cancel all"))
241 .unwrap_or(false)
242 }
243
244 pub fn set_overlay_error(&mut self, msg: impl Into<String>) {
245 self.overlay_error = Some(msg.into());
246 }
247
248 pub fn toggle_command_palette(&mut self) {
249 if matches!(self.overlay, CommandOverlay::Palette) {
250 self.close_overlay();
251 } else {
252 self.open_command_palette();
253 }
254 }
255
256 pub fn record_info(&mut self, msg: impl Into<String>) {
257 self.push_log(LogEntry::info(msg));
258 }
259
260 pub fn record_cancel_result(&mut self, response: CancelAllResponse) {
261 self.cancel_in_progress = false;
262 let message = format!(
263 "CancelAll completed: {} orders, {} algos",
264 response.cancelled_orders, response.cancelled_algorithms
265 );
266 self.push_log(LogEntry {
267 timestamp: Utc::now(),
268 category: LogCategory::Info,
269 message,
270 });
271 }
272
273 fn push_log(&mut self, entry: LogEntry) {
274 if self.logs.len() == LOG_CAPACITY {
275 self.logs.pop_front();
276 }
277 self.logs.push_back(entry);
278 }
279}
280
/// Classification of a [`LogEntry`].
///
/// Derives `PartialEq` and `Eq` so categories can be compared directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LogCategory {
    /// Entry produced from a signal event.
    Signal,
    /// Entry produced from a fill event.
    Fill,
    /// Entry produced from an order event.
    Order,
    /// Informational message.
    Info,
    /// Error message.
    Error,
}
289
290#[derive(Clone, Debug)]
291pub struct LogEntry {
292 pub timestamp: DateTime<Utc>,
293 pub category: LogCategory,
294 pub message: String,
295}
296
297impl LogEntry {
298 pub fn info(msg: impl Into<String>) -> Self {
299 Self {
300 timestamp: Utc::now(),
301 category: LogCategory::Info,
302 message: msg.into(),
303 }
304 }
305
306 pub fn timestamp_short(&self) -> String {
307 self.timestamp.format("%H:%M:%S").to_string()
308 }
309
310 pub fn from_event(event: Event) -> Option<Self> {
311 use tesser_rpc::proto::event::Payload;
312
313 let payload = event.payload?;
314 let timestamp = Utc::now();
315
316 match payload {
317 Payload::Signal(signal) => Some(Self {
318 timestamp,
319 category: LogCategory::Signal,
320 message: format!(
321 "Signal {} {} (conf {:.2})",
322 signal.symbol,
323 signal_kind(signal.kind),
324 signal.confidence
325 ),
326 }),
327 Payload::Fill(fill) => Some(Self {
328 timestamp,
329 category: LogCategory::Fill,
330 message: format!(
331 "Fill {} {} {} @ {} (fee {})",
332 fill.symbol,
333 side_label(fill.side),
334 decimal_to_string(fill.fill_quantity.as_ref()),
335 decimal_to_string(fill.fill_price.as_ref()),
336 decimal_to_string(fill.fee.as_ref())
337 ),
338 }),
339 Payload::Order(order) => Some(Self {
340 timestamp,
341 category: LogCategory::Order,
342 message: format!(
343 "Order {} {} {} {}/{} @ {}",
344 order.id,
345 order.symbol,
346 order_status(order.status),
347 decimal_to_string(order.filled_quantity.as_ref()),
348 decimal_to_string(order.quantity.as_ref()),
349 decimal_to_string(order.avg_fill_price.as_ref())
350 ),
351 }),
352 _ => None,
353 }
354 }
355}
356
357fn decimal_from_option(proto: Option<&proto::Decimal>) -> Option<Decimal> {
358 proto.map(|inner| from_decimal_proto(inner.clone()))
359}
360
361fn decimal_to_string(proto: Option<&proto::Decimal>) -> String {
362 decimal_from_option(proto)
363 .map(|d| d.normalize().to_string())
364 .unwrap_or_else(|| "-".to_string())
365}
366
367fn signal_kind(kind: i32) -> &'static str {
368 match proto::signal::Kind::try_from(kind).unwrap_or(proto::signal::Kind::Unspecified) {
369 proto::signal::Kind::EnterLong => "ENTER_LONG",
370 proto::signal::Kind::ExitLong => "EXIT_LONG",
371 proto::signal::Kind::EnterShort => "ENTER_SHORT",
372 proto::signal::Kind::ExitShort => "EXIT_SHORT",
373 proto::signal::Kind::Flatten => "FLATTEN",
374 proto::signal::Kind::Unspecified => "UNKNOWN",
375 }
376}
377
378fn side_label(side: i32) -> &'static str {
379 match proto::Side::try_from(side).unwrap_or(proto::Side::Unspecified) {
380 proto::Side::Buy => "BUY",
381 proto::Side::Sell => "SELL",
382 proto::Side::Unspecified => "NA",
383 }
384}
385
386fn order_status(status: i32) -> &'static str {
387 match proto::OrderStatus::try_from(status).unwrap_or(proto::OrderStatus::Unspecified) {
388 proto::OrderStatus::PendingNew => "PENDING",
389 proto::OrderStatus::Accepted => "ACCEPTED",
390 proto::OrderStatus::PartiallyFilled => "PARTIAL",
391 proto::OrderStatus::Filled => "FILLED",
392 proto::OrderStatus::Canceled => "CANCELLED",
393 proto::OrderStatus::Rejected => "REJECTED",
394 proto::OrderStatus::Unspecified => "UNKNOWN",
395 }
396}
397
/// Mode of the command overlay.
///
/// Derives `Debug`, `PartialEq`, and `Eq` in addition to `Clone` so overlay
/// state can be logged and compared.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CommandOverlay {
    /// No overlay is shown.
    Hidden,
    /// The command palette is shown.
    Palette,
    /// A confirmation prompt is shown.
    Confirm {
        /// Text typed into the prompt so far.
        buffer: String,
    },
}