// irrc 0.1.0
//
// A client library for the IRRd query protocol.
// Documentation
use std::{env::args, io::stderr, sync::mpsc, thread};

use ip::{traits::PrefixSet as _, Any, Prefix, PrefixSet};
use irrc::{Error, IrrClient, Query, ResponseItem};
use rpsl::names::{AsSet, AutNum};

/// Owns a background thread that gathers prefixes received over a channel
/// into a [`PrefixSet`].
struct Collector {
    /// Sending half of the channel feeding the collector thread.
    ///
    /// Becomes `None` once it has been handed out via [`Collector::sender`].
    sender: Option<mpsc::Sender<Prefix<Any>>>,
    /// Handle of the collector thread; joining it yields the finished set.
    join_handle: thread::JoinHandle<PrefixSet<Any>>,
}

impl Collector {
    /// Creates the channel and starts the collector thread.
    ///
    /// The thread drains the receiving half of the channel and collects every
    /// prefix it receives into a `PrefixSet<Any>`, which it returns on exit.
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel();
        let sender = Some(tx);
        tracing::debug!("starting collector thread");
        let join_handle = thread::spawn(move || {
            rx.iter()
                .inspect(|prefix| tracing::trace!("adding prefix {} to prefix set", prefix))
                .collect::<PrefixSet<Any>>()
        });
        Self {
            sender,
            join_handle,
        }
    }

    /// Hands out the sending half of the channel, wrapped in a [`Sender`].
    ///
    /// Returns `Some` on the first call and `None` afterwards, because the
    /// stored sender is taken out of `self`.
    fn sender(&mut self) -> Option<Sender> {
        self.sender.take().map(Sender)
    }

    /// Waits for the collector thread to finish and prints each range of the
    /// resulting set on its own line to stdout.
    ///
    /// If the collector thread panicked, the failure is logged instead.
    fn print(self) {
        let Self {
            sender,
            join_handle,
        } = self;
        // The collector thread only stops once every sending half is gone.
        // If the sender was never handed out, it would stay alive here and
        // the join below would block forever, so release it first.
        drop(sender);
        tracing::debug!("trying to join collector thread");
        match join_handle.join() {
            Ok(set) => set.ranges().for_each(|range| println!("{}", range)),
            Err(err) => tracing::error!("failed to join set builder thread: {:?}", err),
        }
    }
}

/// Cloneable wrapper around the sending half of the prefix channel.
#[derive(Clone)]
struct Sender(mpsc::Sender<Prefix<Any>>);

impl Sender {
    /// Extracts the prefix from a response item and sends it down the channel.
    ///
    /// A failed send is logged as a warning and otherwise ignored.
    fn collect(&self, item: ResponseItem<Prefix<Any>>) {
        let Self(channel) = self;
        let prefix = item.into_content();
        tracing::trace!("sending prefix {prefix} to collector");
        match channel.send(prefix) {
            Ok(()) => {}
            Err(err) => tracing::warn!("failed to send prefix to collector: {}", err),
        }
    }
}

struct QueryThread(thread::JoinHandle<Result<(), Error>>);

impl QueryThread {
    fn spawn(host: String, object: AsSet, collector: &mut Collector) -> Self {
        let sender = collector
            .sender()
            .expect("failed to take collector send handle");
        let join_handle = thread::spawn(move || -> Result<(), Error> {
            IrrClient::new(host)
                .connect()?
                .pipeline_from_initial(Query::AsSetMembersRecursive(object), |item| {
                    item.map(into_routes_queries).map_err(log_warning).ok()
                })?
                .responses()
                .filter_map(|item| item.map_err(log_warning).ok())
                .for_each(|item| sender.collect(item));
            Ok(())
        });
        Self(join_handle)
    }

    fn join(self) -> Result<(), Error> {
        self.0.join().expect("failed to join query thread")
    }
}

/// Logs `err` at warning level and hands it back unchanged, so it can be
/// used inside combinators such as `map_err`.
fn log_warning<E>(err: E) -> E
where
    E: std::error::Error,
{
    tracing::warn!("failed to parse item: {}", err);
    err
}

/// Builds the pair of route queries (IPv4 first, then IPv6) for the
/// aut-num carried by `item`.
fn into_routes_queries(item: ResponseItem<AutNum>) -> [Query; 2] {
    let origin = item.into_content();
    let v4_routes = Query::Ipv4Routes(origin);
    let v6_routes = Query::Ipv6Routes(origin);
    [v4_routes, v6_routes]
}

fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::WARN)
        .with_writer(stderr)
        .try_init()?;
    let args: Vec<String> = args().collect();
    let host = format!("{}:43", args[1]);
    let object = args[2].parse().unwrap();
    let mut collector = Collector::spawn();
    let query_thread = QueryThread::spawn(host, object, &mut collector);
    query_thread.join()?;
    collector.print();
    Ok(())
}