use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use crate::port_handle::PortHandle;
use crate::protocol::convert::result_to_reply;
use crate::protocol::event::{EventPayload, PortEvent};
use crate::protocol::value::Timestamp;
use crate::protocol::{EventFilter, PortReply, PortRequest};
use crate::request::RequestOp;
use crate::user::AsynUser;
use super::client::{ConnectionState, RuntimeClient};
use super::error::TransportError;
use super::tracker::RequestTracker;
#[derive(Clone)]
pub struct InProcessClient {
handle: PortHandle,
_tracker: Arc<RequestTracker>,
}
impl InProcessClient {
pub fn new(handle: PortHandle) -> Self {
Self {
handle,
_tracker: Arc::new(RequestTracker::new()),
}
}
pub fn handle(&self) -> &PortHandle {
&self.handle
}
fn build_user(req: &PortRequest) -> AsynUser {
let mut user = AsynUser::new(req.meta.reason)
.with_addr(req.meta.addr)
.with_timeout(req.meta.timeout());
user.priority = req.meta.priority.into();
if let Some(token) = req.meta.block_token {
user.block_token = Some(token);
}
user
}
}
impl RuntimeClient for InProcessClient {
fn request(
&self,
req: PortRequest,
) -> Pin<Box<dyn Future<Output = Result<PortReply, TransportError>> + Send + '_>> {
let request_id = req.meta.request_id;
let op = RequestOp::from(&req.command);
let user = Self::build_user(&req);
let handle = self.handle.clone();
Box::pin(async move {
let result = handle
.submit(op, user)
.await
.map_err(TransportError::from)?;
Ok(result_to_reply(&result, request_id))
})
}
fn request_blocking(&self, req: PortRequest) -> Result<PortReply, TransportError> {
let request_id = req.meta.request_id;
let op = RequestOp::from(&req.command);
let user = Self::build_user(&req);
let result = self
.handle
.submit_blocking(op, user)
.map_err(TransportError::from)?;
Ok(result_to_reply(&result, request_id))
}
fn subscribe(
&self,
filter: EventFilter,
) -> Pin<
Box<
dyn Future<Output = Result<tokio::sync::mpsc::Receiver<PortEvent>, TransportError>>
+ Send
+ '_,
>,
> {
let port_name = self.handle.port_name().to_string();
let mut broadcast_rx = self.handle.interrupts().subscribe_async();
Box::pin(async move {
let (tx, rx) = tokio::sync::mpsc::channel(256);
tokio::spawn(async move {
loop {
match broadcast_rx.recv().await {
Ok(iv) => {
if let Some(r) = filter.reason {
if iv.reason != r {
continue;
}
}
if let Some(a) = filter.addr {
if iv.addr != a {
continue;
}
}
let event = PortEvent {
port_name: port_name.clone(),
payload: EventPayload::from(&iv),
timestamp: Timestamp::from(iv.timestamp),
};
if tx.send(event).await.is_err() {
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
Ok(rx)
})
}
fn connection_state(&self) -> ConnectionState {
ConnectionState::Connected
}
}
#[cfg(test)]
#[allow(deprecated)]
mod tests {
use std::time::SystemTime;
use super::*;
use crate::interrupt::InterruptValue;
use crate::manager::PortManager;
use crate::param::ParamType;
use crate::port::{PortDriver, PortDriverBase, PortFlags};
use crate::protocol::command::PortCommand;
use crate::protocol::reply::ReplyPayload;
use crate::protocol::request::{ProtocolPriority, RequestMeta};
use crate::protocol::value::ParamValue;
struct TestPort {
base: PortDriverBase,
}
impl TestPort {
fn new() -> Self {
let mut base = PortDriverBase::new("ipc_test", 1, PortFlags::default());
base.create_param("VAL", ParamType::Int32).unwrap();
base.create_param("F64", ParamType::Float64).unwrap();
base.create_param("MSG", ParamType::Octet).unwrap();
Self { base }
}
}
impl PortDriver for TestPort {
fn base(&self) -> &PortDriverBase {
&self.base
}
fn base_mut(&mut self) -> &mut PortDriverBase {
&mut self.base
}
}
fn make_client() -> (PortManager, InProcessClient) {
let mgr = PortManager::new();
let rt_handle = mgr.register_port(TestPort::new());
let client = InProcessClient::new(rt_handle.port_handle().clone());
(mgr, client)
}
fn make_request(cmd: PortCommand, reason: usize) -> PortRequest {
PortRequest {
meta: RequestMeta {
request_id: 1,
port_name: "ipc_test".into(),
addr: 0,
reason,
timeout_ms: 5000,
priority: ProtocolPriority::Medium,
block_token: None,
},
command: cmd,
}
}
#[tokio::test]
async fn int32_write_read_cycle() {
let (_mgr, client) = make_client();
let req = make_request(PortCommand::Int32Write { value: 42 }, 0);
let reply = client.request(req).await.unwrap();
assert_eq!(reply.payload, ReplyPayload::Ack);
let req = make_request(PortCommand::Int32Read, 0);
let reply = client.request(req).await.unwrap();
match reply.payload {
ReplyPayload::Value(ParamValue::Int32(v)) => assert_eq!(v, 42),
_ => panic!("expected Int32 value, got {:?}", reply.payload),
}
}
#[tokio::test]
async fn float64_write_read_cycle() {
let (_mgr, client) = make_client();
let req = make_request(PortCommand::Float64Write { value: 3.14 }, 1);
let reply = client.request(req).await.unwrap();
assert_eq!(reply.payload, ReplyPayload::Ack);
let req = make_request(PortCommand::Float64Read, 1);
let reply = client.request(req).await.unwrap();
match reply.payload {
ReplyPayload::Value(ParamValue::Float64(v)) => assert!((v - 3.14).abs() < 1e-10),
_ => panic!("expected Float64 value"),
}
}
#[test]
fn blocking_int32_cycle() {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();
let (_mgr, client) = rt.block_on(async { make_client() });
let req = make_request(PortCommand::Int32Write { value: 99 }, 0);
let reply = client.request_blocking(req).unwrap();
assert_eq!(reply.payload, ReplyPayload::Ack);
let req = make_request(PortCommand::Int32Read, 0);
let reply = client.request_blocking(req).unwrap();
match reply.payload {
ReplyPayload::Value(ParamValue::Int32(v)) => assert_eq!(v, 99),
_ => panic!("expected Int32 value"),
}
}
#[test]
fn connection_state_connected() {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();
let (_mgr, client) = rt.block_on(async { make_client() });
assert_eq!(client.connection_state(), ConnectionState::Connected);
}
#[tokio::test]
async fn event_subscription() {
let (_mgr, client) = make_client();
let mut rx = client.subscribe(EventFilter::default()).await.unwrap();
let handle = client.handle();
handle.interrupts().notify(InterruptValue {
reason: 0,
addr: 0,
value: crate::param::ParamValue::Int32(77),
timestamp: SystemTime::now(),
uint32_changed_mask: 0,
});
let event = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
.await
.unwrap()
.unwrap();
assert_eq!(event.port_name, "ipc_test");
match event.payload {
EventPayload::ValueChanged {
reason,
addr,
value,
} => {
assert_eq!(reason, 0);
assert_eq!(addr, 0);
assert_eq!(value, ParamValue::Int32(77));
}
_ => panic!("expected ValueChanged"),
}
}
#[tokio::test]
async fn event_subscription_filtered() {
let (_mgr, client) = make_client();
let mut rx = client
.subscribe(EventFilter {
reason: Some(1),
addr: None,
})
.await
.unwrap();
let handle = client.handle();
handle.interrupts().notify(InterruptValue {
reason: 0,
addr: 0,
value: crate::param::ParamValue::Int32(10),
timestamp: SystemTime::now(),
uint32_changed_mask: 0,
});
handle.interrupts().notify(InterruptValue {
reason: 1,
addr: 0,
value: crate::param::ParamValue::Int32(20),
timestamp: SystemTime::now(),
uint32_changed_mask: 0,
});
let event = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
.await
.unwrap()
.unwrap();
match event.payload {
EventPayload::ValueChanged { reason, value, .. } => {
assert_eq!(reason, 1);
assert_eq!(value, ParamValue::Int32(20));
}
_ => panic!("expected ValueChanged"),
}
}
}