use crate::error::*;
use arcon_allocator::{Alloc, AllocId, Allocator};
use crossbeam_utils::CachePadded;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
/// A fixed-capacity block of `T` slots whose memory is obtained from a shared `Allocator`.
///
/// The buffer does not record how many slots hold initialised values; the
/// length is tracked by the `BufferWriter` / `BufferReader` wrapping it.
#[derive(Debug)]
pub struct EventBuffer<T> {
/// Start of the allocation returned by the allocator in `EventBuffer::new`.
ptr: *mut T,
/// Shared allocator the memory came from; locked again on drop to free it.
allocator: Arc<Mutex<Allocator>>,
/// Identifier of this allocation, handed back to the allocator on drop.
id: AllocId,
/// Number of `T` slots requested from the allocator; upper bound for `push`.
capacity: usize,
/// `true` while the buffer may be reserved; starts out `true`, cleared by
/// `try_reserve` and set again by `release`.
free: CachePadded<AtomicBool>,
}
impl<T> EventBuffer<T> {
    /// Allocates room for `capacity` values of `T` from `allocator`.
    ///
    /// The new buffer starts out free, i.e. the next `try_reserve` succeeds.
    ///
    /// # Errors
    ///
    /// Returns `Error::Unsupported` carrying the allocator's error text when
    /// the allocation request is rejected.
    ///
    /// # Panics
    ///
    /// Panics if the allocator mutex is poisoned.
    pub fn new(capacity: usize, allocator: Arc<Mutex<Allocator>>) -> ArconResult<EventBuffer<T>> {
        let mut a = allocator.lock().unwrap();
        match unsafe { a.alloc::<T>(capacity) } {
            Ok(Alloc(id, ptr)) => Ok(EventBuffer {
                ptr: ptr as *mut T,
                allocator: Arc::clone(&allocator),
                id,
                capacity,
                free: AtomicBool::new(true).into(),
            }),
            Err(err) => Err(Error::Unsupported {
                msg: err.to_string(),
            }),
        }
    }

    /// Returns a const pointer to the first slot of the buffer.
    pub fn as_ptr(&self) -> *const T {
        self.ptr
    }

    /// Returns a mutable pointer to the first slot of the buffer.
    pub fn as_mut_ptr(&self) -> *mut T {
        self.ptr
    }

    /// Writes `value` into slot `len`.
    ///
    /// Returns `Some(value)` (handing the value back untouched) when `len` is
    /// not below the capacity, otherwise `None` after the write.
    ///
    /// The previous content of the slot is overwritten without being dropped.
    #[inline]
    pub fn push(&self, value: T, len: usize) -> Option<T> {
        if len >= self.capacity {
            return Some(value);
        }
        // SAFETY: `len < capacity`, so the target slot lies inside the
        // allocation that was requested for `capacity` elements.
        unsafe { std::ptr::write(self.ptr.add(len), value) };
        None
    }

    /// Number of `T` slots in this buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Marks the buffer as free so that it can be reserved again.
    #[inline]
    pub fn release(&self) {
        // Release pairs with the Acquire in `try_reserve`: every access the
        // releasing side made to the buffer memory happens-before the next
        // holder starts overwriting it.
        self.free.store(true, Ordering::Release);
    }

    /// Attempts to take exclusive ownership of the buffer.
    ///
    /// Returns `true` only if the buffer was free and this call flipped it to
    /// reserved; returns `false` if it is currently reserved.
    #[inline]
    pub fn try_reserve(&self) -> bool {
        // A strong compare-exchange is required here: the weak variant may
        // fail spuriously and report the *current* value (`true`), which must
        // not be mistaken for a successful reservation.
        self.free
            .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }
}
// The raw `*mut T` field makes `EventBuffer` neither `Send` nor `Sync` by
// default; the two impls below opt back in, bounded on `T`.
//
// NOTE(review): `push` writes through `&self`, so soundness relies on callers
// guaranteeing a single writer per reserved buffer — confirm this holds for
// every user of the type.
unsafe impl<T> Send for EventBuffer<T> where T: Send {}
unsafe impl<T> Sync for EventBuffer<T> where T: Sync {}
impl<T> Drop for EventBuffer<T> {
    /// Returns the allocation identified by `id` to the shared allocator.
    ///
    /// Values still stored in the buffer are not dropped here; only the
    /// backing allocation is handed back.
    ///
    /// # Panics
    ///
    /// Panics if the allocator mutex is poisoned.
    fn drop(&mut self) {
        let id = self.id;
        let mut guard = self.allocator.lock().unwrap();
        unsafe { guard.dealloc(id) };
    }
}
/// Write handle over an `EventBuffer` that keeps track of how many slots
/// have been filled.
#[derive(Debug)]
#[allow(dead_code)]
pub struct BufferWriter<T> {
/// Shared buffer that values are written into.
buffer: Arc<EventBuffer<T>>,
/// Index of the next slot to write; incremented after each accepted push.
len: usize,
}
impl<T> BufferWriter<T> {
    /// Creates a writer over `buffer` that continues writing at slot `len`.
    #[inline]
    pub fn new(buffer: Arc<EventBuffer<T>>, len: usize) -> BufferWriter<T> {
        BufferWriter { buffer, len }
    }

