// veilnet 0.4.4
//
// Networking abstractions built on Veilid API primitives
// Documentation
use std::time::Duration;

use anyhow::Result;
use clap::Parser;
use tokio::{select, time::interval};
use tracing::{info, instrument, warn};
use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
use veilnet::{Connection, DHTAddr, connection::Veilid, datagram::socket::Socket};

// Command-line arguments for `vcat`, parsed with clap's derive API.
//
// `main` inspects `listen` first: when it is set the process runs the
// listener and `address` is not consulted; otherwise `address` selects dial
// mode. Supplying neither is reported by `main` as a usage error.
//
// The `///` comments on the fields are picked up by clap as the per-argument
// help text, so they are user-facing output rather than plain comments.
#[derive(Parser)]
#[command(name = "vcat")]
#[command(about = "A netcat-like utility for Veilid networks")]
#[command(version)]
struct Cli {
    /// Listen for incoming connections on port
    // `Some(port)` selects listen mode; the port is handed to `Socket::new`
    // by `listener`.
    #[arg(short, long)]
    listen: Option<u16>,

    /// Enable verbose output
    // Switches the default log filter from "info" to "debug" (see
    // `env_filter`); has no effect when `RUST_LOG` is set.
    #[arg(short, long)]
    verbose: bool,

    /// Remote address to connect to (if not listening)
    // Positional argument; `dialer` parses it into a `DHTAddr`.
    address: Option<String>,
}

/// Entry point: parses the command line, sets up logging, and runs `vcat` in
/// either listen or dial mode.
///
/// `--listen` takes precedence when both a port and an address are supplied.
///
/// # Errors
///
/// Returns any error produced while starting the Veilid connection or by the
/// selected mode. If neither a listen port nor an address was given, a usage
/// message is printed to stderr and the process exits with status 1.
#[tokio::main]
async fn main() -> Result<()> {
    /// Operating mode selected on the command line.
    enum Mode {
        Listen(u16),
        Dial(String),
    }

    let cli = Cli::parse();

    initialize_stdout_logging(cli.verbose);

    // Resolve the mode before touching the network: a usage mistake is then
    // reported immediately, and `process::exit` never runs while a live
    // connection exists whose destructor it would skip.
    let mode = match (cli.listen, cli.address) {
        (Some(port), _) => Mode::Listen(port),
        (None, Some(address)) => Mode::Dial(address),
        (None, None) => {
            eprintln!(
                "Error: Must specify either -l to listen or provide an address to connect to"
            );
            std::process::exit(1);
        }
    };

    let conn = Veilid::new().await?;

    match mode {
        Mode::Listen(port) => listener(conn, port).await,
        Mode::Dial(address) => dialer(conn, &address).await,
    }
}

#[instrument(skip_all)]
async fn listener<C: Connection + Clone + Send + Sync + 'static>(conn: C, port: u16) -> Result<()> {
    let mut sock = Socket::new(conn, None, port).await?;
    info!(addr = %sock.addr(), "listening");

    loop {
        select! {
            res = sock.recv_from() => {
                let (addr, dgram) = match res {
                    Ok(val) => val,
                    Err(err) => {
                        warn!(?err, "recv_from");
                        continue;
                    }
                };
                let peer_count = u32::from_be_bytes(match dgram.try_into() {
                    Ok(arr) => arr,
                    Err(err) => {
                        warn!(?err, "invalid count dgram from peer");
                        continue;
                    }
                });
                let count = peer_count + 1;
                match sock.send_to(&addr, &count.to_be_bytes()).await {
                    Ok(_) => {
                        info!("got {peer_count} from {addr} sent {count}");
                    }
                    Err(err) => {
                        warn!(?err, "send_to {addr}");
                    }
                }
            }
        }
    }
}

