use dashmap::DashMap;
use im::hashmap::HashMap;
use rand::Rng;
use std::sync::Arc;
use std::{io::Error, time::Duration};
use tailtalk_packets::{aarp::*};
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::{CancellationToken, DropGuard};
use crate::{DataLinkPacket, DataLinkProtocol, OutboundHandle};
#[derive(Debug, Copy, Clone)]
pub enum Node {
EtherTalkPhase1(EthernetMac),
EtherTalkPhase2(EthernetMac),
LocalTalk(u8), }
struct AarpRequest {
addr: AppleTalkAddress,
chan: oneshot::Sender<Result<Node, Error>>,
}
struct SelfAddress {
chan: oneshot::Sender<AppleTalkAddress>,
}
enum AarpCommand {
Lookup(AarpRequest),
OurAddr(SelfAddress),
}
pub struct Addressing {
packet_recv: mpsc::Receiver<(AarpPacket, AddressSource)>,
request_recv: mpsc::Receiver<AarpCommand>,
pending: HashMap<AppleTalkAddress, Vec<Arc<AarpRequest>>>,
cache: Arc<DashMap<AppleTalkAddress, Node>>,
outbound: OutboundHandle,
our_mac: EthernetMac,
cancel: CancellationToken,
}
impl Addressing {
const BROADCAST_MAC: [u8; 6] = [0xFF; 6];
pub fn spawn(
our_mac: EthernetMac,
outbound: OutboundHandle,
fixed_addr: Option<AppleTalkAddress>,
) -> AddressingHandle {
let (request_send, request_recv) = mpsc::channel(100);
let (packet_send, packet_recv) = mpsc::channel(100);
let cache = Arc::new(DashMap::new());
let token = CancellationToken::new();
let handle_token = token.clone();
let us = Self {
pending: HashMap::new(),
cache: cache.clone(),
cancel: token,
request_recv,
packet_recv,
our_mac,
outbound,
};
tokio::spawn(async move { us.run(fixed_addr).await });
AddressingHandle {
_cancel: Arc::new(handle_token.drop_guard()),
request_send,
packet_send,
cache,
}
}
fn create_packet(
&self,
target: AppleTalkAddress,
target_mac: EthernetMac,
our_addr: AppleTalkAddress,
opcode: AarpOpcode,
source_type: AddressSource,
) -> DataLinkPacket {
let aarp_header = AarpPacket {
hardware_type: 1, protocol_type: 0x809b, hardware_size: 6,
protocol_size: 4,
opcode,
sender_addr: self.our_mac,
sender_protocol: our_addr,
target_addr: target_mac, target_protocol: target,
};
let mut payload = [0u8; AarpPacket::LEN];
aarp_header.to_bytes(&mut payload);
let dest_node = match source_type {
AddressSource::EtherTalkPhase2 => Node::EtherTalkPhase2(target_mac),
AddressSource::EtherTalkPhase1 => Node::EtherTalkPhase1(target_mac),
AddressSource::LocalTalk => unimplemented!("AARP not used on LocalTalk"),
};
DataLinkPacket {
dest_node,
protocol: DataLinkProtocol::Aarp,
payload: payload.into(),
src_node_id: 0, }
}
async fn run(mut self, fixed_addr: Option<AppleTalkAddress>) {
let our_addr = if let Some(addr) = fixed_addr {
tracing::info!(
"addressing: Using fixed address {}.{}",
addr.network_number,
addr.node_number
);
addr
} else {
'probe: loop {
let node_num = {
rand::rng().random_range(1..=254)
};
let addr = AppleTalkAddress {
network_number: 1,
node_number: node_num,
};
tracing::info!("addressing: Selecting 1.{node_num} as our address");
let probe = self.create_packet(
addr,
Self::BROADCAST_MAC,
addr,
AarpOpcode::Probe,
AddressSource::EtherTalkPhase2,
);
self.outbound
.send(probe)
.await
.expect("failed to dispatch probe packet");
let probe_deadline = tokio::time::Instant::now() + Duration::from_secs(1);
loop {
tokio::select! {
_ = tokio::time::sleep_until(probe_deadline) => {
tracing::info!("No response, address 1.{node_num} confirmed");
break 'probe addr;
}
pkt = self.packet_recv.recv() => {
if let Some((pkt, _source)) = pkt
&& pkt.opcode == AarpOpcode::Probe && pkt.target_protocol == addr {
tracing::warn!("Address conflict for 1.{node_num}, re-selecting");
continue 'probe;
}
}
}
}
}
};
loop {
tokio::select! {
pkt = self.packet_recv.recv() => {
if let Some((pkt, source)) = pkt {
match pkt.opcode {
AarpOpcode::Request => {
if pkt.target_protocol.node_number == our_addr.node_number {
self.send_response(pkt, our_addr, source).await;
}
},
AarpOpcode::Response => {
if let Some(reqs) = self.pending.remove(&pkt.sender_protocol) {
for req in reqs {
let inner_req = Arc::<AarpRequest>::into_inner(req).unwrap();
let node = match source {
AddressSource::EtherTalkPhase2 => {
Node::EtherTalkPhase2(pkt.sender_addr)
},
AddressSource::EtherTalkPhase1 => {
Node::EtherTalkPhase1(pkt.sender_addr)
}
AddressSource::LocalTalk => {
continue;
},
};
if let Err(e) = inner_req.chan.send(Ok(node)) {
tracing::error!("error responding to AarpRequest: {e:?}");
}
self.cache.insert(pkt.sender_protocol, node);
}
}
},
AarpOpcode::Probe => {
if pkt.target_protocol == our_addr {
self.send_response(pkt, our_addr, source).await;
tracing::info!("send response to probe that matched our addr");
}
},
}
}
}
req = self.request_recv.recv() => {
if let Some(command) = req {
match command {
AarpCommand::Lookup(lookup) => {
let packet = self.create_packet(lookup.addr, Self::BROADCAST_MAC, our_addr, AarpOpcode::Request, AddressSource::EtherTalkPhase2);
self.outbound.send(packet).await.expect("failed to dispatch request");
self.pending.entry(lookup.addr).or_default().push(Arc::new(lookup));
},
AarpCommand::OurAddr(req) => {
req.chan.send(our_addr).expect("failed to send our_addr");
},
}
}
}
_ = self.cancel.cancelled() => {
break;
}
}
}
}
async fn send_response(
&self,
pkt: AarpPacket,
our_addr: AppleTalkAddress,
source_type: AddressSource,
) {
let packet = self.create_packet(
pkt.sender_protocol, pkt.sender_addr, our_addr, AarpOpcode::Response,
source_type,
);
self.outbound
.send(packet)
.await
.expect("failed to dispatch response");
}
}
#[derive(Clone)]
pub struct AddressingHandle {
request_send: mpsc::Sender<AarpCommand>,
packet_send: mpsc::Sender<(AarpPacket, AddressSource)>,
pub(crate) cache: Arc<DashMap<AppleTalkAddress, Node>>,
_cancel: Arc<DropGuard>,
}
impl AddressingHandle {
pub fn try_lookup(&self, addr: &AppleTalkAddress) -> Option<Node> {
if addr.node_number == 255 {
return Some(Node::EtherTalkPhase2(Addressing::BROADCAST_MAC));
}
self.cache.get(addr).map(|v| *v)
}
pub async fn lookup(&self, addr: AppleTalkAddress) -> Result<Node, Error> {
if let Some(mac) = self.try_lookup(&addr) {
return Ok(mac);
}
let (tx, rx) = oneshot::channel();
let request = AarpCommand::Lookup(AarpRequest { addr, chan: tx });
if let Err(e) = self.request_send.send(request).await {
return Err(Error::other(e));
}
match rx.await {
Ok(res) => res,
Err(e) => Err(Error::other(e)),
}
}
pub async fn addr(&self) -> Result<AppleTalkAddress, Error> {
let (tx, rx) = oneshot::channel();
let request = AarpCommand::OurAddr(SelfAddress { chan: tx });
if let Err(e) = self.request_send.send(request).await {
return Err(Error::other(e));
}
match rx.await {
Ok(res) => Ok(res),
Err(e) => Err(Error::other(e)),
}
}
pub fn received_pkt(&self, pkt: &[u8], source: AddressSource) -> Result<(), Error> {
let headers = AarpPacket::parse(pkt).unwrap();
let node = match source {
AddressSource::EtherTalkPhase2 => Node::EtherTalkPhase2(headers.sender_addr),
AddressSource::EtherTalkPhase1 => Node::EtherTalkPhase1(headers.sender_addr),
AddressSource::LocalTalk => return Ok(()), };
self.cache.insert(headers.sender_protocol, node);
self.packet_send
.try_send((headers, source))
.map_err(Error::other)?;
Ok(())
}
}