cspcl 0.5.0

Rust bindings for the cspcl library
#![cfg(feature = "async-tokio")]

use std::sync::{Mutex, MutexGuard};

use cspcl::async_api::AsyncCspcl;
use cspcl::{Cspcl, CspclConfig, Error, Interface, InterfaceName};

static TEST_GUARD: Mutex<()> = Mutex::new(());

fn test_lock() -> MutexGuard<'static, ()> {
    TEST_GUARD
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
}

fn test_instance() -> Cspcl {
    Cspcl::from_config(
        CspclConfig::new(7).with_interface(Interface::Loopback(InterfaceName::new("loopback"))),
    )
    .expect("failed to initialize test cspcl instance")
}

fn assert_timeout(err: Error) {
    assert_eq!(
        err.code(),
        cspcl::cspcl_sys::cspcl_error_t_CSPCL_ERR_TIMEOUT
    );
}

fn assert_not_initialized(err: Error) {
    assert_eq!(
        err.code(),
        cspcl::cspcl_sys::cspcl_error_t_CSPCL_ERR_NOT_INITIALIZED
    );
}

fn assert_invalid_param(err: Error) {
    assert_eq!(
        err.code(),
        cspcl::cspcl_sys::cspcl_error_t_CSPCL_ERR_INVALID_PARAM
    );
}

#[tokio::test(flavor = "multi_thread")]
async fn async_wrappers_can_be_constructed_from_sync_runtime() {
    let _guard = test_lock();
    let cspcl = test_instance();
    let async_cspcl = AsyncCspcl::from_sync(cspcl.clone());
    let (_sender, _receiver) = async_cspcl.split();

    assert!(async_cspcl.is_initialized());
    assert_eq!(async_cspcl.local_addr(), cspcl.local_addr());
    assert_eq!(async_cspcl.connection_stats(), cspcl.connection_stats());
}

#[tokio::test(flavor = "multi_thread")]
async fn async_receiver_times_out_without_pending_bundle() {
    let _guard = test_lock();
    let cspcl = test_instance();
    let async_receiver = AsyncCspcl::from_sync(cspcl).receiver();

    assert_timeout(async_receiver.recv_bundle(5).await.unwrap_err());
}

#[tokio::test(flavor = "multi_thread")]
async fn async_receiver_into_times_out_without_pending_bundle() {
    let _guard = test_lock();
    let cspcl = test_instance();
    let async_receiver = AsyncCspcl::from_sync(cspcl).receiver();
    let mut buffer = [0_u8; 64];

    assert_timeout(
        async_receiver
            .recv_bundle_into(&mut buffer, 5)
            .await
            .unwrap_err(),
    );
}

#[tokio::test(flavor = "multi_thread")]
async fn async_receive_observes_sync_shutdown() {
    let _guard = test_lock();
    let cspcl = test_instance();
    let async_receiver = AsyncCspcl::from_sync(cspcl.clone()).receiver();
    let mut buffer = [0_u8; 64];

    cspcl.shutdown().unwrap();

    assert_not_initialized(async_receiver.recv_bundle(5).await.unwrap_err());
    assert_not_initialized(
        async_receiver
            .recv_bundle_into(&mut buffer, 5)
            .await
            .unwrap_err(),
    );
}

#[tokio::test(flavor = "multi_thread")]
async fn async_sender_can_send_and_report_stats() {
    let _guard = test_lock();
    let cspcl = test_instance();
    let async_cspcl = AsyncCspcl::from_sync(cspcl.clone());
    let sender = async_cspcl.sender();
    let sender_clone = sender.clone();

    sender.send_bundle(&[1, 2, 3], 42, 10).await.unwrap();
    sender_clone.send_bundle(&[4, 5, 6], 42, 10).await.unwrap();

    let stats = async_cspcl.connection_stats();
    assert!(stats.misses >= 1);
    assert!(stats.hits >= 1);
    assert_eq!(sender.connection_stats(), stats);
}

#[tokio::test(flavor = "multi_thread")]
async fn async_sender_rejects_invalid_input_and_shutdown() {
    let _guard = test_lock();
    let cspcl = test_instance();
    let async_cspcl = AsyncCspcl::from_sync(cspcl.clone());
    let sender = async_cspcl.sender();

    assert_invalid_param(sender.send_bundle(&[], 42, 10).await.unwrap_err());

    async_cspcl.shutdown().await.unwrap();

    assert_not_initialized(sender.send_bundle(&[1, 2, 3], 42, 10).await.unwrap_err());
    assert!(!cspcl.is_initialized());
}