1use crossterm::{
4 cursor,
5 cursor::{MoveTo, MoveToColumn, MoveToNextLine},
6 event::{poll, read, Event, KeyCode, KeyEvent},
7 execute, queue,
8 style::{Print, Stylize},
9 terminal,
10 terminal::{disable_raw_mode, enable_raw_mode, ClearType},
11};
12use futures::{future, stream::FuturesUnordered, FutureExt, StreamExt};
13use std::{
14 collections::{HashMap, HashSet},
15 fmt::{Display, Write},
16 hash::Hash,
17 io::{stdout, Error},
18 time::Duration,
19};
20use tokio::sync::{broadcast, broadcast::error::TryRecvError, watch};
21
22use aggligator::{
23 control::Control,
24 exec,
25 id::ConnId,
26 transport::{ConnectingTransport, LinkError, LinkTagBox},
27};
28
29pub fn watch_tags(
33 transports: impl IntoIterator<Item = Box<dyn ConnectingTransport>>,
34) -> watch::Receiver<HashSet<LinkTagBox>> {
35 let (tags_tx, tags_rx) = watch::channel(HashSet::new());
36
37 let mut transport_tasks = FuturesUnordered::new();
39 let mut transport_tags: Vec<watch::Receiver<HashSet<LinkTagBox>>> = Vec::new();
40 for transport in transports {
41 let (tx, rx) = watch::channel(HashSet::new());
42 transport_tags.push(rx);
43 transport_tasks.push(async move { transport.link_tags(tx).await });
44 }
45
46 exec::spawn(async move {
47 loop {
48 transport_tags.retain(|tt| tt.has_changed().is_ok());
50
51 let mut all_tags = HashSet::new();
53 for tt in &mut transport_tags {
54 let tags = tt.borrow_and_update();
55 for tag in &*tags {
56 all_tags.insert(tag.clone());
57 }
58 }
59 tags_tx.send_if_modified(|tags| {
60 if *tags == all_tags {
61 false
62 } else {
63 *tags = all_tags;
64 true
65 }
66 });
67
68 if transport_tags.is_empty() {
70 break;
71 }
72
73 let tags_changed = future::select_all(transport_tags.iter_mut().map(|tt| tt.changed().boxed()));
75
76 tokio::select! {
78 _ = tags_changed => (),
79 Some(_) = transport_tasks.next() => (),
80 () = tags_tx.closed() => break,
81 };
82 }
83 });
84
85 tags_rx
86}
87
88pub fn interactive_monitor<TX, RX, TAG>(
110 mut header_rx: watch::Receiver<String>, mut control_rx: broadcast::Receiver<(Control<TX, RX, TAG>, String)>,
111 time_stats_idx: usize, mut tags_rx: Option<watch::Receiver<HashSet<TAG>>>,
112 mut tag_error_rx: Option<broadcast::Receiver<LinkError<TAG>>>,
113 disabled_tags_tx: Option<watch::Sender<HashSet<TAG>>>,
114) -> Result<(), Error>
115where
116 TAG: Display + Hash + PartialEq + Eq + Clone + 'static,
117{
118 const STATS_COL: u16 = 35;
119
120 let mut controls: Vec<(Control<TX, RX, TAG>, String)> = Vec::new();
121 let mut errors: HashMap<(ConnId, TAG), String> = HashMap::new();
122 let mut disabled: HashSet<TAG> = HashSet::new();
123 let mut toggle_link_block: Option<usize> = None;
124 let mut interval = Duration::from_secs(3);
125
126 enable_raw_mode()?;
127
128 'main: loop {
129 controls.retain(|c| !c.0.is_terminated());
131 loop {
132 match control_rx.try_recv() {
133 Ok(control_info) => {
134 if controls.iter().all(|c| c.0.id() != control_info.0.id()) {
135 interval = control_info.0.cfg().stats_intervals[time_stats_idx];
136 controls.push(control_info);
137 }
138 }
139 Err(TryRecvError::Empty) => break,
140 Err(TryRecvError::Closed) if controls.is_empty() => break 'main,
141 Err(TryRecvError::Closed) => break,
142 Err(TryRecvError::Lagged(_)) => tracing::warn!("monitor lost incoming connection"),
143 }
144 }
145 if let Some(tag_error_rx) = tag_error_rx.as_mut() {
146 while let Ok(LinkError { id, tag, error }) = tag_error_rx.try_recv() {
147 if let Some(id) = id {
148 errors.insert((id, tag), error.to_string());
149 }
150 }
151 }
152 if let Some(disabled_tags) = disabled_tags_tx.as_ref() {
153 disabled_tags.send_replace(disabled.clone());
154 }
155 let mut tags: Option<Vec<_>> =
156 tags_rx.as_mut().map(|rx| rx.borrow_and_update().clone().into_iter().collect());
157 if let Some(tags) = &mut tags {
158 tags.sort_by_key(|tag| tag.to_string());
159 }
160
161 execute!(stdout(), terminal::Clear(ClearType::All), cursor::MoveTo(0, 0)).unwrap();
163 let (_cols, rows) = terminal::size().unwrap();
164
165 {
167 let header = header_rx.borrow_and_update();
168 queue!(stdout(), Print(&*header), MoveToNextLine(1)).unwrap();
169 }
170 queue!(stdout(), Print("━".repeat(80).grey()), MoveToNextLine(1)).unwrap();
171 queue!(
172 stdout(),
173 MoveToColumn(STATS_COL),
174 Print(" TX speed RX speed TXed RXed"),
175 MoveToNextLine(1)
176 )
177 .unwrap();
178 queue!(stdout(), Print("━".repeat(80).grey()), MoveToNextLine(2)).unwrap();
179
180 for (control, info) in &controls {
182 let conn_id = control.id();
188
189 let links = control.links();
191 let tag_links: Vec<_> = match &tags {
192 Some(tags) => {
193 let mut tag_links: Vec<_> =
194 tags.iter().map(|tag| (tag, links.iter().find(|link| link.tag() == tag))).collect();
195 for link in &links {
196 if !tag_links.iter().any(|(tag, _)| *tag == link.tag()) {
197 tag_links.push((link.tag(), Some(link)));
198 }
199 }
200 tag_links
201 }
202 None => links.iter().map(|link| (link.tag(), Some(link))).collect(),
203 };
204
205 let mut conn_sent = 0;
207 let mut conn_recved = 0;
208 let mut conn_tx_speed = 0.;
209 let mut conn_rx_speed = 0.;
210 for link in &links {
211 let stats = link.stats();
212 conn_sent += stats.total_sent;
213 conn_recved += stats.total_recved;
214 if let Some(ts) = stats.time_stats.get(time_stats_idx) {
215 conn_tx_speed += ts.send_speed();
216 conn_rx_speed += ts.recv_speed();
217 }
218
219 if disabled.contains(link.tag()) {
220 link.start_disconnect();
221 }
222 }
223
224 let stats = control.stats();
226 let mut short_id = conn_id.to_string();
227 short_id.truncate(8);
228 queue!(
229 stdout(),
230 Print("Connection ".cyan()),
231 Print(short_id.bold().magenta()),
232 Print(" "),
233 Print(format_duration(stats.established.map(|e| e.elapsed()).unwrap_or_default())),
234 MoveToColumn(STATS_COL),
235 Print(format_speed(conn_tx_speed)),
236 Print(" "),
237 Print(format_speed(conn_rx_speed)),
238 Print(" "),
239 Print(format_bytes(conn_sent)),
240 Print(" "),
241 Print(format_bytes(conn_recved)),
242 MoveToNextLine(1),
243 )
244 .unwrap();
245 queue!(
246 stdout(),
247 Print("TX:".cyan()),
248 Print(" avail ".cyan()),
249 Print(format_bytes(stats.send_space as _)),
250 Print(" unack ".cyan()),
251 Print(format_bytes(stats.sent_unacked as _)),
252 Print(" uncsmable ".cyan()),
253 Print(format_bytes(stats.sent_unconsumable as _)),
254 Print(" uncsmed ".cyan()),
255 Print(format_bytes(stats.sent_unconsumed as _)),
256 MoveToNextLine(1),
257 Print("RX:".cyan()),
258 MoveToColumn(62),
259 Print(" uncsmed ".cyan()),
260 Print(format_bytes(stats.recved_unconsumed as _)),
261 MoveToNextLine(1),
262 )
263 .unwrap();
264 if !info.is_empty() {
265 queue!(stdout(), Print(info), MoveToNextLine(1)).unwrap();
266 }
267 queue!(stdout(), MoveToNextLine(1)).unwrap();
268
269 for (n, (tag, link)) in tag_links.iter().enumerate() {
271 queue!(
272 stdout(),
273 Print(" "),
274 Print(format!("{}{}", format!("{n:1}").blue(), ". ".cyan())),
275 Print(format!("{:<66}", tag.to_string()).cyan()),
276 Print(
277 format!(
278 " {:>8}",
279 link.map(|l| String::from_utf8_lossy(l.remote_user_data()).to_string())
280 .unwrap_or_default()
281 .chars()
282 .take(8)
283 .collect::<String>()
284 )
285 .cyan()
286 ),
287 MoveToNextLine(1),
288 Print(" "),
289 )
290 .unwrap();
291
292 if disabled.contains(tag) {
293 queue!(stdout(), Print("disabled".red())).unwrap();
294 } else if let Some(link) = link {
295 let stats = link.stats();
296 match (link.not_working_reason(), link.not_working_since()) {
297 (Some(reason), Some(since)) => {
298 queue!(
299 stdout(),
300 Print("unconfirmed ".dark_yellow()),
301 Print(format_duration(since.elapsed())),
302 Print(": ".grey()),
303 Print(reason.to_string().blue())
304 )
305 .unwrap();
306 }
307 _ => queue!(
308 stdout(),
309 Print("connected ".green()),
310 Print(format_duration(stats.established.elapsed())),
311 )
312 .unwrap(),
313 }
314
315 if toggle_link_block == Some(n) {
316 link.set_blocked(!link.is_blocked());
317 }
318
319 if link.is_blocked() {
320 queue!(stdout(), Print(" blocked".red())).unwrap();
321 } else if link.is_remotely_blocked() {
322 queue!(stdout(), Print(" remotely blocked".red())).unwrap();
323 }
324
325 let hangs = link.stats().hangs;
326 if hangs > 0 {
327 queue!(stdout(), Print(format!(" ({hangs})").grey())).unwrap();
328 }
329 } else if let Some(err) = errors.get(&(conn_id, (*tag).clone())) {
330 queue!(stdout(), Print(format!("{err:40}").red())).unwrap();
331 }
332 queue!(stdout(), MoveToNextLine(1)).unwrap();
333
334 if let Some(link) = link {
335 let stats = link.stats();
336
337 let mut tx_speed = 0.;
338 let mut rx_speed = 0.;
339 if let Some(ts) = stats.time_stats.get(time_stats_idx) {
340 tx_speed = ts.send_speed();
341 rx_speed = ts.recv_speed();
342 }
343
344 queue!(
345 stdout(),
346 Print(" "),
347 Print(format!("{} {}", format!("{:4}", stats.roundtrip.as_millis()).blue(), "ms".grey())),
348 Print(" "),
349 Print(format_bytes(stats.sent_unacked)),
350 Print(" /".cyan()),
351 Print(format_bytes(stats.unacked_limit)),
352 MoveToColumn(STATS_COL),
353 Print(format_speed(tx_speed)),
354 Print(" "),
355 Print(format_speed(rx_speed)),
356 Print(" "),
357 Print(format_bytes(stats.total_sent)),
358 Print(" "),
359 Print(format_bytes(stats.total_recved)),
360 MoveToNextLine(2),
361 )
362 .unwrap();
363 } else {
364 queue!(stdout(), MoveToNextLine(1)).unwrap();
365 }
366 }
367
368 queue!(stdout(), MoveToNextLine(1), Print("━".repeat(80).grey()), MoveToNextLine(2)).unwrap();
370 }
371
372 execute!(
374 stdout(),
375 MoveTo(0, rows - 2),
376 Print("Press 0-9 to toggle a link, q to quit.".cyan()),
377 MoveToNextLine(1)
378 )
379 .unwrap();
380
381 toggle_link_block = None;
383 if poll(interval)? {
384 match read()? {
385 Event::Key(KeyEvent { code: KeyCode::Char(c), .. }) if c.is_ascii_digit() => {
386 let n = c.to_digit(10).unwrap();
387 if disabled_tags_tx.is_some() {
388 if let Some(tag) = tags.and_then(|tags| tags.get(n as usize).cloned()) {
389 if !disabled.remove(&tag) {
390 disabled.insert(tag);
391 }
392 }
393 } else {
394 toggle_link_block = Some(n as usize);
395 }
396 }
397 Event::Key(KeyEvent { code: KeyCode::Char('q'), .. }) => break,
398 _ => (),
399 }
400 }
401 }
402
403 disable_raw_mode()?;
404 Ok(())
405}
406
/// Number of bytes in one kibibyte (2^10).
const KB: u64 = 1 << 10;
/// Number of bytes in one mebibyte (2^20).
const MB: u64 = 1 << 20;
/// Number of bytes in one gibibyte (2^30).
const GB: u64 = 1 << 30;
/// Number of bytes in one tebibyte (2^40).
const TB: u64 = 1 << 40;
411
412pub fn format_bytes(bytes: u64) -> String {
414 let (factor, unit, n) = if bytes >= TB {
415 (TB, "TB", 1)
416 } else if bytes >= GB {
417 (GB, "GB", 1)
418 } else if bytes >= MB {
419 (MB, "MB", 1)
420 } else if bytes >= KB {
421 (KB, "KB", 1)
422 } else {
423 (1, "B ", 0)
424 };
425
426 format!("{} {}", format!("{:6.n$}", bytes as f32 / factor as f32, n = n).blue(), unit.grey())
427}
428
429pub fn format_speed(speed: f64) -> String {
431 let (factor, unit, n) = if speed >= TB as f64 {
432 (TB, "TB/s", 1)
433 } else if speed >= GB as f64 {
434 (GB, "GB/s", 1)
435 } else if speed >= MB as f64 {
436 (MB, "MB/s", 1)
437 } else if speed >= KB as f64 {
438 (KB, "KB/s", 1)
439 } else {
440 (1, "B/s ", 0)
441 };
442
443 format!("{} {}", format!("{:6.n$}", speed / factor as f64, n = n).blue(), unit.grey())
444}
445
446pub fn format_duration(dur: Duration) -> String {
448 let mut time = dur.as_secs();
449 let hours = time / 3600;
450 time -= hours * 3600;
451 let minutes = time / 60;
452 time -= minutes * 60;
453 let seconds = time;
454
455 let mut output = String::new();
456
457 if hours > 0 {
458 write!(output, "{}{}", format!("{hours:2}").blue(), "h".grey()).unwrap();
459 } else {
460 write!(output, " ").unwrap();
461 }
462
463 if hours > 0 || minutes > 0 {
464 write!(output, "{}{}", format!("{minutes:2}").blue(), "m".grey()).unwrap();
465 } else {
466 write!(output, " ").unwrap();
467 }
468
469 write!(output, "{}{}", format!("{seconds:2}").blue(), "s".grey()).unwrap();
470
471 output
472}