aggligator_monitor/
monitor.rs

1//! Interactive connection and link monitor.
2
3use crossterm::{
4    cursor,
5    cursor::{MoveTo, MoveToColumn, MoveToNextLine},
6    event::{poll, read, Event, KeyCode, KeyEvent},
7    execute, queue,
8    style::{Print, Stylize},
9    terminal,
10    terminal::{disable_raw_mode, enable_raw_mode, ClearType},
11};
12use futures::{future, stream::FuturesUnordered, FutureExt, StreamExt};
13use std::{
14    collections::{HashMap, HashSet},
15    fmt::{Display, Write},
16    hash::Hash,
17    io::{stdout, Error},
18    time::Duration,
19};
20use tokio::sync::{broadcast, broadcast::error::TryRecvError, watch};
21
22use aggligator::{
23    control::Control,
24    exec,
25    id::ConnId,
26    transport::{ConnectingTransport, LinkError, LinkTagBox},
27};
28
29/// Watches the available tags of the specified transports.
30///
31/// The output of this function can be passed as `tags_rx` to [`interactive_monitor`].
32pub fn watch_tags(
33    transports: impl IntoIterator<Item = Box<dyn ConnectingTransport>>,
34) -> watch::Receiver<HashSet<LinkTagBox>> {
35    let (tags_tx, tags_rx) = watch::channel(HashSet::new());
36
37    // Start tag getting task for each transport.
38    let mut transport_tasks = FuturesUnordered::new();
39    let mut transport_tags: Vec<watch::Receiver<HashSet<LinkTagBox>>> = Vec::new();
40    for transport in transports {
41        let (tx, rx) = watch::channel(HashSet::new());
42        transport_tags.push(rx);
43        transport_tasks.push(async move { transport.link_tags(tx).await });
44    }
45
46    exec::spawn(async move {
47        loop {
48            // Remove channels from terminated transports.
49            transport_tags.retain(|tt| tt.has_changed().is_ok());
50
51            // Collect and publish tags from all transports.
52            let mut all_tags = HashSet::new();
53            for tt in &mut transport_tags {
54                let tags = tt.borrow_and_update();
55                for tag in &*tags {
56                    all_tags.insert(tag.clone());
57                }
58            }
59            tags_tx.send_if_modified(|tags| {
60                if *tags == all_tags {
61                    false
62                } else {
63                    *tags = all_tags;
64                    true
65                }
66            });
67
68            // Quit when no transports are left.
69            if transport_tags.is_empty() {
70                break;
71            }
72
73            // Monitor all transport tags for changes.
74            let tags_changed = future::select_all(transport_tags.iter_mut().map(|tt| tt.changed().boxed()));
75
76            // Wait for changes.
77            tokio::select! {
78                _ = tags_changed => (),
79                Some(_) = transport_tasks.next() => (),
80                () = tags_tx.closed() => break,
81            };
82        }
83    });
84
85    tags_rx
86}
87
88/// Runs the interactive connection and link monitor.
89///
90/// The channel `header_rx` is used to receive and update the header line to display on top of the screen.
91///
92/// The channel `control_rx` is used to receive newly established connections
93/// that should be displayed. Terminated connections are removed automatically.
94///
95/// `time_stats_idx` specifies the index of the time interval
96/// in [`Cfg::stats_intervals`](aggligator::cfg::Cfg::stats_intervals)
97/// to use for displaying the link statistics.
98///
99/// The optional channel `tags_rx` is used to receive available link tags that should
100/// be displayed even if no link is using them.
101///
102/// The optional channel `tag_error_rx` is used to receive error messages from failed
103/// connection attempts that should be displayed.
104///
105/// The optional channel `disabled_tags_tx` is used to send the set of link tags
106/// disabled interactively by the user. If not present, the user cannot disable link tags.
107///
108/// This function returns when the channel `control_rx` is closed or the user presses `q`.
109pub fn interactive_monitor<TX, RX, TAG>(
110    mut header_rx: watch::Receiver<String>, mut control_rx: broadcast::Receiver<(Control<TX, RX, TAG>, String)>,
111    time_stats_idx: usize, mut tags_rx: Option<watch::Receiver<HashSet<TAG>>>,
112    mut tag_error_rx: Option<broadcast::Receiver<LinkError<TAG>>>,
113    disabled_tags_tx: Option<watch::Sender<HashSet<TAG>>>,
114) -> Result<(), Error>
115where
116    TAG: Display + Hash + PartialEq + Eq + Clone + 'static,
117{
118    const STATS_COL: u16 = 35;
119
120    let mut controls: Vec<(Control<TX, RX, TAG>, String)> = Vec::new();
121    let mut errors: HashMap<(ConnId, TAG), String> = HashMap::new();
122    let mut disabled: HashSet<TAG> = HashSet::new();
123    let mut toggle_link_block: Option<usize> = None;
124    let mut interval = Duration::from_secs(3);
125
126    enable_raw_mode()?;
127
128    'main: loop {
129        // Update data.
130        controls.retain(|c| !c.0.is_terminated());
131        loop {
132            match control_rx.try_recv() {
133                Ok(control_info) => {
134                    if controls.iter().all(|c| c.0.id() != control_info.0.id()) {
135                        interval = control_info.0.cfg().stats_intervals[time_stats_idx];
136                        controls.push(control_info);
137                    }
138                }
139                Err(TryRecvError::Empty) => break,
140                Err(TryRecvError::Closed) if controls.is_empty() => break 'main,
141                Err(TryRecvError::Closed) => break,
142                Err(TryRecvError::Lagged(_)) => tracing::warn!("monitor lost incoming connection"),
143            }
144        }
145        if let Some(tag_error_rx) = tag_error_rx.as_mut() {
146            while let Ok(LinkError { id, tag, error }) = tag_error_rx.try_recv() {
147                if let Some(id) = id {
148                    errors.insert((id, tag), error.to_string());
149                }
150            }
151        }
152        if let Some(disabled_tags) = disabled_tags_tx.as_ref() {
153            disabled_tags.send_replace(disabled.clone());
154        }
155        let mut tags: Option<Vec<_>> =
156            tags_rx.as_mut().map(|rx| rx.borrow_and_update().clone().into_iter().collect());
157        if let Some(tags) = &mut tags {
158            tags.sort_by_key(|tag| tag.to_string());
159        }
160
161        // Clear display.
162        execute!(stdout(), terminal::Clear(ClearType::All), cursor::MoveTo(0, 0)).unwrap();
163        let (_cols, rows) = terminal::size().unwrap();
164
165        // Header.
166        {
167            let header = header_rx.borrow_and_update();
168            queue!(stdout(), Print(&*header), MoveToNextLine(1)).unwrap();
169        }
170        queue!(stdout(), Print("━".repeat(80).grey()), MoveToNextLine(1)).unwrap();
171        queue!(
172            stdout(),
173            MoveToColumn(STATS_COL),
174            Print("  TX speed    RX speed      TXed      RXed"),
175            MoveToNextLine(1)
176        )
177        .unwrap();
178        queue!(stdout(), Print("━".repeat(80).grey()), MoveToNextLine(2)).unwrap();
179
180        // Connections.
181        for (control, info) in &controls {
182            // Display:
183            // conn_id - age - total speeds - total data
184            //   tag num - tag name - enabled/disabled - connected or error
185            //   current speeds - ping - txed unacked/limit - total data
186
187            let conn_id = control.id();
188
189            // Sort links by tags.
190            let links = control.links();
191            let tag_links: Vec<_> = match &tags {
192                Some(tags) => {
193                    let mut tag_links: Vec<_> =
194                        tags.iter().map(|tag| (tag, links.iter().find(|link| link.tag() == tag))).collect();
195                    for link in &links {
196                        if !tag_links.iter().any(|(tag, _)| *tag == link.tag()) {
197                            tag_links.push((link.tag(), Some(link)));
198                        }
199                    }
200                    tag_links
201                }
202                None => links.iter().map(|link| (link.tag(), Some(link))).collect(),
203            };
204
205            // Calculate connection totals and disconnect disabled links.
206            let mut conn_sent = 0;
207            let mut conn_recved = 0;
208            let mut conn_tx_speed = 0.;
209            let mut conn_rx_speed = 0.;
210            for link in &links {
211                let stats = link.stats();
212                conn_sent += stats.total_sent;
213                conn_recved += stats.total_recved;
214                if let Some(ts) = stats.time_stats.get(time_stats_idx) {
215                    conn_tx_speed += ts.send_speed();
216                    conn_rx_speed += ts.recv_speed();
217                }
218
219                if disabled.contains(link.tag()) {
220                    link.start_disconnect();
221                }
222            }
223
224            // Connection lines.
225            let stats = control.stats();
226            let mut short_id = conn_id.to_string();
227            short_id.truncate(8);
228            queue!(
229                stdout(),
230                Print("Connection ".cyan()),
231                Print(short_id.bold().magenta()),
232                Print("  "),
233                Print(format_duration(stats.established.map(|e| e.elapsed()).unwrap_or_default())),
234                MoveToColumn(STATS_COL),
235                Print(format_speed(conn_tx_speed)),
236                Print(" "),
237                Print(format_speed(conn_rx_speed)),
238                Print("   "),
239                Print(format_bytes(conn_sent)),
240                Print(" "),
241                Print(format_bytes(conn_recved)),
242                MoveToNextLine(1),
243            )
244            .unwrap();
245            queue!(
246                stdout(),
247                Print("TX:".cyan()),
248                Print("  avail ".cyan()),
249                Print(format_bytes(stats.send_space as _)),
250                Print("   unack ".cyan()),
251                Print(format_bytes(stats.sent_unacked as _)),
252                Print("   uncsmable ".cyan()),
253                Print(format_bytes(stats.sent_unconsumable as _)),
254                Print("   uncsmed ".cyan()),
255                Print(format_bytes(stats.sent_unconsumed as _)),
256                MoveToNextLine(1),
257                Print("RX:".cyan()),
258                MoveToColumn(62),
259                Print(" uncsmed ".cyan()),
260                Print(format_bytes(stats.recved_unconsumed as _)),
261                MoveToNextLine(1),
262            )
263            .unwrap();
264            if !info.is_empty() {
265                queue!(stdout(), Print(info), MoveToNextLine(1)).unwrap();
266            }
267            queue!(stdout(), MoveToNextLine(1)).unwrap();
268
269            // Link lines for connection.
270            for (n, (tag, link)) in tag_links.iter().enumerate() {
271                queue!(
272                    stdout(),
273                    Print("  "),
274                    Print(format!("{}{}", format!("{n:1}").blue(), ". ".cyan())),
275                    Print(format!("{:<66}", tag.to_string()).cyan()),
276                    Print(
277                        format!(
278                            " {:>8}",
279                            link.map(|l| String::from_utf8_lossy(l.remote_user_data()).to_string())
280                                .unwrap_or_default()
281                                .chars()
282                                .take(8)
283                                .collect::<String>()
284                        )
285                        .cyan()
286                    ),
287                    MoveToNextLine(1),
288                    Print("     "),
289                )
290                .unwrap();
291
292                if disabled.contains(tag) {
293                    queue!(stdout(), Print("disabled".red())).unwrap();
294                } else if let Some(link) = link {
295                    let stats = link.stats();
296                    match (link.not_working_reason(), link.not_working_since()) {
297                        (Some(reason), Some(since)) => {
298                            queue!(
299                                stdout(),
300                                Print("unconfirmed ".dark_yellow()),
301                                Print(format_duration(since.elapsed())),
302                                Print(": ".grey()),
303                                Print(reason.to_string().blue())
304                            )
305                            .unwrap();
306                        }
307                        _ => queue!(
308                            stdout(),
309                            Print("connected ".green()),
310                            Print(format_duration(stats.established.elapsed())),
311                        )
312                        .unwrap(),
313                    }
314
315                    if toggle_link_block == Some(n) {
316                        link.set_blocked(!link.is_blocked());
317                    }
318
319                    if link.is_blocked() {
320                        queue!(stdout(), Print(" blocked".red())).unwrap();
321                    } else if link.is_remotely_blocked() {
322                        queue!(stdout(), Print(" remotely blocked".red())).unwrap();
323                    }
324
325                    let hangs = link.stats().hangs;
326                    if hangs > 0 {
327                        queue!(stdout(), Print(format!(" ({hangs})").grey())).unwrap();
328                    }
329                } else if let Some(err) = errors.get(&(conn_id, (*tag).clone())) {
330                    queue!(stdout(), Print(format!("{err:40}").red())).unwrap();
331                }
332                queue!(stdout(), MoveToNextLine(1)).unwrap();
333
334                if let Some(link) = link {
335                    let stats = link.stats();
336
337                    let mut tx_speed = 0.;
338                    let mut rx_speed = 0.;
339                    if let Some(ts) = stats.time_stats.get(time_stats_idx) {
340                        tx_speed = ts.send_speed();
341                        rx_speed = ts.recv_speed();
342                    }
343
344                    queue!(
345                        stdout(),
346                        Print("    "),
347                        Print(format!("{} {}", format!("{:4}", stats.roundtrip.as_millis()).blue(), "ms".grey())),
348                        Print(" "),
349                        Print(format_bytes(stats.sent_unacked)),
350                        Print(" /".cyan()),
351                        Print(format_bytes(stats.unacked_limit)),
352                        MoveToColumn(STATS_COL),
353                        Print(format_speed(tx_speed)),
354                        Print(" "),
355                        Print(format_speed(rx_speed)),
356                        Print("   "),
357                        Print(format_bytes(stats.total_sent)),
358                        Print(" "),
359                        Print(format_bytes(stats.total_recved)),
360                        MoveToNextLine(2),
361                    )
362                    .unwrap();
363                } else {
364                    queue!(stdout(), MoveToNextLine(1)).unwrap();
365                }
366            }
367
368            // Seperation line.
369            queue!(stdout(), MoveToNextLine(1), Print("━".repeat(80).grey()), MoveToNextLine(2)).unwrap();
370        }
371
372        // Usage line.
373        execute!(
374            stdout(),
375            MoveTo(0, rows - 2),
376            Print("Press 0-9 to toggle a link, q to quit.".cyan()),
377            MoveToNextLine(1)
378        )
379        .unwrap();
380
381        // Handle user events.
382        toggle_link_block = None;
383        if poll(interval)? {
384            match read()? {
385                Event::Key(KeyEvent { code: KeyCode::Char(c), .. }) if c.is_ascii_digit() => {
386                    let n = c.to_digit(10).unwrap();
387                    if disabled_tags_tx.is_some() {
388                        if let Some(tag) = tags.and_then(|tags| tags.get(n as usize).cloned()) {
389                            if !disabled.remove(&tag) {
390                                disabled.insert(tag);
391                            }
392                        }
393                    } else {
394                        toggle_link_block = Some(n as usize);
395                    }
396                }
397                Event::Key(KeyEvent { code: KeyCode::Char('q'), .. }) => break,
398                _ => (),
399            }
400        }
401    }
402
403    disable_raw_mode()?;
404    Ok(())
405}
406
/// Number of bytes per "KB" as used by the formatting functions (2^10).
const KB: u64 = 1 << 10;
/// Number of bytes per "MB" (2^20).
const MB: u64 = 1 << 20;
/// Number of bytes per "GB" (2^30).
const GB: u64 = 1 << 30;
/// Number of bytes per "TB" (2^40).
const TB: u64 = 1 << 40;
411
412/// Formats a byte count.
413pub fn format_bytes(bytes: u64) -> String {
414    let (factor, unit, n) = if bytes >= TB {
415        (TB, "TB", 1)
416    } else if bytes >= GB {
417        (GB, "GB", 1)
418    } else if bytes >= MB {
419        (MB, "MB", 1)
420    } else if bytes >= KB {
421        (KB, "KB", 1)
422    } else {
423        (1, "B ", 0)
424    };
425
426    format!("{} {}", format!("{:6.n$}", bytes as f32 / factor as f32, n = n).blue(), unit.grey())
427}
428
429/// Formats a speed.
430pub fn format_speed(speed: f64) -> String {
431    let (factor, unit, n) = if speed >= TB as f64 {
432        (TB, "TB/s", 1)
433    } else if speed >= GB as f64 {
434        (GB, "GB/s", 1)
435    } else if speed >= MB as f64 {
436        (MB, "MB/s", 1)
437    } else if speed >= KB as f64 {
438        (KB, "KB/s", 1)
439    } else {
440        (1, "B/s ", 0)
441    };
442
443    format!("{} {}", format!("{:6.n$}", speed / factor as f64, n = n).blue(), unit.grey())
444}
445
446/// Formats a duration.
447pub fn format_duration(dur: Duration) -> String {
448    let mut time = dur.as_secs();
449    let hours = time / 3600;
450    time -= hours * 3600;
451    let minutes = time / 60;
452    time -= minutes * 60;
453    let seconds = time;
454
455    let mut output = String::new();
456
457    if hours > 0 {
458        write!(output, "{}{}", format!("{hours:2}").blue(), "h".grey()).unwrap();
459    } else {
460        write!(output, "   ").unwrap();
461    }
462
463    if hours > 0 || minutes > 0 {
464        write!(output, "{}{}", format!("{minutes:2}").blue(), "m".grey()).unwrap();
465    } else {
466        write!(output, "   ").unwrap();
467    }
468
469    write!(output, "{}{}", format!("{seconds:2}").blue(), "s".grey()).unwrap();
470
471    output
472}