use std::time::Duration;
use anyhow::Result;
use clap::Parser;
use tokio::{select, time::interval};
use tracing::{info, instrument, warn};
use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
use veilnet::{Connection, DHTAddr, connection::Veilid, datagram::socket::Socket};
// Command-line arguments for `vcat`. `main` listens when `--listen` is given and
// otherwise dials `address`; at least one of the two must be supplied.
// (Regular comment on purpose: clap derives help text from doc comments, and the
// program description is already set explicitly through `about` below.)
#[derive(Parser)]
#[command(name = "vcat")]
#[command(about = "A netcat-like utility for Veilid networks")]
#[command(version)]
struct Cli {
    /// Listen for datagrams on this port instead of dialing a peer
    #[arg(short, long, value_name = "PORT")]
    listen: Option<u16>,
    /// Enable debug-level logging (RUST_LOG overrides this)
    #[arg(short, long)]
    verbose: bool,
    /// DHT address of the peer to dial
    address: Option<String>,
}
/// Entry point: parses the command line, sets up logging, then either listens
/// (`-l <port>`) or dials the given address.
///
/// The arguments are validated *before* the Veilid connection is created, so a
/// bad invocation is reported immediately and the process never exits while a
/// live connection is still waiting to be dropped.
///
/// # Errors
/// Propagates failures from `Veilid::new`, `listener` and `dialer`.
#[tokio::main]
async fn main() -> Result<()> {
    /// What the parsed arguments ask the process to do.
    enum Mode {
        Listen(u16),
        Dial(String),
    }

    let cli = Cli::parse();
    initialize_stdout_logging(cli.verbose);

    // `--listen` wins when both a port and an address are given, as before.
    let mode = match (cli.listen, cli.address) {
        (Some(port), _) => Mode::Listen(port),
        (None, Some(address)) => Mode::Dial(address),
        (None, None) => {
            eprintln!("Error: Must specify either -l to listen or provide an address to connect to");
            std::process::exit(1);
        }
    };

    let conn = Veilid::new().await?;
    match mode {
        Mode::Listen(port) => listener(conn, port).await,
        Mode::Dial(address) => dialer(conn, &address).await,
    }
}
#[instrument(skip_all)]
async fn listener<C: Connection + Clone + Send + Sync + 'static>(conn: C, port: u16) -> Result<()> {
let mut sock = Socket::new(conn, None, port).await?;
info!(addr = %sock.addr(), "listening");
loop {
select! {
res = sock.recv_from() => {
let (addr, dgram) = match res {
Ok(val) => val,
Err(err) => {
warn!(?err, "recv_from");
continue;
}
};
let peer_count = u32::from_be_bytes(match dgram.try_into() {
Ok(arr) => arr,
Err(err) => {
warn!(?err, "invalid count dgram from peer");
continue;
}
});
let count = peer_count + 1;
match sock.send_to(&addr, &count.to_be_bytes()).await {
Ok(_) => {
info!("got {peer_count} from {addr} sent {count}");
}
Err(err) => {
warn!(?err, "send_to {addr}");
}
}
}
}
}
}
/// Dialing side of the counter exchange.
///
/// Parses `addr_str` as a `DHTAddr`, opens a datagram socket on port 0 and then
/// loops forever, reacting to whichever happens first:
///
/// * a datagram arrives: if it decodes as a 4-byte big-endian `u32`, the local
///   counter becomes that value plus one, the new counter is sent back to the
///   datagram's sender, and the resend timer is restarted;
/// * the 30-second resend timer ticks: the current counter is sent to the
///   dialed address.
///
/// # Errors
/// Returns an error if `addr_str` does not parse or the socket cannot be
/// created. Receive, decode and send failures are logged as warnings and the
/// loop carries on.
#[instrument(skip_all)]
async fn dialer<C>(conn: C, addr_str: &str) -> Result<()>
where
    C: Connection + Clone + Send + Sync + 'static,
{
    // Validate the target first so a malformed address fails before any socket
    // is opened.
    let addr: DHTAddr = addr_str.parse()?;
    let mut sock = Socket::new(conn, None, 0).await?;
    let mut count: u32 = 0;
    let mut send_interval = interval(Duration::from_secs(30));
    loop {
        select! {
            res = sock.recv_from() => {
                // `peer_addr` is whoever sent this datagram; the reply goes
                // there rather than to the originally dialed `addr`.
                let (peer_addr, dgram) = match res {
                    Ok(val) => val,
                    Err(err) => {
                        warn!(?err, "recv_from");
                        continue;
                    }
                };

                // A valid payload is exactly four bytes; anything else is rejected.
                let decoded: Result<[u8; 4], _> = dgram.try_into();
                let peer_count = match decoded {
                    Ok(bytes) => u32::from_be_bytes(bytes),
                    Err(err) => {
                        warn!(?err, "invalid count dgram from peer");
                        continue;
                    }
                };

                // The value comes from the network: wrap instead of risking an
                // overflow panic when the peer sends u32::MAX.
                count = peer_count.wrapping_add(1);
                match sock.send_to(&peer_addr, &count.to_be_bytes()).await {
                    Ok(_) => {
                        info!("got {peer_count} from {peer_addr} sent {count}");
                    }
                    Err(err) => {
                        warn!(?err, "send_to {peer_addr}");
                    }
                }
                // The peer is responsive, so restart the periodic resend timer.
                send_interval.reset();
            }
            _ = send_interval.tick() => {
                match sock.send_to(&addr, &count.to_be_bytes()).await {
                    Ok(()) => {
                        info!("sent {count} to {addr}");
                    }
                    Err(err) => {
                        warn!(?err, "send_to {addr}");
                    }
                }
            }
        }
    }
}
/// Sets up process-wide logging on stdout.
///
/// Builds a `tracing_subscriber` formatting layer (targets and levels shown,
/// ANSI escapes disabled) that writes to stdout, stacks the filter returned by
/// `env_filter(verbose)` on top of it, and installs the result with `.init()`.
fn initialize_stdout_logging(verbose: bool) {
    let stdout_layer = tracing_subscriber::fmt::layer()
        .with_target(true)
        .with_level(true)
        .with_ansi(false)
        .with_writer(std::io::stdout);
    let level_filter = env_filter(verbose);

    let subscriber = tracing_subscriber::registry()
        .with(stdout_layer)
        .with(level_filter);
    subscriber.init();
}
fn env_filter(verbose: bool) -> EnvFilter {
if std::env::var("RUST_LOG").is_ok() {
EnvFilter::builder().from_env_lossy()
} else if verbose {
"debug".parse().unwrap()
} else {
"info".parse().unwrap()
}
}