use anyhow::Error;
use std::path::PathBuf;
use std::time::SystemTime;
use tailtalk_packets::aarp;
use tailtalk_packets::ddp::DdpPacket;
use tailtalk_packets::llap::{LlapPacket, LlapType};
use tashtalk::TashTalk;
use tokio::sync::mpsc;
use tokio_serial::SerialPortBuilderExt;
pub use tokio_util::sync::CancellationToken;
#[cfg(feature = "ethertalk")]
mod ethertalk;
pub use tashtalk::TashTalkFeatures;
pub mod addressing;
pub mod adsp;
pub mod afp;
pub mod asp;
pub mod atp;
pub mod ddp;
pub mod echo;
pub mod nbp;
pub mod pap;
pub mod stylewriter;
/// Kind of link-level frame an outbound [`DataLinkPacket`] is turned into by
/// [`PacketProcessor::run`].
#[derive(Debug, PartialEq, Eq)]
pub enum DataLinkProtocol {
/// DDP payload: framed by the EtherTalk transport (when that feature is
/// enabled) or wrapped in an LLAP short-DDP frame for LocalTalk destinations.
Ddp,
/// AARP payload: only framed for EtherTalk destinations; dropped for LocalTalk.
Aarp,
/// LLAP Enquiry control frame: only framed for LocalTalk destinations.
LlapEnq,
/// LLAP Acknowledge control frame: only framed for LocalTalk destinations.
LlapAck,
}
/// An outbound packet queued through [`OutboundHandle`] and framed/transmitted
/// by [`PacketProcessor::run`].
#[derive(Debug)]
pub struct DataLinkPacket {
/// Destination node; its variant selects which transport carries the packet.
pub dest_node: addressing::Node,
/// How the packet should be framed on the wire.
pub protocol: DataLinkProtocol,
/// Packet contents handed to the frame builder (not used for LLAP control frames).
pub payload: Box<[u8]>,
/// Source node ID written into the LLAP header of LocalTalk frames.
pub src_node_id: u8,
}
/// Moves packets between the protocol layers and the configured transports
/// (EtherTalk and/or a TashTalk serial device); driven by [`PacketProcessor::run`].
pub struct PacketProcessor {
/// EtherTalk transport, opened by the builder when an Ethernet interface was configured.
#[cfg(feature = "ethertalk")]
transport: Option<ethertalk::EtherTalkTransport>,
/// Receiving end of the queue fed by [`OutboundHandle`].
outbound_rx: mpsc::Receiver<DataLinkPacket>,
/// Serial device path of the TashTalk adapter, if LocalTalk is enabled.
localtalk_serial_path: Option<String>,
/// Feature set applied to the TashTalk device each time it is (re)initialised.
tashtalk_features: tashtalk::TashTalkFeatures,
/// Channel to the pcap writer thread; `None` when capture is disabled or could not be started.
pcap_sender: Option<std::sync::mpsc::SyncSender<(SystemTime, Vec<u8>)>>,
}
/// Collects the transport and capture settings used to construct a [`PacketProcessor`].
pub struct PacketProcessorBuilder {
/// Name of the Ethernet interface to open for EtherTalk, if any.
#[cfg(feature = "ethertalk")]
ethernet_intf: Option<String>,
/// Serial device path of the TashTalk adapter, if LocalTalk is wanted.
localtalk_serial_path: Option<String>,
/// Feature set that will be applied to the TashTalk device.
tashtalk_features: tashtalk::TashTalkFeatures,
/// Destination file for the LocalTalk pcap capture, if requested.
pcap_path: Option<PathBuf>,
}
impl PacketProcessorBuilder {
fn new() -> Self {
Self {
#[cfg(feature = "ethertalk")]
ethernet_intf: None,
localtalk_serial_path: None,
tashtalk_features: tashtalk::TashTalkFeatures::new(),
pcap_path: None,
}
}
#[cfg(feature = "ethertalk")]
pub fn ethernet(mut self, intf: &str) -> Self {
self.ethernet_intf = Some(intf.to_string());
self
}
pub fn tashtalk_features(mut self, features: tashtalk::TashTalkFeatures) -> Self {
self.tashtalk_features = features;
self
}
pub fn localtalk(mut self, serial_path: &str) -> Self {
self.localtalk_serial_path = Some(serial_path.to_string());
self
}
pub fn pcap_capture(mut self, path: impl Into<PathBuf>) -> Self {
self.pcap_path = Some(path.into());
self
}
pub fn build(self) -> Result<(PacketProcessor, OutboundHandle), Error> {
#[cfg(feature = "ethertalk")]
let transport = if let Some(ref intf) = self.ethernet_intf {
Some(ethertalk::EtherTalkTransport::open(intf)?)
} else {
None
};
let (outbound_tx, outbound_rx) = mpsc::channel(100);
let pcap_sender = self.pcap_path.and_then(spawn_pcap_writer);
let processor = PacketProcessor {
#[cfg(feature = "ethertalk")]
transport,
outbound_rx,
localtalk_serial_path: self.localtalk_serial_path,
tashtalk_features: self.tashtalk_features,
pcap_sender,
};
let handle = OutboundHandle { tx: outbound_tx };
Ok((processor, handle))
}
}
/// Creates the pcap file at `path`, writes the global header and starts a
/// background thread that appends one record per `(timestamp, frame)` pair
/// received on the returned channel.
///
/// Returns `None` (after logging) when the file cannot be created or the
/// header cannot be written. The writer thread ends when every sender has
/// been dropped or a write fails.
fn spawn_pcap_writer(
    path: PathBuf,
) -> Option<std::sync::mpsc::SyncSender<(SystemTime, Vec<u8>)>> {
    use std::io::Write as _;

    let mut file = match std::fs::File::create(&path) {
        Ok(raw) => std::io::BufWriter::new(raw),
        Err(e) => {
            tracing::error!("Failed to create pcap file '{}': {e}", path.display());
            return None;
        }
    };

    // Classic pcap global header, little-endian, 24 bytes in total.
    let mut hdr: Vec<u8> = Vec::with_capacity(24);
    hdr.extend_from_slice(&0xa1b2c3d4u32.to_le_bytes()); // magic number
    hdr.extend_from_slice(&2u16.to_le_bytes()); // version major
    hdr.extend_from_slice(&4u16.to_le_bytes()); // version minor
    hdr.extend_from_slice(&[0u8; 8]); // thiszone + sigfigs, left at zero
    hdr.extend_from_slice(&65535u32.to_le_bytes()); // snaplen
    hdr.extend_from_slice(&114u32.to_le_bytes()); // link type 114 (LocalTalk)
    if let Err(e) = file.write_all(&hdr) {
        tracing::error!("Failed to write pcap global header: {e}");
        return None;
    }

    let (tx, rx) = std::sync::mpsc::sync_channel::<(SystemTime, Vec<u8>)>(512);
    std::thread::spawn(move || {
        tracing::info!("LocalTalk pcap capture started: '{}'", path.display());
        while let Ok((ts, data)) = rx.recv() {
            // Timestamps before the Unix epoch are recorded as zero.
            let since_epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap_or_default();
            let len_bytes = (data.len() as u32).to_le_bytes();

            // Per-record header: seconds, microseconds, captured length, original length.
            let mut rec = [0u8; 16];
            rec[..4].copy_from_slice(&(since_epoch.as_secs() as u32).to_le_bytes());
            rec[4..8].copy_from_slice(&since_epoch.subsec_micros().to_le_bytes());
            rec[8..12].copy_from_slice(&len_bytes);
            rec[12..].copy_from_slice(&len_bytes);

            let written = file.write_all(&rec).and_then(|()| file.write_all(&data));
            if written.is_err() {
                tracing::error!("pcap write error, stopping capture");
                break;
            }
            // Flush after every record so the file is readable while capture is running.
            let _ = file.flush();
        }
        tracing::info!("LocalTalk pcap capture stopped: '{}'", path.display());
    });

    Some(tx)
}
impl PacketProcessor {
pub fn builder() -> PacketProcessorBuilder {
PacketProcessorBuilder::new()
}
pub fn get_mac(&self) -> Option<[u8; 6]> {
#[cfg(feature = "ethertalk")]
return self.transport.as_ref().map(|t| t.our_mac);
#[cfg(not(feature = "ethertalk"))]
None
}
pub async fn run(
self,
et_addressing: Option<addressing::AddressingHandle>,
lt_addressing: Option<addressing::AddressingHandle>,
ddp: ddp::DdpHandle,
token: CancellationToken,
) {
#[cfg(not(feature = "ethertalk"))]
let _ = et_addressing;
let pcap_rx_sender = self.pcap_sender.clone();
let pcap_tx_sender = self.pcap_sender;
#[cfg(feature = "ethertalk")]
let mut et_tx = if let Some(transport) = self.transport {
let addressing = et_addressing.clone()
.expect("EtherTalk transport requires ET addressing");
match transport.spawn_rx_task(ddp.clone(), addressing, token.clone()) {
None => return,
Some(tx) => Some(tx),
}
} else {
None
};
let tashtalk_features = self.tashtalk_features;
let (tashtalk_ready_tx, mut tashtalk_ready_rx) = tokio::sync::watch::channel(false);
let (tashtalk_tx_watch_tx, tashtalk_tx_watch_rx) =
tokio::sync::watch::channel::<Option<mpsc::Sender<Vec<u8>>>>(None);
let has_localtalk = self.localtalk_serial_path.is_some();
if let Some(serial_path) = self.localtalk_serial_path {
let ddp_handle = ddp.clone();
let addressing_handle = lt_addressing.clone().expect("TashTalk task requires LT addressing");
let tash_token = token.clone();
let ready = tashtalk_ready_tx;
let tx_watch = tashtalk_tx_watch_tx;
tokio::spawn(async move {
let mut first_connect = true;
loop {
let stream = match tokio_serial::new(&serial_path, 1_000_000)
.flow_control(tokio_serial::FlowControl::Hardware)
.open_native_async()
{
Ok(s) => {
tracing::info!("TashTalk: opened {}", serial_path);
s
}
Err(e) => {
if first_connect {
tracing::warn!("TashTalk: {} not available ({e}), waiting for device", serial_path);
first_connect = false;
}
tokio::select! {
_ = tash_token.cancelled() => return,
_ = tokio::time::sleep(std::time::Duration::from_secs(2)) => continue,
}
}
};
first_connect = false;
let mut tashtalk_instance = TashTalk::new(stream);
tracing::info!("TashTalk: resetting");
if let Err(e) = tashtalk_instance.reset().await {
tracing::error!("TashTalk reset failed: {:?}", e);
continue;
}
tracing::info!("TashTalk: setting features {:?}", tashtalk_features);
if let Err(e) = tashtalk_instance.set_features(tashtalk_features).await {
tracing::error!("TashTalk set_features failed: {:?}", e);
continue;
}
match addressing_handle.addr().await {
Ok(addr) => {
let node_id = addr.node_number;
tracing::info!("TashTalk: setting node ID bits for node {}", node_id);
let mut node_bits = [0u8; 32];
node_bits[(node_id / 8) as usize] |= 1 << (node_id % 8);
if let Err(e) = tashtalk_instance.set_node_ids(node_bits).await {
tracing::error!("TashTalk set_node_ids failed: {:?}", e);
continue;
}
}
Err(e) => {
tracing::error!("TashTalk: failed to get address: {:?}", e);
continue;
}
}
let (tx, mut tashtalk_rx) = mpsc::channel::<Vec<u8>>(100);
let _ = tx_watch.send(Some(tx));
let _ = ready.send(true);
tracing::info!("TashTalk: online");
loop {
tokio::select! {
_ = tash_token.cancelled() => return,
frame_opt = tashtalk_rx.recv() => {
if let Some(frame) = frame_opt {
if let Err(e) = tashtalk_instance.send_frame(&frame).await {
tracing::error!("TashTalk send_frame error: {:?}", e);
break;
}
} else {
break;
}
}
res = tashtalk_instance.receive_frame() => {
match res {
Ok(Some(data)) => {
if let Some(ref tx) = pcap_rx_sender {
let _ = tx.try_send((SystemTime::now(), data.clone()));
}
if data.len() < 3 { continue; }
if let Ok(llap) = LlapPacket::parse(&data) {
match llap.type_ {
LlapType::DdpShort => {
tracing::debug!("TashTalk: LocalTalk DDP Short");
if let Ok(headers) = DdpPacket::parse_short(
&data[3..],
llap.dst_node,
llap.src_node,
) {
tracing::debug!(
"LLAP: {:?}, DDP Short: {:?}",
llap,
headers
);
let end = (3 + headers.len).min(data.len());
let payload = data[8..end].to_vec().into_boxed_slice();
ddp_handle.received_parsed_pkt(
headers,
payload,
aarp::AddressSource::LocalTalk,
[0; 6],
);
}
}
LlapType::Enquiry => {
addressing_handle.received_llap_enq(llap.dst_node);
}
LlapType::Acknowledge => {
addressing_handle.received_llap_ack(llap.src_node);
}
LlapType::DdpLong => {
tracing::info!("TashTalk: LocalTalk DDP Long");
if let Ok(headers) = DdpPacket::parse(&data[3..]) {
let end = (3 + headers.len).min(data.len());
let payload =
data[(3 + DdpPacket::LEN)..end].to_vec().into_boxed_slice();
ddp_handle.received_parsed_pkt(
headers,
payload,
aarp::AddressSource::LocalTalk,
[0; 6],
);
}
}
_ => {}
}
}
}
Ok(None) => break,
Err(e) => {
tracing::error!("TashTalk I/O error: {:?}", e);
break;
}
}
}
}
}
tracing::warn!("TashTalk: offline, will reconnect");
let _ = tx_watch.send(None);
let _ = ready.send(false);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
});
} else {
let _ = tashtalk_ready_tx.send(true);
};
let mut rx = self.outbound_rx;
if has_localtalk {
if !*tashtalk_ready_rx.borrow() {
tracing::info!("Waiting for TashTalk device...");
let _ = tashtalk_ready_rx.wait_for(|ready| *ready).await;
}
}
let mut tashtalk_tx_watch_rx = tashtalk_tx_watch_rx;
loop {
let pkt = tokio::select! {
_ = token.cancelled() => break,
pkt = rx.recv() => match pkt { Some(p) => p, None => break },
};
let mut output_buf: [u8; 1500] = [0u8; 1500];
let final_size = match pkt.protocol {
DataLinkProtocol::Ddp => {
match pkt.dest_node {
#[cfg(feature = "ethertalk")]
addressing::Node::EtherTalkPhase1(_) | addressing::Node::EtherTalkPhase2(_) => {
et_tx.as_ref().map_or(0, |t| t.build_ddp_frame(pkt.dest_node, &pkt.payload, &mut output_buf))
}
addressing::Node::LocalTalk(node_id) => {
let llap_pkt = LlapPacket {
dst_node: node_id,
src_node: pkt.src_node_id,
type_: LlapType::DdpShort,
};
let header_len = llap_pkt
.to_bytes(&mut output_buf)
.expect("failed to frame LLAP");
let payload_len = pkt.payload.len();
output_buf[header_len..header_len + payload_len]
.copy_from_slice(&pkt.payload);
let frame_end = header_len + payload_len;
let crc = tashtalk::lt_crc(&output_buf[..frame_end]);
output_buf[frame_end] = crc[0];
output_buf[frame_end + 1] = crc[1];
frame_end + 2
}
#[cfg(not(feature = "ethertalk"))]
_ => 0,
}
}
DataLinkProtocol::LlapEnq | DataLinkProtocol::LlapAck => {
match pkt.dest_node {
addressing::Node::LocalTalk(node_id) => {
let type_ = if pkt.protocol == DataLinkProtocol::LlapEnq {
LlapType::Enquiry
} else {
LlapType::Acknowledge
};
let llap_pkt = LlapPacket {
dst_node: node_id,
src_node: pkt.src_node_id,
type_,
};
let header_len = llap_pkt
.to_bytes(&mut output_buf)
.expect("failed to frame LLAP control");
let crc = tashtalk::lt_crc(&output_buf[..header_len]);
output_buf[header_len] = crc[0];
output_buf[header_len + 1] = crc[1];
header_len + 2
}
_ => 0,
}
}
DataLinkProtocol::Aarp => {
match pkt.dest_node {
#[cfg(feature = "ethertalk")]
addressing::Node::EtherTalkPhase1(_) | addressing::Node::EtherTalkPhase2(_) => {
et_tx.as_ref().map_or(0, |t| t.build_aarp_frame(pkt.dest_node, &pkt.payload, &mut output_buf))
}
addressing::Node::LocalTalk(_) => 0,
#[cfg(not(feature = "ethertalk"))]
_ => 0,
}
}
};
if final_size == 0 {
continue;
}
match pkt.dest_node {
#[cfg(feature = "ethertalk")]
addressing::Node::EtherTalkPhase1(_) | addressing::Node::EtherTalkPhase2(_) => {
if let Some(ref mut t) = et_tx {
t.sendpacket(&output_buf, final_size);
}
}
addressing::Node::LocalTalk(_) => {
let tashtalk_tx = tashtalk_tx_watch_rx.borrow_and_update().clone();
if let Some(tx) = tashtalk_tx {
tracing::debug!("Sending to Tashtalk tx: {:X?}", &output_buf[..final_size]);
if let Err(e) = tx.send(output_buf[..final_size].to_vec()).await {
tracing::error!("Failed to send to Tashtalk tx: {}", e);
}
}
if let Some(ref tx) = pcap_tx_sender {
let frame_len = final_size.saturating_sub(2);
let _ = tx.try_send((SystemTime::now(), output_buf[..frame_len].to_vec()));
}
}
#[cfg(not(feature = "ethertalk"))]
_ => {}
}
}
}
}
/// Cloneable handle used to queue [`DataLinkPacket`]s for the [`PacketProcessor`].
#[derive(Clone)]
pub struct OutboundHandle {
/// Sending end of the processor's outbound queue.
tx: mpsc::Sender<DataLinkPacket>,
}
impl OutboundHandle {
pub fn new(tx: mpsc::Sender<DataLinkPacket>) -> Self {
Self { tx }
}
pub async fn send(&self, packet: DataLinkPacket) -> Result<(), Error> {
self.tx.send(packet).await?;
Ok(())
}
}
/// Cloneable handle for stopping a [`TalkStack`], obtained from
/// [`TalkStack::shutdown_handle`].
#[derive(Clone)]
pub struct ShutdownHandle {
/// Token cancelled to ask services to stop.
service_token: CancellationToken,
/// Token cancelled to stop the transport layer.
transport_token: CancellationToken,
/// Token awaited (with a timeout) during a graceful shutdown before the
/// transport is stopped.
services_done: CancellationToken,
}
impl ShutdownHandle {
    /// Cancels the service token and then the transport token, without waiting.
    pub fn shutdown(&self) {
        for token in [&self.service_token, &self.transport_token] {
            token.cancel();
        }
    }