/// Runs the dialing side of the counter exchange.
///
/// Parses `addr_str` into a [`DHTAddr`], opens a datagram socket, and then
/// loops forever over two events:
///
/// * a datagram arrives: if it is exactly four bytes it is decoded as a
///   big-endian `u32`, the local counter becomes that value plus one
///   (wrapping), the new counter is sent back to the datagram's sender, and
///   the send timer is reset;
/// * the 30-second send timer fires: the current counter is sent to the
///   dialed address. A tokio interval's first tick completes immediately, so
///   the initial send happens right after start-up.
///
/// Receive errors, malformed datagrams, and send errors are logged as
/// warnings and do not end the loop.
///
/// # Errors
///
/// Returns an error if `addr_str` cannot be parsed or the socket cannot be
/// created; once the loop has started the function does not return.
#[instrument(skip_all)]
async fn dialer<C>(conn: C, addr_str: &str) -> Result<()>
where
    C: Connection + Clone + Send + Sync + 'static,
{
    // Parse first so a malformed address is rejected before a socket is opened.
    let addr: DHTAddr = addr_str.parse()?;
    let mut sock = Socket::new(conn, None, 0).await?;
    let mut count: u32 = 0;
    let mut send_interval = interval(Duration::from_secs(30));

    loop {
        select! {
            res = sock.recv_from() => {
                // `peer` is whoever sent this datagram; the reply goes back to
                // it, not necessarily to the dialed `addr`.
                let (peer, dgram) = match res {
                    Ok(val) => val,
                    Err(err) => {
                        warn!(?err, "recv_from");
                        continue;
                    }
                };
                let peer_count = u32::from_be_bytes(match dgram.try_into() {
                    Ok(arr) => arr,
                    Err(err) => {
                        warn!(?err, "invalid count dgram from peer");
                        continue;
                    }
                });
                // The counter comes straight off the wire, so a peer may send
                // `u32::MAX`. Wrap instead of using `+`, which would panic in
                // builds with overflow checks enabled.
                count = peer_count.wrapping_add(1);
                match sock.send_to(&peer, &count.to_be_bytes()).await {
                    Ok(_) => {
                        info!("got {peer_count} from {peer} sent {count}");
                    }
                    Err(err) => {
                        warn!(?err, "send_to {peer}");
                    }
                }
                // Traffic is flowing, so push the next periodic send back.
                send_interval.reset();
            }
            _ = send_interval.tick() => {
                match sock.send_to(&addr, &count.to_be_bytes()).await {
                    Ok(()) => {
                        info!("sent {count} to {addr}");
                    }
                    Err(err) => {
                        warn!(?err, "send_to {addr}");
                    }
                }
            }
        }
    }
}

/// Installs the process-wide `tracing` subscriber.
///
/// Events are formatted with target and level, without ANSI colour codes, and
/// written to stdout. Which events pass is decided by [`env_filter`], which
/// receives `verbose` unchanged.
fn initialize_stdout_logging(verbose: bool) {
    // Plain-text formatter writing to stdout.
    let stdout_layer = tracing_subscriber::fmt::layer()
        .with_target(true)
        .with_level(true)
        .with_ansi(false)
        .with_writer(std::io::stdout);

    // Level filtering, taken from the environment or the `verbose` flag.
    let filter = env_filter(verbose);

    tracing_subscriber::registry()
        .with(stdout_layer)
        .with(filter)
        .init();
}

/// Chooses the log filter for the process.
///
/// When the `RUST_LOG` environment variable can be read, the filter is built
/// from the environment with `from_env_lossy` and `verbose` is ignored.
/// Otherwise the filter is `"debug"` if `verbose` is set and `"info"` if not.
fn env_filter(verbose: bool) -> EnvFilter {
    // No usable RUST_LOG: fall back to a fixed level picked by the flag.
    if std::env::var("RUST_LOG").is_err() {
        let directive = if verbose { "debug" } else { "info" };
        return directive.parse().unwrap();
    }

    EnvFilter::builder().from_env_lossy()
}