// emcyphal 0.1.0
//
// Async Cyphal/CAN network stack for no_std environments
// Documentation
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use emcyphal::buffer;
use emcyphal::core::{NodeId, Priority, PrioritySet, ServiceId};
use emcyphal::data_types::ByteArray;
use emcyphal::frame::{Data, DataSpecifier, Frame, Header, Mtu};
use emcyphal::node::{CoreNode, Hub};
use emcyphal::socket::Responder;
use emcyphal::time::{Duration, Instant};
use emcyphal_driver::link::{self, FilterUpdate};
use futures_executor::LocalPool;
use futures_task::LocalSpawn;
use std::boxed::Box;
use std::sync::atomic::{AtomicBool, Ordering};

/// Node ID of the simulated remote peer: source of the injected request and
/// destination of the expected response.
const REQUESTER_NODE: NodeId = NodeId::new(10).unwrap();
/// Node ID assigned to the `CoreNode` under test; it hosts the responder.
const RESPONDER_NODE: NodeId = NodeId::new(20).unwrap();
/// Service ID the responder is created for and that both frames carry.
const SERVICE_ID: ServiceId = ServiceId::new(100).unwrap();
/// Two seconds. Passed twice to `Responder::create` and added to the request
/// timestamp to form the expected response timestamp.
const TIMEOUT: Duration = Duration::from_micros(2_000_000);
/// Priority of the injected request and of the frame popped from the TX side.
const PRIORITY: Priority = Priority::Nominal;
/// MTU passed to both the RX push and the TX pop.
const MTU: Mtu = Mtu::Classic;

/// Runs a responder, a request injector and a response checker on one
/// single-threaded executor and verifies that the checker ran to completion.
///
/// The three tasks are spawned in the order responder → request → response
/// check, then the pool is driven until no task can make further progress.
#[test]
fn test_rx() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    // The node is leaked so that the hub and link halves can be handed to
    // tasks that require `'static` handles.
    // NOTE(review): the meaning of the second constructor argument (`1`) is
    // not visible from this file — confirm against `CoreNode::new`.
    let node = Box::leak(Box::new(CoreNode::<CriticalSectionRawMutex>::new(
        RESPONDER_NODE,
        1,
    )));
    let (hub, link) = node.split();
    let (rx_filter, rx, tx) = link.split();

    // Completion flag set by the response checker once its assertion passed.
    let done: &'static AtomicBool = Box::leak(Box::new(AtomicBool::new(false)));

    let responder_task = Box::new(run_responder(hub));
    spawner.spawn_local_obj(responder_task.into()).unwrap();

    let request_task = Box::new(send_request(rx_filter, rx));
    spawner.spawn_local_obj(request_task.into()).unwrap();

    let check_task = Box::new(test_response(tx, done));
    spawner.spawn_local_obj(check_task.into()).unwrap();

    pool.run_until_stalled();

    assert!(done.load(Ordering::SeqCst));
}

/// Serves `SERVICE_ID` requests on `hub`, answering each one with a copy of
/// the request payload in which every byte is incremented by one (wrapping).
///
/// The request and response buffers live on this task's stack and are lent to
/// the responder for as long as `Responder::run` is awaited.
async fn run_responder(hub: Hub<'static>) {
    // NOTE(review): the two `10` const parameters are presumably queue
    // dimensions — confirm against `buffer::rx_req::PriorityFifo`.
    type RequestQueue = buffer::rx_req::PriorityFifo<ByteArray, 10, 10>;
    type ResponseSlot = buffer::tx_resp::Blocking<ByteArray>;

    let mut requests = RequestQueue::default();
    let mut responses = ResponseSlot::default();

    // NOTE(review): `TIMEOUT` is passed for both trailing parameters; their
    // individual meaning is not visible here — confirm against
    // `Responder::create`.
    let mut responder = Responder::create(
        hub,
        &mut requests,
        &mut responses,
        SERVICE_ID,
        TIMEOUT,
        TIMEOUT,
    )
    .unwrap();

    responder
        .run(async |request| {
            let mut bytes = request.bytes.clone();
            bytes
                .iter_mut()
                .for_each(|byte| *byte = byte.wrapping_add(1));
            ByteArray { bytes }
        })
        .await;
}

/// Plays the remote requester on the receive side of the link.
///
/// Waits for the first filter update, asserts that it asks to accept frames
/// addressed to `RESPONDER_NODE`, then pushes a single request frame from
/// `REQUESTER_NODE` into the RX queue.
async fn send_request(mut rx_filter: link::RxFilter<'static>, mut rx: link::Rx<'static>) {
    let first_update = rx_filter.pop().await;
    assert_eq!(first_update, FilterUpdate::AddDestination(RESPONDER_NODE));

    let header = Header {
        priority: PRIORITY,
        data_spec: DataSpecifier::Request(SERVICE_ID),
        destination: Some(RESPONDER_NODE),
        source: Some(REQUESTER_NODE),
    };
    // NOTE(review): presumably a 2-byte length prefix (1), one payload byte
    // (0x02) and a Cyphal/CAN tail byte — confirm against the serialization
    // of `ByteArray` and the transport spec.
    let payload = Data::new(&[0x01, 0x00, 0x02, 0b1110_0101]).unwrap();

    let request = Frame {
        header,
        data: payload,
        timestamp: ts(1000),
        loop_back: false,
    };
    rx.push(request, MTU).await;
}

/// Plays the remote requester on the transmit side of the link.
///
/// Pops one frame of `PRIORITY` from the TX queue, asserts that it equals the
/// expected response (payload byte `0x03`, timestamp of the request plus
/// `TIMEOUT`), and then sets `complete` so the test body can tell this point
/// was reached.
async fn test_response(mut tx: link::Tx<'static>, complete: &'static AtomicBool) {
    let header = Header {
        priority: PRIORITY,
        data_spec: DataSpecifier::Response(SERVICE_ID),
        destination: Some(REQUESTER_NODE),
        source: Some(RESPONDER_NODE),
    };
    // Same layout as the request, with the payload byte incremented by the
    // responder's handler (0x02 -> 0x03).
    let payload = Data::new(&[0x01, 0x00, 0x03, 0b1110_0101]).unwrap();
    let expected_timestamp = ts(1000) + TIMEOUT;

    let expected = Frame {
        header,
        data: payload,
        timestamp: expected_timestamp,
        loop_back: false,
    };

    let accepted = PrioritySet::new_eq(PRIORITY);
    let received = tx.pop(accepted, MTU).await;
    assert_eq!(received, expected);

    complete.store(true, Ordering::SeqCst);
}

/// Builds a test timestamp lying `us` microseconds after `Instant::MIN`,
/// saturating instead of overflowing.
fn ts(us: u64) -> Instant {
    let offset = Duration::from_micros(us);
    Instant::MIN.saturating_add(offset)
}