use std::os::unix::io::{AsRawFd, RawFd};
use tokio::io::Interest;
use tokio::io::unix::AsyncFd;
use crate::async_bridge::ControlHandle;
use crate::error::{BridgeError, Result};
/// Non-owning handle to a raw file descriptor.
///
/// Exists only so that a bare [`RawFd`] can be handed to `AsyncFd`, which
/// needs an `AsRawFd` implementor. The wrapper has no `Drop` impl and never
/// closes `fd`; whoever produced the descriptor keeps responsibility for its
/// lifetime.
///
/// `Debug` is derived so that types embedding `AsyncFd<RawFdWrapper>` can be
/// debug-printed as well.
#[derive(Debug)]
struct RawFdWrapper {
    /// The borrowed descriptor number.
    fd: RawFd,
}
/// Exposes the wrapped descriptor number unchanged.
impl AsRawFd for RawFdWrapper {
    /// Returns the stored descriptor; ownership is not transferred.
    fn as_raw_fd(&self) -> RawFd {
        let Self { fd } = *self;
        fd
    }
}
/// Async front-end for a [`ControlHandle`].
///
/// Pairs the handle with two `AsyncFd` registrations so that `send_request`
/// and `recv_response` can await descriptor readiness instead of spinning on
/// the non-blocking `try_*` calls.
///
/// # Field order
///
/// Rust drops struct fields in declaration order. The two `AsyncFd`s only
/// borrow raw descriptor numbers obtained from `inner`, so they are declared
/// first: they are deregistered from the reactor *before* `inner` is dropped.
/// NOTE(review): this presumes `ControlHandle` is what ultimately closes those
/// descriptors — confirm. If so, dropping `inner` first would leave the
/// reactor deregistering a closed (or already recycled) descriptor number.
pub struct AsyncControlHandle<Req, Rsp> {
    /// Readiness source built from `ControlHandle::request_space_fd()`.
    req_space_fd: AsyncFd<RawFdWrapper>,
    /// Readiness source built from `ControlHandle::response_wake_fd()`.
    rsp_ready_fd: AsyncFd<RawFdWrapper>,
    /// The wrapped synchronous handle; public so callers can still reach the
    /// non-async API directly.
    pub inner: ControlHandle<Req, Rsp>,
}
impl<Req, Rsp> AsyncControlHandle<Req, Rsp> {
pub fn new(handle: ControlHandle<Req, Rsp>) -> std::io::Result<Self> {
let req_space_fd = AsyncFd::with_interest(
RawFdWrapper {
fd: handle.request_space_fd(),
},
Interest::READABLE,
)?;
let rsp_ready_fd = AsyncFd::with_interest(
RawFdWrapper {
fd: handle.response_wake_fd(),
},
Interest::READABLE,
)?;
Ok(Self {
inner: handle,
req_space_fd,
rsp_ready_fd,
})
}
pub async fn send_request(&mut self, req: Req) -> Result<()>
where
Req: Clone,
{
match self.inner.try_send_request(req.clone()) {
Ok(()) => return Ok(()),
Err(BridgeError::Full { .. }) => {}
Err(e) => return Err(e),
}
loop {
let mut guard =
self.req_space_fd
.readable()
.await
.map_err(|_| BridgeError::Backpressure {
percent: 100,
threshold: 95,
})?;
let _ = self.inner.req_wake.producer_wake.try_read();
guard.clear_ready();
match self.inner.try_send_request(req.clone()) {
Ok(()) => return Ok(()),
Err(BridgeError::Full { .. }) => continue, Err(e) => return Err(e),
}
}
}
pub async fn recv_response(&mut self) -> Result<Rsp> {
match self.inner.try_recv_response() {
Ok(rsp) => return Ok(rsp),
Err(BridgeError::Empty) => {}
Err(e) => return Err(e),
}
loop {
let mut guard =
self.rsp_ready_fd
.readable()
.await
.map_err(|_| BridgeError::Backpressure {
percent: 0,
threshold: 0,
})?;
let _ = self.inner.rsp_wake.consumer_wake.try_read();
guard.clear_ready();
match self.inner.try_recv_response() {
Ok(rsp) => return Ok(rsp),
Err(BridgeError::Empty) => continue,
Err(e) => return Err(e),
}
}
}
pub fn try_recv_response(&mut self) -> Result<Rsp> {
self.inner.try_recv_response()
}
pub fn pressure(&self) -> crate::backpressure::PressureState {
self.inner.pressure()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::async_bridge::BridgeChannel;
    use std::time::Duration;

    // Pause inserted after spawning a task, before the main test body acts.
    const PARK_DELAY: Duration = Duration::from_millis(10);
    // Upper bound on how long a spawned task may take to finish.
    const WAKE_DEADLINE: Duration = Duration::from_secs(5);

    // A request sent through the async handle is visible on the data side,
    // and a response sent from the data side is received by the async handle.
    #[tokio::test]
    async fn async_send_recv_roundtrip() {
        let channel: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
        let mut worker = channel.data;
        let mut client = AsyncControlHandle::new(channel.control).unwrap();

        client.send_request(42).await.unwrap();
        let seen = worker.try_recv_request().unwrap();
        assert_eq!(seen, 42);

        worker.try_send_response(84).unwrap();
        let reply = client.recv_response().await.unwrap();
        assert_eq!(reply, 84);
    }

    // After four sends, a fifth is issued from a spawned task and must
    // complete once the data side has taken one request off the queue.
    #[tokio::test]
    async fn async_send_wakes_on_space() {
        let channel: BridgeChannel<u64, u64> = BridgeChannel::new(4, 4).unwrap();
        let mut worker = channel.data;
        let mut client = AsyncControlHandle::new(channel.control).unwrap();

        for seq in 0..4 {
            client.send_request(seq).await.unwrap();
        }

        let pending_send = tokio::spawn(async move {
            client.send_request(99).await.unwrap();
            client
        });

        tokio::time::sleep(PARK_DELAY).await;
        let _ = worker.try_recv_request().unwrap();

        let joined = tokio::time::timeout(WAKE_DEADLINE, pending_send)
            .await
            .expect("send_request should complete after space freed");
        let _client = joined.unwrap();
    }

    // A receive issued from a spawned task must complete once the data side
    // sends a response.
    #[tokio::test]
    async fn async_recv_wakes_on_data() {
        let channel: BridgeChannel<u64, u64> = BridgeChannel::new(16, 16).unwrap();
        let mut worker = channel.data;
        let mut client = AsyncControlHandle::new(channel.control).unwrap();

        let pending_recv = tokio::spawn(async move {
            let reply = client.recv_response().await.unwrap();
            (client, reply)
        });

        tokio::time::sleep(PARK_DELAY).await;
        worker.try_send_response(777).unwrap();

        let joined = tokio::time::timeout(WAKE_DEADLINE, pending_recv)
            .await
            .expect("recv_response should complete after data sent");
        let (_client, reply) = joined.unwrap();
        assert_eq!(reply, 777);
    }
}