    /// Appends `value` to the buffer.
    ///
    /// Returns `None` when the value was stored, or gives the value back as
    /// `Some(value)` when the buffer is full.
    #[inline]
    pub fn push(&mut self, value: T) -> Option<T> {
        let rejected = self.buffer.push(value, self.len);
        if rejected.is_none() {
            self.len += 1;
        }
        rejected
    }

    /// Returns a const pointer to the first slot of the underlying buffer.
    #[inline]
    pub fn as_ptr(&self) -> *const T {
        self.buffer.as_ptr()
    }

    /// Creates a reader covering the slots written so far.
    ///
    /// The reader shares the same underlying buffer.
    #[inline]
    pub fn reader(&self) -> BufferReader<T> {
        BufferReader {
            buffer: Arc::clone(&self.buffer),
            len: self.len,
        }
    }

    /// Copies the content of `other` into this writer's buffer and adopts
    /// its length.
    ///
    /// The copy is bitwise: elements previously stored in this buffer are
    /// overwritten without being dropped.
    ///
    /// # Panics
    ///
    /// Panics if `other` holds more elements than this writer's buffer can
    /// store. Without this check the copy would write past the end of the
    /// destination allocation.
    pub fn copy_from_writer(&mut self, other: &BufferWriter<T>) {
        let other_len = other.len();
        let capacity = self.buffer.capacity();
        assert!(
            other_len <= capacity,
            "copy_from_writer: source holds {} elements but destination capacity is {}",
            other_len,
            capacity
        );
        // SAFETY: `other_len <= capacity` was checked above, so the write stays
        // inside the destination allocation. `ptr::copy` tolerates overlap, so
        // both writers sharing one buffer is fine.
        unsafe {
            std::ptr::copy(other.as_ptr(), self.buffer.as_mut_ptr(), other_len);
        }
        self.len = other_len;
    }

    /// Number of elements written so far.
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when nothing has been written yet.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}
/// Read handle over the first `len` slots of an `EventBuffer`.
///
/// Dropping a reader marks the underlying buffer as free again.
/// NOTE(review): the type derives `Clone` and every clone releases the buffer
/// on drop, so the buffer is flagged free as soon as the first clone goes
/// away — confirm callers do not keep other clones alive past that point.
#[derive(Debug, Clone)]
pub struct BufferReader<T> {
/// Shared buffer the values are read from.
buffer: Arc<EventBuffer<T>>,
/// Number of slots, counted from the start of the buffer, exposed by this reader.
len: usize,
}
impl<T> BufferReader<T> {
    /// Flags the underlying buffer as free for reuse.
    #[inline]
    fn release(&self) {
        self.buffer.release();
    }

    /// Returns a const pointer to the first slot of the underlying buffer.
    #[inline]
    pub fn as_ptr(&self) -> *const T {
        self.buffer.as_ptr()
    }

