epicars 0.1.2

Standalone, pure rust implementation of EPICS CA protocol
Documentation
use std::{
    io::{self, ErrorKind},
    net::ToSocketAddrs,
};

use epicars::messages::{self, RawMessage, parse_search_packet};
use socket2::{Domain, Protocol, Type};
use tokio::{net::UdpSocket, task::yield_now};
use tracing::{error, info, warn};

pub fn new_reusable_udp_socket<T: ToSocketAddrs>(address: T) -> io::Result<std::net::UdpSocket> {
    let socket = socket2::Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
    socket.set_reuse_port(true)?;
    socket.set_nonblocking(true)?;
    let addr = address.to_socket_addrs()?.next().unwrap();
    socket.bind(&addr.into())?;
    Ok(socket.into())
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    tracing_subscriber::fmt::init();

    tokio::spawn(async {
        let std_sock = match new_reusable_udp_socket("0.0.0.0:5065") {
            Ok(x) => x,
            Err(err) => match err.kind() {
                ErrorKind::AddrInUse => {
                    panic!(
                        "Error: Port 5065 already in use, without reuse flag. Is a caRepeater running?"
                    );
                }
                x => panic!("IO Error: {x}"),
            },
        };

        let socket_beacon = UdpSocket::from_std(std_sock).unwrap();

        loop {
            read_socket(&socket_beacon).await;
        }
    });

    tokio::spawn(async {
        let socket_search =
            tokio::net::UdpSocket::from_std(new_reusable_udp_socket("0.0.0.0:5064").unwrap())
                .unwrap();
        loop {
            read_socket(&socket_search).await;
        }
    });
    info!("Waiting for packets on 0.0.0.0:5065, 0.0.0.0:5064",);
    loop {
        yield_now().await;
    }
}

async fn read_socket(socket: &UdpSocket) {
    let mut buf: Vec<u8> = vec![0; 0xFFFF];

    while let Ok((size, sender)) = socket.recv_from(&mut buf).await {
        let msg_buf = &buf[..size];

        if let Ok((leftover, raw)) = RawMessage::parse(msg_buf) {
            match raw.command {
                0 => {
                    if let Ok(searches) = parse_search_packet(msg_buf) {
                        info!(
                            "Received SEARCH for {} names from {sender}: {}",
                            searches.len(),
                            searches
                                .iter()
                                .map(|x| x.channel_name.clone())
                                .collect::<Vec<String>>()
                                .join(" ")
                        );
                    } else {
                        error!(
                            "Receied code 0 CA_PROTO_VERSION packet from {sender} but we don't understand the contents"
                        );
                    }
                }
                13 => {
                    let beacon: messages::RsrvIsUp = raw.try_into().unwrap();
                    info!(
                        "Received BEACON {}:{} ({}) from {sender}",
                        beacon.server_ip.map(|f| f.into()).unwrap_or(sender.ip()),
                        beacon.server_port,
                        beacon.beacon_id,
                    );
                    if !leftover.is_empty() {
                        warn!(
                            "Warning: After parsing BEACON, {} bytes were remaining",
                            leftover.len()
                        )
                    }
                }
                _ => warn!("Unexpected broadcast packet received!"),
            }
        } else {
            error!("Got an error parsing a raw message; was it a real CA message? {msg_buf:x?}");
            break;
        }
    }
}