use crate::error::*;
use arcon_allocator::{Alloc, AllocId, Allocator};
use kompact::net::buffers::Chunk;
use std::sync::{Arc, Mutex};
pub(crate) struct NetworkBuffer {
ptr: *mut u8,
allocator: Arc<Mutex<Allocator>>,
id: AllocId,
capacity: usize,
}
impl NetworkBuffer {
#[inline]
#[allow(dead_code)]
pub fn new(capacity: usize, allocator: Arc<Mutex<Allocator>>) -> ArconResult<NetworkBuffer> {
let mut a = allocator.lock().unwrap();
match unsafe { a.alloc::<u8>(capacity) } {
Ok(Alloc(id, ptr)) => Ok(NetworkBuffer {
ptr,
allocator: allocator.clone(),
id,
capacity,
}),
Err(err) => Err(Error::Unsupported {
msg: err.to_string(),
}),
}
}
#[inline]
#[allow(dead_code)]
pub fn capacity(&self) -> usize {
self.capacity
}
}
impl Drop for NetworkBuffer {
fn drop(&mut self) {
let mut allocator = self.allocator.lock().unwrap();
unsafe { allocator.dealloc(self.id) };
}
}
unsafe impl Send for NetworkBuffer {}
impl Chunk for NetworkBuffer {
fn as_mut_ptr(&mut self) -> *mut u8 {
self.ptr
}
fn len(&self) -> usize {
self.capacity
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn network_buffer_test() {
let total_bytes = 1024;
let allocator = Arc::new(Mutex::new(Allocator::new(total_bytes)));
{
let buffer: NetworkBuffer = NetworkBuffer::new(512, allocator.clone()).unwrap();
assert_eq!(buffer.capacity(), 512);
}
let a = allocator.lock().unwrap();
assert_eq!(a.total_allocations(), 1);
assert_eq!(a.bytes_remaining(), total_bytes);
}
}