use super::*;
use std::io;
use std::ops::{Index, IndexMut, RangeBounds};
use std::slice::{Iter, IterMut, SliceIndex};
use std::vec::{Drain, IntoIter};
use futures_util::future::select_all;
/// A named device backed by one or more `AsyncStdQueue`s.
///
/// NOTE(review): presumably a multi-queue TUN interface, judging by the type
/// name — confirm against `new_queues`.
pub struct AsyncStdTun {
/// Queues backing this device, as returned by `new_queues`.
queues: Vec<AsyncStdQueue>,
/// Device name, as returned by `new_queues`.
name: String,
}
impl AsyncStdTun {
    /// Creates a device called `name` backed by `num_queues` queues.
    ///
    /// The queues and the name stored on the device are whatever `new_queues`
    /// returns for the given arguments.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidNumQueues` when `num_queues` is zero, and
    /// forwards any error reported by `new_queues`.
    pub fn new(name: &str, num_queues: usize) -> Result<Self> {
        if num_queues < 1 {
            return Err(Error::InvalidNumQueues);
        }
        let (queues, name) = new_queues(name, num_queues)?;
        Ok(Self { queues, name })
    }

    /// Returns the device name stored at construction time.
    #[inline]
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns a shared reference to the queue at `index`, or `None` if the
    /// index is out of bounds.
    #[inline]
    pub fn get<I>(&self, index: I) -> Option<&AsyncStdQueue>
    where
        I: SliceIndex<[AsyncStdQueue], Output = AsyncStdQueue>,
    {
        self.queues.get(index)
    }

    /// Returns an exclusive reference to the queue at `index`, or `None` if
    /// the index is out of bounds.
    #[inline]
    pub fn get_mut<I>(&mut self, index: I) -> Option<&mut AsyncStdQueue>
    where
        I: SliceIndex<[AsyncStdQueue], Output = AsyncStdQueue>,
    {
        self.queues.get_mut(index)
    }

    /// Removes every queue from the device and calls `close` on each, in order.
    ///
    /// After this call the device holds no queues, so [`send`](Self::send) and
    /// [`recv`](Self::recv) return an error.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by a queue's `close`. The queues that
    /// had not been reached yet are still removed from the device; they are
    /// dropped by the drain iterator without an explicit `close` call.
    pub fn close(&mut self) -> Result<()> {
        for mut queue in self.drain(..) {
            queue.close()?;
        }
        Ok(())
    }

    /// Removes the queues in `range` from the device and returns them as an
    /// iterator. See [`Vec::drain`] for the exact semantics (including the
    /// panic on an out-of-bounds range).
    #[inline]
    pub fn drain<R>(&mut self, range: R) -> Drain<'_, AsyncStdQueue>
    where
        R: RangeBounds<usize>,
    {
        self.queues.drain(range)
    }

    /// Iterates over the queues by shared reference.
    #[inline]
    pub fn iter(&self) -> Iter<'_, AsyncStdQueue> {
        self.queues.iter()
    }

    /// Iterates over the queues by exclusive reference.
    #[inline]
    pub fn iter_mut(&mut self) -> IterMut<'_, AsyncStdQueue> {
        self.queues.iter_mut()
    }

    /// Sends `datagram` through whichever queue reports writability first.
    ///
    /// Each round waits on the `writable()` future of every queue, then calls
    /// `send` on the inner handle (`get_ref()`) of the queue that completed.
    /// If that call fails with [`io::ErrorKind::WouldBlock`] the wait is
    /// repeated; any other outcome is returned as is.
    ///
    /// # Errors
    ///
    /// * [`io::ErrorKind::NotConnected`] if the device currently has no queues
    ///   (for example after [`close`](Self::close)).
    /// * Any error produced while waiting for writability or by the send
    ///   itself.
    pub async fn send(&self, datagram: &[u8]) -> io::Result<usize> {
        // `select_all` panics when handed an empty iterator, so bail out early.
        // `&self` is borrowed for the whole call, hence one check is enough.
        if self.queues.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::NotConnected,
                "tun device has no open queues",
            ));
        }
        loop {
            let futures = self.iter().map(|queue| Box::pin(queue.writable()));
            let (result, idx, _) = select_all(futures).await;
            result?;
            let queue = self
                .get(idx)
                .ok_or_else(|| Error::InvalidQueue(idx).into_io())?;
            match queue.get_ref().send(datagram) {
                // Another writer may have filled the queue in the meantime; wait again.
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                result => return result,
            }
        }
    }

    /// Sends `datagram` through the queue at index `queue`, awaiting that
    /// queue's own `send`.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidQueue(queue)` converted with `into_io` when the
    /// index is out of bounds, or whatever error the queue's `send` yields.
    pub async fn send_via(&self, queue: usize, datagram: &[u8]) -> io::Result<usize> {
        self.get(queue)
            .ok_or_else(|| Error::InvalidQueue(queue).into_io())?
            .send(datagram)
            .await
    }

    /// Receives into `datagram` from whichever queue reports readability first.
    ///
    /// Each round waits on the `readable()` future of every queue, then calls
    /// `recv` on the inner handle (`get_ref()`) of the queue that completed.
    /// If that call fails with [`io::ErrorKind::WouldBlock`] the wait is
    /// repeated; any other outcome is returned as is.
    ///
    /// # Errors
    ///
    /// * [`io::ErrorKind::NotConnected`] if the device currently has no queues
    ///   (for example after [`close`](Self::close)).
    /// * Any error produced while waiting for readability or by the receive
    ///   itself.
    pub async fn recv(&self, datagram: &mut [u8]) -> io::Result<usize> {
        // `select_all` panics when handed an empty iterator, so bail out early.
        // `&self` is borrowed for the whole call, hence one check is enough.
        if self.queues.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::NotConnected,
                "tun device has no open queues",
            ));
        }
        loop {
            let futures = self.iter().map(|queue| Box::pin(queue.readable()));
            let (result, idx, _) = select_all(futures).await;
            result?;
            let queue = self
                .get(idx)
                .ok_or_else(|| Error::InvalidQueue(idx).into_io())?;
            match queue.get_ref().recv(datagram) {
                // Another reader may have taken the data in the meantime; wait again.
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                result => return result,
            }
        }
    }

    /// Receives into `datagram` from the queue at index `queue`, awaiting that
    /// queue's own `recv`.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidQueue(queue)` converted with `into_io` when the
    /// index is out of bounds, or whatever error the queue's `recv` yields.
    pub async fn recv_via(&self, queue: usize, datagram: &mut [u8]) -> io::Result<usize> {
        self.get(queue)
            .ok_or_else(|| Error::InvalidQueue(queue).into_io())?
            .recv(datagram)
            .await
    }
}
impl IntoIterator for AsyncStdTun {
    type Item = AsyncStdQueue;
    type IntoIter = IntoIter<AsyncStdQueue>;

    /// Consumes the device and yields its queues by value.
    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        // Only the queue list is moved out; the rest of `self` is dropped on return.
        let Self { queues, .. } = self;
        queues.into_iter()
    }
}
impl Index<usize> for AsyncStdTun {
    type Output = AsyncStdQueue;

    /// Borrows the queue at position `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds, like indexing a `Vec`.
    #[inline]
    fn index(&self, index: usize) -> &Self::Output {
        &self.queues[index]
    }
}
impl IndexMut<usize> for AsyncStdTun {
#[inline]
fn index_mut(&mut self, index: usize) -> &mut AsyncStdQueue {
self.queues.index_mut(index)
}
}