use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use crate::can_types::wire;
use crate::iroh::{IrohClientBuilder, IrohConnection};
pub use crate::can_types::{CanFdFlags, CanInterfaceInfo};
pub struct CanClient {
send: Arc<Mutex<iroh::endpoint::SendStream>>,
recv: Arc<Mutex<iroh::endpoint::RecvStream>>,
_conn: IrohConnection,
}
impl CanClient {
pub async fn connect(server_id: &str) -> Result<Self> {
tracing::debug!("Connecting to CAN server: {}", server_id);
let conn = IrohClientBuilder::new()
.connect_str(server_id)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to server: {}", e))?;
tracing::debug!("Connected to server, opening stream...");
let stream = conn
.open_stream()
.await
.map_err(|e| anyhow::anyhow!("Failed to open stream: {}", e))?;
tracing::debug!("Stream opened successfully");
let (send, recv) = stream.split();
Ok(Self {
send: Arc::new(Mutex::new(send)),
recv: Arc::new(Mutex::new(recv)),
_conn: conn,
})
}
pub async fn write_frame(&self, frame: &CanFrame) -> Result<()> {
let encoded = wire::encode(&AnyCanFrame::Can(frame.clone()));
let mut send = self.send.lock().await;
send.write_all(&encoded).await?;
drop(send);
tokio::task::yield_now().await;
Ok(())
}
pub async fn write_fd_frame(&self, frame: &CanFdFrame) -> Result<()> {
let encoded = wire::encode(&AnyCanFrame::CanFd(frame.clone()));
let mut send = self.send.lock().await;
send.write_all(&encoded).await?;
drop(send);
tokio::task::yield_now().await;
Ok(())
}
pub async fn read_frame(&self) -> Result<Option<AnyCanFrame>> {
let mut buf = [0u8; wire::FRAME_SIZE];
let mut recv = self.recv.lock().await;
let mut read = 0;
while read < wire::FRAME_SIZE {
match recv.read(&mut buf[read..]).await? {
Some(n) if n > 0 => read += n,
Some(_) => continue,
None => return Ok(None),
}
}
let (frame, _) = wire::decode(&buf)?;
Ok(Some(frame))
}
}
#[derive(Clone)]
pub enum Transport {
Iroh {
alpn: Option<Vec<u8>>,
relay_url: Option<String>,
},
Moq {
relay: String,
insecure: bool,
},
}
impl Default for Transport {
fn default() -> Self {
Transport::Iroh {
alpn: None,
relay_url: None,
}
}
}
pub struct CanSocketBuilder {
server_id: String,
timeout: Duration,
transport: Transport,
fd_enabled: bool,
}
impl CanSocketBuilder {
pub fn new(server_id: &str) -> Self {
Self {
server_id: server_id.to_string(),
timeout: Duration::from_secs(1),
transport: Transport::default(),
fd_enabled: false,
}
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn enable_fd(mut self, enabled: bool) -> Self {
self.fd_enabled = enabled;
self
}
pub fn with_iroh(mut self) -> Self {
self.transport = Transport::Iroh {
alpn: None,
relay_url: None,
};
self
}
pub fn alpn(mut self, alpn: &[u8]) -> Self {
if let Transport::Iroh {
alpn: ref mut a, ..
} = self.transport
{
*a = Some(alpn.to_vec());
}
self
}
pub fn iroh_relay(mut self, url: &str) -> Self {
if let Transport::Iroh {
relay_url: ref mut r,
..
} = self.transport
{
*r = Some(url.to_string());
}
self
}
pub fn with_moq(mut self, relay: &str) -> Self {
self.transport = Transport::Moq {
relay: relay.to_string(),
insecure: false,
};
self
}
pub fn insecure(mut self, val: bool) -> Self {
if let Transport::Moq {
insecure: ref mut i,
..
} = self.transport
{
*i = val;
}
self
}
pub fn open(self) -> Result<RemoteCanSocket> {
let runtime = tokio::runtime::Runtime::new()?;
let client = match self.transport {
Transport::Iroh { alpn, relay_url } => runtime.block_on(async {
let mut builder = IrohClientBuilder::new();
if let Some(alpn) = alpn {
builder = builder.alpn(&alpn);
}
if let Some(url) = relay_url {
builder = builder.relay_url(url);
}
let conn = builder.connect_str(&self.server_id).await?;
let stream = conn.open_stream().await?;
let (send, recv) = stream.split();
Ok::<_, anyhow::Error>(ClientInner::Iroh {
send: Arc::new(Mutex::new(send)),
recv: Arc::new(Mutex::new(recv)),
_conn: Box::new(conn),
})
})?,
Transport::Moq { relay, insecure } => runtime.block_on(async {
let mut builder = crate::moq::MoqBuilder::new().relay(&relay);
if insecure {
builder = builder.disable_tls_verify();
}
let cmd_builder = builder
.clone()
.path(&format!("{}/commands", self.server_id));
let state_builder = builder.path(&format!("{}/state", self.server_id));
let (pub_result, sub_result) = tokio::join!(
cmd_builder.connect_publisher_with_track("can"),
Self::connect_and_subscribe_retry(state_builder, "can")
);
let (publisher, cmd_writer) = pub_result?;
let (subscriber, state_reader) = sub_result?;
Ok::<_, anyhow::Error>(ClientInner::Moq {
cmd_writer: Arc::new(Mutex::new(cmd_writer)),
state_reader: Arc::new(Mutex::new(state_reader)),
_publisher: Box::new(publisher),
_subscriber: Box::new(subscriber),
})
})?,
};
Ok(RemoteCanSocket {
client,
runtime,
server_id: self.server_id,
timeout: self.timeout,
fd_enabled: self.fd_enabled,
read_buffer: std::sync::Mutex::new(Vec::new()),
})
}
}
enum ClientInner {
Iroh {
send: Arc<Mutex<iroh::endpoint::SendStream>>,
recv: Arc<Mutex<iroh::endpoint::RecvStream>>,
_conn: Box<IrohConnection>,
},
Moq {
cmd_writer: Arc<Mutex<crate::moq::MoqTrackWriter>>,
state_reader: Arc<Mutex<crate::moq::MoqTrackReader>>,
_publisher: Box<crate::moq::MoqPublisher>,
_subscriber: Box<crate::moq::MoqSubscriber>,
},
}
impl CanSocketBuilder {
async fn connect_and_subscribe_retry(
builder: crate::moq::MoqBuilder,
track_name: &str,
) -> anyhow::Result<(crate::moq::MoqSubscriber, crate::moq::MoqTrackReader)> {
let track = track_name.to_string();
loop {
let mut subscriber = builder.clone().connect_subscriber().await?;
match tokio::time::timeout(Duration::from_secs(2), subscriber.subscribe_track(&track))
.await
{
Ok(Ok(Some(reader))) => return Ok((subscriber, reader)),
Ok(Ok(None)) => {
tracing::debug!("Broadcast ended, reconnecting...");
}
Ok(Err(e)) => {
tracing::debug!("Subscribe error: {}, reconnecting...", e);
}
Err(_) => {
tracing::debug!("No broadcast yet, reconnecting...");
}
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
}
pub fn new(server_id: &str) -> CanSocketBuilder {
CanSocketBuilder::new(server_id)
}
pub struct RemoteCanSocket {
client: ClientInner,
runtime: tokio::runtime::Runtime,
server_id: String,
timeout: Duration,
fd_enabled: bool,
read_buffer: std::sync::Mutex<Vec<u8>>,
}
impl RemoteCanSocket {
pub fn open(server_id: &str) -> Result<Self> {
new(server_id).open()
}
pub fn server_id(&self) -> &str {
&self.server_id
}
pub fn timeout(&self) -> Duration {
self.timeout
}
pub fn set_timeout(&mut self, timeout: Duration) -> Result<()> {
self.timeout = timeout;
Ok(())
}
pub fn is_fd_enabled(&self) -> bool {
self.fd_enabled
}
pub fn write_frame(&mut self, frame: &CanFrame) -> Result<()> {
let encoded = wire::encode(&AnyCanFrame::Can(frame.clone()));
self.write_raw(&encoded)
}
pub fn write_fd_frame(&mut self, frame: &CanFdFrame) -> Result<()> {
let encoded = wire::encode(&AnyCanFrame::CanFd(frame.clone()));
self.write_raw(&encoded)
}
pub fn write_any_frame(&mut self, frame: &AnyCanFrame) -> Result<()> {
let encoded = wire::encode(frame);
self.write_raw(&encoded)
}
fn write_raw(&mut self, data: &[u8]) -> Result<()> {
self.runtime.block_on(async {
match &self.client {
ClientInner::Iroh { send, .. } => {
let mut s = send.lock().await;
s.write_all(data).await?;
drop(s);
tokio::time::sleep(std::time::Duration::from_micros(100)).await;
}
ClientInner::Moq { cmd_writer, .. } => {
let mut w = cmd_writer.lock().await;
w.write_stream(data.to_vec());
}
}
Ok::<_, anyhow::Error>(())
})
}
pub fn read_frame(&mut self) -> Result<Option<AnyCanFrame>> {
if let Some(frame) = self.try_decode_buffered()? {
return Ok(Some(frame));
}
let data = self.read_raw_with_timeout()?;
if let Some(data) = data {
{
let mut buffer = self.read_buffer.lock().unwrap();
buffer.extend_from_slice(&data);
}
self.try_decode_buffered()
} else {
Ok(None)
}
}
fn read_raw_with_timeout(&mut self) -> Result<Option<Vec<u8>>> {
let timeout = self.timeout;
let result = self.runtime.block_on(async {
tokio::time::timeout(timeout, async {
let mut temp_buf = vec![0u8; 128];
match &self.client {
ClientInner::Iroh { recv, .. } => {
let mut r = recv.lock().await;
match r.read(&mut temp_buf).await? {
Some(n) => {
temp_buf.truncate(n);
Ok(Some(temp_buf))
}
None => Ok(None),
}
}
ClientInner::Moq { state_reader, .. } => {
let mut r = state_reader.lock().await;
if let Some(data) = r.read().await? {
Ok(Some(data.to_vec()))
} else {
Ok(None)
}
}
}
})
.await
});
match result {
Ok(Ok(data)) => Ok(data),
Ok(Err(e)) => Err(e),
Err(_) => Ok(None), }
}
fn try_decode_buffered(&self) -> Result<Option<AnyCanFrame>> {
let mut buffer = self.read_buffer.lock().unwrap();
if buffer.len() < wire::FRAME_SIZE {
return Ok(None);
}
match wire::decode(&buffer) {
Ok((frame, consumed)) => {
buffer.drain(..consumed);
Ok(Some(frame))
}
Err(e) => {
tracing::warn!("CAN frame decode error: {}, skipping frame to resync", e);
buffer.drain(..wire::FRAME_SIZE);
Ok(None)
}
}
}
pub fn read_frames<F>(&mut self, mut callback: F) -> Result<()>
where
F: FnMut(AnyCanFrame) -> bool,
{
loop {
match self.read_frame()? {
Some(frame) => {
if !callback(frame) {
break;
}
}
None => continue, }
}
Ok(())
}
}
pub use crate::can_types::AnyCanFrame;
pub use crate::can_types::CanBusSocket;
pub use crate::can_types::CanFdFrame;
pub use crate::can_types::CanFrame;
impl CanBusSocket for RemoteCanSocket {
fn is_open(&self) -> bool {
true
}
fn write_raw(&self, can_id: u32, data: &[u8]) -> anyhow::Result<()> {
let frame = CanFrame::new(can_id, data)?;
let encoded = wire::encode(&AnyCanFrame::Can(frame));
self.runtime.block_on(async {
match &self.client {
ClientInner::Iroh { send, .. } => {
let mut s = send.lock().await;
s.write_all(&encoded).await?;
drop(s);
tokio::time::sleep(std::time::Duration::from_micros(100)).await;
}
ClientInner::Moq { cmd_writer, .. } => {
let mut w = cmd_writer.lock().await;
w.write_stream(encoded.to_vec());
}
}
Ok::<_, anyhow::Error>(())
})
}
fn read_raw(&self) -> anyhow::Result<Option<(u32, Vec<u8>)>> {
if let Some(frame) = self.try_decode_buffered()? {
return Ok(Some((frame.id(), frame.data().to_vec())));
}
let timeout = self.timeout;
let result = self.runtime.block_on(async {
tokio::time::timeout(timeout, async {
let mut temp_buf = vec![0u8; 128];
match &self.client {
ClientInner::Iroh { recv, .. } => {
let mut r = recv.lock().await;
match r.read(&mut temp_buf).await? {
Some(n) => {
temp_buf.truncate(n);
Ok(Some(temp_buf))
}
None => Ok(None),
}
}
ClientInner::Moq { state_reader, .. } => {
let mut r = state_reader.lock().await;
if let Some(data) = r.read().await? {
Ok(Some(data.to_vec()))
} else {
Ok(None)
}
}
}
})
.await
});
match result {
Ok(Ok(Some(data))) => {
{
let mut buffer = self.read_buffer.lock().unwrap();
buffer.extend_from_slice(&data);
}
if let Some(frame) = self.try_decode_buffered()? {
Ok(Some((frame.id(), frame.data().to_vec())))
} else {
Ok(None) }
}
Ok(Ok(None)) => Ok(None), Ok(Err(e)) => Err(e),
Err(_) => Ok(None), }
}
fn is_data_available(&self, _timeout_us: u64) -> anyhow::Result<bool> {
let buffer = self.read_buffer.lock().unwrap();
Ok(buffer.len() >= wire::FRAME_SIZE)
}
fn set_recv_timeout(&mut self, timeout_us: u64) -> anyhow::Result<()> {
self.timeout = std::time::Duration::from_micros(timeout_us);
Ok(())
}
}