use crate::Parallel;
use alloc::vec::Vec;
use async_channel::{Receiver, Sender};
use core::ops::{Deref, DerefMut};
pub struct BufferedChannel<T: Send> {
pub chunk_size: usize,
pool: Parallel<Vec<Vec<T>>>,
}
impl<T: Send> Default for BufferedChannel<T> {
fn default() -> Self {
Self {
chunk_size: 1024,
pool: Parallel::default(),
}
}
}
impl<T: Send> BufferedChannel<T> {
const MAX_POOL_SIZE: usize = 8;
fn recycle(&self, mut chunk: Vec<T>) {
if chunk.capacity() < self.chunk_size {
return;
}
chunk.clear();
let mut pool = self.pool.borrow_local_mut();
if pool.len() < Self::MAX_POOL_SIZE {
pool.push(chunk);
}
}
}
pub struct BufferedReceiver<'a, T: Send> {
channel: &'a BufferedChannel<T>,
rx: Receiver<Vec<T>>,
}
impl<'a, T: Send> BufferedReceiver<'a, T> {
pub async fn recv(&self) -> Result<RecycledVec<'_, T>, async_channel::RecvError> {
let buffer = self.rx.recv().await?;
Ok(RecycledVec {
buffer: Some(buffer),
channel: self.channel,
})
}
pub fn recv_blocking(&self) -> Result<RecycledVec<'_, T>, async_channel::RecvError> {
#[cfg(all(feature = "std", not(target_family = "wasm")))]
let buffer = self.rx.recv_blocking()?;
#[cfg(any(not(feature = "std"), target_family = "wasm"))]
let buffer = bevy_platform::future::block_on(self.rx.recv())?;
Ok(RecycledVec {
buffer: Some(buffer),
channel: self.channel,
})
}
}
impl<'a, T: Send> Clone for BufferedReceiver<'a, T> {
fn clone(&self) -> Self {
Self {
channel: self.channel,
rx: self.rx.clone(),
}
}
}
pub struct RecycledVec<'a, T: Send> {
buffer: Option<Vec<T>>,
channel: &'a BufferedChannel<T>,
}
impl<'a, T: Send> RecycledVec<'a, T> {
pub fn drain(&mut self) -> alloc::vec::Drain<'_, T> {
self.buffer.as_mut().unwrap().drain(..)
}
}
impl<'a, T: Send> Deref for RecycledVec<'a, T> {
type Target = [T];
fn deref(&self) -> &Self::Target {
self.buffer.as_ref().unwrap()
}
}
impl<'a, T: Send> DerefMut for RecycledVec<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.buffer.as_mut().unwrap()
}
}
impl<'a, 'b, T: Send> IntoIterator for &'b RecycledVec<'a, T> {
type Item = &'b T;
type IntoIter = core::slice::Iter<'b, T>;
fn into_iter(self) -> Self::IntoIter {
self.buffer.as_ref().unwrap().iter()
}
}
impl<'a, 'b, T: Send> IntoIterator for &'b mut RecycledVec<'a, T> {
type Item = &'b mut T;
type IntoIter = core::slice::IterMut<'b, T>;
fn into_iter(self) -> Self::IntoIter {
self.buffer.as_mut().unwrap().iter_mut()
}
}
impl<'a, T: Send> Drop for RecycledVec<'a, T> {
fn drop(&mut self) {
if let Some(buffer) = self.buffer.take() {
self.channel.recycle(buffer);
}
}
}
pub struct BufferedSender<'a, T: Send> {
channel: &'a BufferedChannel<T>,
buffer: Option<Vec<T>>,
tx: Sender<Vec<T>>,
}
impl<T: Send> BufferedChannel<T> {
fn get_buffer(&self) -> Vec<T> {
self.pool
.borrow_local_mut()
.pop()
.unwrap_or_else(|| Vec::with_capacity(self.chunk_size))
}
pub fn unbounded(&self) -> (BufferedReceiver<'_, T>, BufferedSender<'_, T>) {
let (tx, rx) = async_channel::unbounded();
(
BufferedReceiver { channel: self, rx },
BufferedSender {
channel: self,
buffer: None,
tx,
},
)
}
pub fn bounded(&self, cap: usize) -> (BufferedReceiver<'_, T>, BufferedSender<'_, T>) {
let (tx, rx) = async_channel::bounded(cap);
(
BufferedReceiver { channel: self, rx },
BufferedSender {
channel: self,
buffer: None,
tx,
},
)
}
}
impl<'a, T: Send> BufferedSender<'a, T> {
pub async fn send(&mut self, msg: T) -> Result<(), async_channel::SendError<Vec<T>>> {
let buffer = self.buffer.get_or_insert_with(|| self.channel.get_buffer());
buffer.push(msg);
if buffer.len() >= self.channel.chunk_size {
let full_buffer = self.buffer.take().unwrap();
self.tx.send(full_buffer).await?;
}
Ok(())
}
pub fn send_blocking(&mut self, msg: T) -> Result<(), async_channel::SendError<Vec<T>>> {
let buffer = self.buffer.get_or_insert_with(|| self.channel.get_buffer());
buffer.push(msg);
if buffer.len() >= self.channel.chunk_size {
let full_buffer = self.buffer.take().unwrap();
#[cfg(all(feature = "std", not(target_family = "wasm")))]
self.tx.send_blocking(full_buffer)?;
#[cfg(any(not(feature = "std"), target_family = "wasm"))]
bevy_platform::future::block_on(self.tx.send(full_buffer))?;
}
Ok(())
}
pub fn flush(&mut self) {
if let Some(buffer) = self.buffer.take() {
if !buffer.is_empty() {
#[cfg(all(feature = "std", not(target_family = "wasm")))]
let _ = self.tx.send_blocking(buffer);
#[cfg(any(not(feature = "std"), target_family = "wasm"))]
let _ = bevy_platform::future::block_on(self.tx.send(buffer));
} else {
self.channel.recycle(buffer);
}
}
}
}
impl<'a, T: Send> Clone for BufferedSender<'a, T> {
fn clone(&self) -> Self {
Self {
channel: self.channel,
buffer: None,
tx: self.tx.clone(),
}
}
}
impl<'a, T: Send> Drop for BufferedSender<'a, T> {
fn drop(&mut self) {
self.flush();
}
}