    /// Views the first `len` slots of the buffer as a slice.
    #[inline]
    pub fn as_slice(&self) -> &[T] {
        // SAFETY: relies on the producer of this reader having initialised the
        // first `len` slots of the buffer.
        unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len) }
    }

    /// Copies the first `len` slots into a freshly allocated `Vec`.
    ///
    /// The copy is bitwise and the source slots are left untouched.
    /// NOTE(review): for a `T` that owns resources this yields two owners of
    /// the same value — confirm it is only used with plain-data element types.
    #[inline]
    pub fn to_vec(&self) -> Vec<T> {
        let mut dst = Vec::with_capacity(self.len);
        // SAFETY: `dst` has room for `self.len` elements. The length is only
        // raised *after* the elements have been written, so the vector never
        // claims to hold uninitialised values.
        unsafe {
            std::ptr::copy(self.as_ptr(), dst.as_mut_ptr(), self.len);
            dst.set_len(self.len);
        }
        dst
    }

    /// Number of elements exposed by this reader.
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when the reader exposes no elements.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}
impl<T> Drop for BufferReader<T> {
    /// Marks the underlying buffer as free once this reader goes away.
    fn drop(&mut self) {
        Self::release(&*self);
    }
}
/// By-value iterator over the elements exposed by a `BufferReader`.
pub struct IntoIter<A> {
/// Reader whose buffer is being drained; kept alive for the iterator's lifetime.
reader: BufferReader<A>,
/// Index of the next slot to read.
current: usize,
/// One past the last slot to read.
end: usize,
}
impl<A> Iterator for IntoIter<A> {
    type Item = A;

    /// Reads the next element out of the buffer by value, or returns `None`
    /// once `current` has reached `end`.
    #[inline]
    fn next(&mut self) -> Option<A> {
        if self.current == self.end {
            return None;
        }
        let index = self.current;
        self.current += 1;
        // SAFETY: relies on `index < end` and on the first `end` slots of the
        // reader's buffer being initialised.
        let item = unsafe { std::ptr::read(self.reader.as_ptr().add(index)) };
        Some(item)
    }

    /// Exact number of elements still to be yielded.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.end - self.current;
        (remaining, Some(remaining))
    }
}
impl<A> IntoIterator for BufferReader<A> {
    type IntoIter = IntoIter<A>;
    type Item = A;

