use bytes::Bytes;
use libc;
use std::mem;
use std::slice;
use super::datatypes::*;
use super::memory::*;
pub struct Buffer<T>
where
T: ArrowPrimitiveType,
{
data: *const T,
len: usize,
}
impl<T> Buffer<T>
where
T: ArrowPrimitiveType,
{
pub unsafe fn from_raw_parts(data: *const T, len: usize) -> Self {
Buffer { data, len }
}
pub fn len(&self) -> usize {
self.len
}
pub fn data(&self) -> *const T {
self.data
}
pub fn slice(&self, start: usize, end: usize) -> &[T] {
assert!(end <= self.len);
assert!(start <= end);
unsafe { slice::from_raw_parts(self.data.offset(start as isize), end - start) }
}
pub fn get(&self, i: usize) -> &T {
assert!(i < self.len);
unsafe { &(*self.data.offset(i as isize)) }
}
pub fn set(&mut self, i: usize, v: T) {
assert!(i < self.len);
let p = self.data as *mut T;
unsafe {
*p.offset(i as isize) = v;
}
}
pub fn iter(&self) -> BufferIterator<T> {
BufferIterator {
data: self.data,
len: self.len,
index: 0,
}
}
}
impl<T> Drop for Buffer<T>
where
T: ArrowPrimitiveType,
{
fn drop(&mut self) {
if !self.data.is_null() {
free_aligned(self.data as *const u8);
self.data = std::ptr::null_mut();
}
}
}
pub struct BufferIterator<T>
where
T: ArrowPrimitiveType,
{
data: *const T,
len: usize,
index: isize,
}
impl<T> Iterator for BufferIterator<T>
where
T: ArrowPrimitiveType,
{
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
if self.index < self.len as isize {
let value = unsafe { *self.data.offset(self.index) };
self.index += 1;
Some(value)
} else {
None
}
}
}
impl<T> From<Vec<T>> for Buffer<T>
where
T: ArrowPrimitiveType,
{
fn from(v: Vec<T>) -> Self {
let len = v.len();
let sz = mem::size_of::<T>();
let buffer = allocate_aligned((len * sz) as i64).unwrap();
Buffer {
len,
data: unsafe {
let dst = mem::transmute::<*const u8, *mut libc::c_void>(buffer);
libc::memcpy(
dst,
mem::transmute::<*const T, *const libc::c_void>(v.as_ptr()),
len * sz,
);
mem::transmute::<*mut libc::c_void, *const T>(dst)
},
}
}
}
impl From<Bytes> for Buffer<u8> {
fn from(bytes: Bytes) -> Self {
let len = bytes.len();
let sz = mem::size_of::<u8>();
let buf_mem = allocate_aligned((len * sz) as i64).unwrap();
let dst = buf_mem as *mut libc::c_void;
Buffer {
len,
data: unsafe {
libc::memcpy(dst, bytes.as_ptr() as *const libc::c_void, len * sz);
dst as *mut u8
},
}
}
}
unsafe impl<T: ArrowPrimitiveType> Sync for Buffer<T> {}
unsafe impl<T: ArrowPrimitiveType> Send for Buffer<T> {}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
#[test]
fn test_buffer_i32() {
let b: Buffer<i32> = Buffer::from(vec![1, 2, 3, 4, 5]);
assert_eq!(5, b.len);
}
#[test]
fn test_iterator_i32() {
let b: Buffer<i32> = Buffer::from(vec![1, 2, 3, 4, 5]);
let it = b.iter();
let v: Vec<i32> = it.map(|n| n + 1).collect();
assert_eq!(vec![2, 3, 4, 5, 6], v);
}
#[test]
fn test_buffer_eq() {
let a = Buffer::from(vec![1, 2, 3, 4, 5]);
let b = Buffer::from(vec![5, 4, 3, 2, 1]);
let c = a.iter()
.zip(b.iter())
.map(|(a, b)| a == b)
.collect::<Vec<bool>>();
assert_eq!(c, vec![false, false, true, false, false]);
}
#[test]
fn test_buffer_lt() {
let a = Buffer::from(vec![1, 2, 3, 4, 5]);
let b = Buffer::from(vec![5, 4, 3, 2, 1]);
let c = a.iter()
.zip(b.iter())
.map(|(a, b)| a < b)
.collect::<Vec<bool>>();
assert_eq!(c, vec![true, true, false, false, false]);
}
#[test]
fn test_buffer_gt() {
let a = Buffer::from(vec![1, 2, 3, 4, 5]);
let b = Buffer::from(vec![5, 4, 3, 2, 1]);
let c = a.iter()
.zip(b.iter())
.map(|(a, b)| a > b)
.collect::<Vec<bool>>();
assert_eq!(c, vec![false, false, false, true, true]);
}
#[test]
fn test_buffer_add() {
let a = Buffer::from(vec![1, 2, 3, 4, 5]);
let b = Buffer::from(vec![5, 4, 3, 2, 1]);
let c = a.iter()
.zip(b.iter())
.map(|(a, b)| a + b)
.collect::<Vec<i32>>();
assert_eq!(c, vec![6, 6, 6, 6, 6]);
}
#[test]
fn test_buffer_multiply() {
let a = Buffer::from(vec![1, 2, 3, 4, 5]);
let b = Buffer::from(vec![5, 4, 3, 2, 1]);
let c = a.iter()
.zip(b.iter())
.map(|(a, b)| a * b)
.collect::<Vec<i32>>();
assert_eq!(c, vec![5, 8, 9, 8, 5]);
}
#[test]
#[should_panic]
fn test_get_out_of_bounds() {
let a = Buffer::from(vec![1, 2, 3, 4, 5]);
a.get(123);
}
#[test]
fn slice_empty_at_end() {
let a = Buffer::from(vec![1, 2, 3, 4, 5]);
let s = a.slice(5, 5);
assert_eq!(0, s.len());
}
#[test]
#[should_panic]
fn slice_start_out_of_bounds() {
let a = Buffer::from(vec![1, 2, 3, 4, 5]);
a.slice(6, 6);
}
#[test]
#[should_panic]
fn slice_end_out_of_bounds() {
let a = Buffer::from(vec![1, 2, 3, 4, 5]);
a.slice(0, 6);
}
#[test]
#[should_panic]
fn slice_end_before_start() {
let a = Buffer::from(vec![1, 2, 3, 4, 5]);
a.slice(3, 2);
}
#[test]
fn test_access_buffer_concurrently() {
let buffer = Buffer::from(vec![1, 2, 3, 4, 5]);
assert_eq!(vec![1, 2, 3, 4, 5], buffer.iter().collect::<Vec<i32>>());
let collected_vec = thread::spawn(move || {
buffer.iter().collect::<Vec<i32>>()
}).join();
assert!(collected_vec.is_ok());
assert_eq!(vec![1, 2, 3, 4, 5], collected_vec.ok().unwrap());
}
}