use crate::{MctxError, Publication, SendReport};
use std::io;
#[cfg(not(unix))]
use std::time::Duration;
use thiserror::Error;
/// Error returned by [`TokioPublication::send`].
#[derive(Debug, Error)]
pub enum TokioSendError {
/// Waiting for the wrapped publication to become writable failed.
///
/// Carries the underlying I/O error, which is embedded in the display
/// message. In this file the variant is only constructed on the Unix code
/// path of `send`.
#[error("MCTX: tokio readiness failed: {0}")]
Readiness(io::Error),
/// The underlying publication's send failed with an error that was not a
/// "would block" condition.
///
/// Displays exactly as the wrapped [`MctxError`] and can be created from
/// one via `From`/`?`.
#[error(transparent)]
Send(#[from] MctxError),
}
/// Async adapter around a [`Publication`].
///
/// Its `send` method keeps retrying while the underlying send reports
/// "would block", instead of surfacing that condition to the caller. The
/// waiting strategy is platform-specific: readiness notifications on Unix,
/// timed polling elsewhere.
#[derive(Debug)]
pub struct TokioPublication {
/// Unix: the publication registered with tokio through `AsyncFd`, so that
/// `send` can await writability.
#[cfg(unix)]
inner: tokio::io::unix::AsyncFd<Publication>,
/// Non-Unix: the publication held directly, with no readiness registration.
#[cfg(not(unix))]
inner: Publication,
/// Non-Unix only: how long `send` sleeps between attempts after a
/// "would block" error.
#[cfg(not(unix))]
poll_interval: Duration,
}
impl TokioPublication {
pub fn new(publication: Publication) -> io::Result<Self> {
#[cfg(unix)]
{
Ok(Self {
inner: tokio::io::unix::AsyncFd::new(publication)?,
})
}
#[cfg(not(unix))]
{
Ok(Self {
inner: publication,
poll_interval: Duration::from_millis(10),
})
}
}
pub fn publication(&self) -> &Publication {
#[cfg(unix)]
{
self.inner.get_ref()
}
#[cfg(not(unix))]
{
&self.inner
}
}
pub fn into_publication(self) -> Publication {
#[cfg(unix)]
{
self.inner.into_inner()
}
#[cfg(not(unix))]
{
self.inner
}
}
#[cfg(not(unix))]
pub fn with_poll_interval(mut self, poll_interval: Duration) -> Self {
self.poll_interval = poll_interval;
self
}
pub async fn send(&self, payload: &[u8]) -> Result<SendReport, TokioSendError> {
#[cfg(unix)]
{
loop {
let mut readiness = self
.inner
.writable()
.await
.map_err(TokioSendError::Readiness)?;
match self.inner.get_ref().send(payload) {
Ok(report) => return Ok(report),
Err(error) if error.is_would_block() => readiness.clear_ready(),
Err(error) => return Err(TokioSendError::Send(error)),
}
}
}
#[cfg(not(unix))]
{
loop {
match self.inner.send(payload) {
Ok(report) => return Ok(report),
Err(error) if error.is_would_block() => {
tokio::time::sleep(self.poll_interval).await
}
Err(error) => return Err(TokioSendError::Send(error)),
}
}
}
}
}
#[cfg(all(test, feature = "tokio"))]
mod tests {
    use super::*;
    use crate::test_support::{TEST_GROUP, recv_payload, test_multicast_receiver};
    use crate::{Context, PublicationConfig};

    /// A payload sent through the async adapter arrives unchanged at a
    /// receiver obtained from the test-support helpers.
    #[tokio::test]
    async fn tokio_publication_sends_a_packet() {
        let (receiver, port) = test_multicast_receiver();

        // Create a publication for the test group and take ownership of it.
        let mut context = Context::new();
        let config = PublicationConfig::new(TEST_GROUP, port);
        let id = context.add_publication(config).unwrap();
        let raw_publication = context.take_publication(id).unwrap();

        let async_publication = TokioPublication::new(raw_publication).unwrap();
        async_publication.send(b"tokio hello").await.unwrap();

        assert_eq!(recv_payload(&receiver), b"tokio hello");
    }
}