    /// Consumes the reader and yields its elements by value, front to back.
    fn into_iter(self) -> Self::IntoIter {
        // Capture the length before the reader is moved into the iterator.
        let end = self.len();
        IntoIter {
            current: 0,
            end,
            reader: self,
        }
    }
}
/// Test-only helper: builds a reader whose buffer holds the elements of `v`.
///
/// # Panics
///
/// Panics with "Failed to alloc memory" if the test allocator rejects the request.
#[cfg(test)]
impl<T> From<Vec<T>> for BufferReader<T> {
    fn from(v: Vec<T>) -> BufferReader<T> {
        let slots = v.len();
        let event_buffer = EventBuffer::<T>::new(slots, crate::test_utils::ALLOCATOR.clone())
            .expect("Failed to alloc memory");
        let mut writer = BufferWriter::new(Arc::new(event_buffer), 0);
        v.into_iter().for_each(|value| {
            writer.push(value);
        });
        writer.reader()
    }
}
/// Bundle of parameters describing a buffer pool.
// NOTE(review): presumably mirrors the arguments of `BufferPool::new`
// (`buffer_size` = slots per buffer, `capacity` = number of buffers) — confirm
// against the code that consumes this struct.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct PoolInfo {
/// Size of a single buffer.
pub(crate) buffer_size: usize,
/// Capacity of the pool.
pub(crate) capacity: usize,
/// Shared allocator handle.
pub(crate) allocator: Arc<Mutex<Allocator>>,
}
impl PoolInfo {
pub fn new(buffer_size: usize, capacity: usize, allocator: Arc<Mutex<Allocator>>) -> PoolInfo {
PoolInfo {
buffer_size,
capacity,
allocator,
}
}
}
/// A fixed set of pre-allocated `EventBuffer`s handed out one after another.
#[allow(dead_code)]
pub struct BufferPool<T> {
/// Allocator the pool's buffers were created from.
allocator: Arc<Mutex<Allocator>>,
/// Capacity (in elements) each buffer was created with.
buffer_size: usize,
/// The pooled buffers, created up front in `BufferPool::new`.
buffers: Vec<Arc<EventBuffer<T>>>,
/// Index of the buffer that the next `try_get` call will inspect.
curr_buffer: usize,
}
impl<T> BufferPool<T> {
    /// Creates a pool of `capacity` buffers, each with room for `buffer_size`
    /// elements, all allocated from `allocator`.
    ///
    /// # Errors
    ///
    /// Propagates the error of the first buffer allocation that fails.
    #[inline]
    pub fn new(
        capacity: usize,
        buffer_size: usize,
        allocator: Arc<Mutex<Allocator>>,
    ) -> ArconResult<BufferPool<T>> {
        let mut buffers: Vec<Arc<EventBuffer<T>>> = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            let buffer: EventBuffer<T> = EventBuffer::new(buffer_size, Arc::clone(&allocator))?;
            buffers.push(Arc::new(buffer));
        }
        Ok(BufferPool {
            allocator,
            buffer_size,
            buffers,
            curr_buffer: 0,
        })
    }

    /// Tries to reserve the buffer under the cursor and then advances the
    /// cursor, whether or not the reservation succeeded.
    ///
    /// Returns a writer starting at length 0 when the buffer was free, and
    /// `None` when it is still reserved or when the pool holds no buffers.
    #[inline]
    pub fn try_get(&mut self) -> Option<BufferWriter<T>> {
        // `get` instead of indexing: an empty pool yields `None`, not a panic.
        let buf = self.buffers.get(self.curr_buffer)?;
        let writer = if buf.try_reserve() {
            Some(BufferWriter::new(Arc::clone(buf), 0))
        } else {
            None
        };
        self.index_incr();
        writer
    }

    /// Busy-waits, cycling through the pool, until a buffer can be reserved.
    ///
    /// # Panics
    ///
    /// Panics if the pool holds no buffers, since the wait could never end.
    #[inline]
    pub fn get(&mut self) -> BufferWriter<T> {
        assert!(
            !self.buffers.is_empty(),
            "BufferPool::get called on a pool without buffers"
        );
        loop {
            if let Some(writer) = self.try_get() {
                return writer;
            }
        }
    }

    /// Moves the cursor to the next buffer, wrapping around at the end.
    #[inline]
    fn index_incr(&mut self) {
        self.curr_buffer += 1;
        // Wrap on the number of buffers actually stored. `Vec::capacity` is
        // only guaranteed to be *at least* the requested size and may exceed
        // `len`, which would let the cursor run past the last buffer.
        if self.curr_buffer >= self.buffers.len() {
            self.curr_buffer = 0;
        }
    }

    /// Number of buffers in the pool.
    #[inline]
    #[allow(dead_code)]
    pub fn capacity(&self) -> usize {
        self.buffers.len()
    }

    /// Capacity (in elements) of each buffer in the pool.
    #[inline]
    #[allow(dead_code)]
    pub fn buffer_size(&self) -> usize {
        self.buffer_size
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Fills a buffer to capacity, checks that a further push is rejected,
    /// and verifies the memory is returned to the allocator on drop.
    #[test]
    fn event_buffer_test() {
        let total_bytes = 1024;
        let allocator = Arc::new(Mutex::new(Allocator::new(total_bytes)));
        let items: Vec<u64> = vec![10, 20, 30, 40, 50, 60, 70, 80, 90, 100];

        {
            let buffer: EventBuffer<u64> = EventBuffer::new(10, allocator.clone()).unwrap();
            let mut writer = BufferWriter::new(Arc::new(buffer), 0);
            for &item in &items {
                writer.push(item);
            }
            // The buffer is full, so the value is handed back.
            assert_eq!(writer.push(10_u64), Some(10));

            let reader = writer.reader();
            assert_eq!(reader.as_slice(), items.as_slice());
        }

        // Reader and writer are gone, so the allocation must have been freed.
        let guard = allocator.lock().unwrap();
        assert_eq!(guard.total_allocations(), 1);
        assert_eq!(guard.bytes_remaining(), total_bytes);
    }

    /// Exercises reservation and release of buffers in a two-buffer pool.
    #[test]
    fn buffer_pool_test() {
        let allocator = Arc::new(Mutex::new(Allocator::new(10024)));
        let buffer_size = 100;
        let pool_capacity = 2;
        let mut pool: BufferPool<u64> =
            BufferPool::new(pool_capacity, buffer_size, allocator).unwrap();

        let mut first_writer = pool.try_get().unwrap();
        (0..buffer_size).for_each(|i| {
            first_writer.push(i as u64);
        });
        let reader_one = first_writer.reader();
        assert_eq!(reader_one.as_slice().len(), buffer_size);

        let second_writer = pool.try_get().unwrap();
        {
            let reader_two = second_writer.reader();
            assert_eq!(reader_two.as_slice().len(), 0);
            // Both buffers are reserved at this point.
            assert!(pool.try_get().is_none());
        }
        // Dropping `reader_two` released the second buffer.
        assert!(pool.try_get().is_some());
    }
}