    /// Completes once the transport token has been cancelled.
    pub async fn transport_closed(&self) {
        let transport = &self.transport_token;
        transport.cancelled().await;
    }

    /// Cancels the service token, waits up to five seconds for
    /// `services_done` to be cancelled, then cancels the transport token.
    pub async fn graceful_shutdown(&self) {
        let grace_period = std::time::Duration::from_secs(5);

        self.service_token.cancel();

        // A timeout is not an error here: the transport is stopped regardless.
        let services_finished = self.services_done.cancelled();
        let _ = tokio::time::timeout(grace_period, services_finished).await;

        self.transport_token.cancel();
    }
}
/// A running AppleTalk stack: addressing, DDP, NBP and Echo handles plus the
/// tokens that control its lifetime. Built with [`TalkStack::builder`].
pub struct TalkStack {
/// EtherTalk addressing handle; `None` when no Ethernet interface was configured.
pub et_addressing: Option<addressing::AddressingHandle>,
/// LocalTalk addressing handle; `None` when no serial path was configured.
pub lt_addressing: Option<addressing::AddressingHandle>,
/// Handle to the DDP processor.
pub ddp: ddp::DdpHandle,
/// Handle to the NBP service.
pub nbp: nbp::NbpHandle,
/// Handle to the Echo service.
pub echo: echo::EchoHandle,
/// Token handed to services started from this stack (e.g. AFP, ASP).
service_token: CancellationToken,
/// Token handed to the packet processor task.
transport_token: CancellationToken,
/// Token handed to services alongside `service_token`.
services_done: CancellationToken,
}
impl TalkStack {
pub async fn wait_for_shutdown(&self) {
self.transport_token.cancelled().await;
}
pub fn shutdown_handle(&self) -> ShutdownHandle {
ShutdownHandle {
service_token: self.service_token.clone(),
transport_token: self.transport_token.clone(),
services_done: self.services_done.clone(),
}
}
pub fn token(&self) -> CancellationToken {
self.service_token.clone()
}
pub fn services_done_token(&self) -> CancellationToken {
self.services_done.clone()
}
pub async fn spawn_afp(&self, socket: Option<u8>, config: afp::AfpServerConfig) -> anyhow::Result<afp::AfpServer> {
afp::AfpServer::spawn(&self.ddp, &self.nbp, socket, config, self.service_token.clone(), self.services_done.clone()).await
}
pub async fn listen_asp(
&self,
socket: Option<u8>,
entity_name: tailtalk_packets::nbp::EntityName,
status_data: Vec<u8>,
) -> anyhow::Result<asp::AspHandle> {
asp::Asp::bind(&self.ddp, &self.nbp, socket, entity_name, status_data, self.service_token.clone(), self.services_done.clone()).await
}
pub async fn listen_adsp(&self, socket: Option<u8>) -> std::io::Result<(u8, adsp::AdspListener)> {
adsp::Adsp::bind(&self.ddp, socket).await
}
pub async fn connect_adsp(&self, remote_addr: adsp::AdspAddress) -> std::io::Result<adsp::AdspStream> {
adsp::Adsp::connect(&self.ddp, remote_addr).await
}
pub async fn pap_client(&self) -> pap::PapClient {
let (_, atp_requestor, atp_responder) = atp::Atp::spawn(&self.ddp, None).await;
pap::PapClient::new(atp_requestor, atp_responder)
}
pub async fn pap_status(&self, address: atp::AtpAddress) -> anyhow::Result<String> {
let (_, atp_requestor, _) = atp::Atp::spawn(&self.ddp, None).await;
pap::PapClient::get_status(atp_requestor, address).await
}
}
/// Collects the settings used by [`TalkStackBuilder::build`] to assemble a [`TalkStack`].
pub struct TalkStackBuilder {
/// Name of the Ethernet interface to use for EtherTalk, if any.
#[cfg(feature = "ethertalk")]
ethernet_intf: Option<String>,
/// Serial device path of the TashTalk adapter, if LocalTalk is wanted.
localtalk_serial_path: Option<String>,
/// Feature set forwarded to the packet processor for the TashTalk device.
tashtalk_features: tashtalk::TashTalkFeatures,
/// Fixed EtherTalk address handed to addressing, if one was requested.
#[cfg(feature = "ethertalk")]
fixed_addr: Option<tailtalk_packets::aarp::AppleTalkAddress>,
/// Fixed LocalTalk node number handed to addressing, if one was requested.
lt_fixed_node: Option<u8>,
/// Destination file for the LocalTalk pcap capture, if requested.
pcap_path: Option<PathBuf>,
}
impl TalkStack {
    /// Starts a [`TalkStackBuilder`] with no transport selected, no fixed
    /// addresses, a fresh `TashTalkFeatures::new()` and capture disabled.
    pub fn builder() -> TalkStackBuilder {
        let tashtalk_features = tashtalk::TashTalkFeatures::new();
        TalkStackBuilder {
            #[cfg(feature = "ethertalk")]
            ethernet_intf: None,
            localtalk_serial_path: None,
            tashtalk_features,
            #[cfg(feature = "ethertalk")]
            fixed_addr: None,
            lt_fixed_node: None,
            pcap_path: None,
        }
    }
}
impl TalkStackBuilder {
#[cfg(feature = "ethertalk")]
pub fn ethernet(mut self, intf: &str) -> Self {
self.ethernet_intf = Some(intf.to_string());
self
}
pub fn localtalk(mut self, serial_path: &str) -> Self {
self.localtalk_serial_path = Some(serial_path.to_string());
self
}
pub fn tashtalk_features(mut self, features: tashtalk::TashTalkFeatures) -> Self {
self.tashtalk_features = features;
self
}
pub fn pcap_capture(mut self, path: impl Into<PathBuf>) -> Self {
self.pcap_path = Some(path.into());
self
}
pub fn localtalk_fixed_address(mut self, node: u8) -> Self {
self.lt_fixed_node = Some(node);
self
}
#[cfg(feature = "ethertalk")]
pub fn fixed_address(mut self, network: u16, node: u8) -> Self {
self.fixed_addr = Some(tailtalk_packets::aarp::AppleTalkAddress {
network_number: network,
node_number: node,
});
self
}
pub async fn build(self) -> Result<TalkStack, Error> {
let mut pp = PacketProcessor::builder().tashtalk_features(self.tashtalk_features);
#[cfg(feature = "ethertalk")]
if let Some(ref intf) = self.ethernet_intf {
pp = pp.ethernet(intf);
}
if let Some(ref path) = self.localtalk_serial_path {
pp = pp.localtalk(path);
}
if let Some(path) = self.pcap_path {
pp = pp.pcap_capture(path);
}
let (processor, outbound) = pp.build()?;
#[cfg(feature = "ethertalk")]
let et_addressing = if self.ethernet_intf.is_some() {
Some(addressing::Addressing::spawn(
processor.get_mac(),
outbound.clone(),
self.fixed_addr,
aarp::AddressSource::EtherTalkPhase2,
))
} else {
None
};
#[cfg(not(feature = "ethertalk"))]
let et_addressing: Option<addressing::AddressingHandle> = None;
let lt_addressing = if self.localtalk_serial_path.is_some() {
let lt_fixed = self.lt_fixed_node.map(|node| tailtalk_packets::aarp::AppleTalkAddress {
network_number: 0,
node_number: node,
});
Some(addressing::Addressing::spawn(
None,
outbound.clone(),
lt_fixed,
aarp::AddressSource::LocalTalk,
))
} else {
None
};
let ddp = ddp::DdpProcessor::spawn(et_addressing.clone(), lt_addressing.clone(), outbound);
let echo = echo::Echo::spawn(&ddp).await;
let nbp = nbp::Nbp::spawn(&ddp, et_addressing.clone(), lt_addressing.clone()).await;
let service_token = CancellationToken::new();
let transport_token = CancellationToken::new();
let services_done = CancellationToken::new();
tokio::spawn(processor.run(et_addressing.clone(), lt_addressing.clone(), ddp.clone(), transport_token.clone()));
if let Some(et) = &et_addressing {
et.addr().await?;
}
if let Some(lt) = <_addressing {
lt.addr().await?;
}
Ok(TalkStack { et_addressing, lt_addressing, ddp, nbp, echo, service_token, transport_token, services_done })
}
}
/// Converts `time` to an AFP 2.x date-time: seconds relative to
/// 2000-01-01 00:00:00 UTC, carried on the wire as the bit pattern of a
/// signed 32-bit integer.
///
/// Times before 2000 become negative offsets (two's complement in the
/// returned `u32`) rather than being clamped to 2000-01-01. Values outside
/// the signed 32-bit range saturate at its limits instead of wrapping.
pub fn time_to_afp(time: SystemTime) -> u32 {
    const AFP2_EPOCH_OFFSET: i64 = 946_684_800;
    // Signed seconds relative to the Unix epoch; negative for earlier times.
    let unix_secs: i64 = match time.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(after) => i64::try_from(after.as_secs()).unwrap_or(i64::MAX),
        Err(before) => i64::try_from(before.duration().as_secs()).map_or(i64::MIN, |s| -s),
    };
    let afp_secs = unix_secs
        .saturating_sub(AFP2_EPOCH_OFFSET)
        .clamp(i64::from(i32::MIN), i64::from(i32::MAX));
    // In range after the clamp; the i32 -> u32 cast only reinterprets the bits.
    afp_secs as i32 as u32
}
/// Converts `time` to seconds since the classic Mac OS epoch
/// (1904-01-01 00:00:00), i.e. Unix seconds plus 2_082_844_800.
///
/// Times before the Unix epoch are converted exactly as long as they are not
/// earlier than 1904 (earlier ones yield 0). Values that do not fit in 32 bits
/// saturate at `u32::MAX` instead of silently wrapping.
pub fn time_to_afp_v1(time: SystemTime) -> u32 {
    const MAC_TO_UNIX_EPOCH_OFFSET: u64 = 2_082_844_800;
    let mac_secs = match time.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(after) => after.as_secs().saturating_add(MAC_TO_UNIX_EPOCH_OFFSET),
        Err(before) => MAC_TO_UNIX_EPOCH_OFFSET.saturating_sub(before.duration().as_secs()),
    };
    u32::try_from(mac_secs).unwrap_or(u32::MAX)
}