use crate::rt::async_support::StreamVtable;
use crate::rt::Cleanup;
use std::alloc::Layout;
use std::mem::{self, MaybeUninit};
use std::ptr;
use std::vec::Vec;
/// Buffer of `T` values being transferred over a stream, tracking both the
/// original Rust storage and an optional "lowered" ABI copy of the data.
pub struct AbiBuffer<T: 'static> {
    // The original `Vec<T>`'s allocation, reinterpreted element-wise as
    // `MaybeUninit<T>` because elements may be moved out (lowered in
    // `new`) and later moved back in (lifted in `take_vec`).
    rust_storage: Vec<MaybeUninit<T>>,
    // Stream operations for `T`; `new` asserts its `lower` and `lift`
    // callbacks are either both present or both absent.
    vtable: &'static StreamVtable<T>,
    // Allocation holding the lowered (ABI) representation of the
    // elements; `Some` only when `vtable.lower` is present and
    // `Cleanup::new` actually allocated.
    alloc: Option<Cleanup>,
    // Number of leading elements already transferred via `advance`.
    cursor: usize,
}
impl<T: 'static> AbiBuffer<T> {
    /// Creates a buffer from `vec`, lowering each element into a freshly
    /// allocated ABI buffer when the vtable provides a `lower` callback.
    pub(crate) fn new(mut vec: Vec<T>, vtable: &'static StreamVtable<T>) -> AbiBuffer<T> {
        // Invariant relied on by `take_vec`: `lower` and `lift` are
        // either both present or both absent.
        assert_eq!(vtable.lower.is_some(), vtable.lift.is_some());

        // SAFETY: `MaybeUninit<T>` has the same layout as `T`, so the
        // pointer/len/cap triple of the forgotten `Vec<T>` describes a
        // valid `Vec<MaybeUninit<T>>`. Element destruction becomes
        // manual from here on.
        let rust_storage = unsafe {
            let ptr = vec.as_mut_ptr();
            let len = vec.len();
            let cap = vec.capacity();
            mem::forget(vec);
            Vec::<MaybeUninit<T>>::from_raw_parts(ptr.cast(), len, cap)
        };

        // When lowering is required, move every element out of
        // `rust_storage` and write its ABI representation into a single
        // allocation of `layout.size() * len` bytes.
        let alloc = vtable.lower.and_then(|lower| {
            let layout = Layout::from_size_align(
                vtable.layout.size() * rust_storage.len(),
                vtable.layout.align(),
            )
            .unwrap();
            let (mut ptr, cleanup) = Cleanup::new(layout);
            // `None` means no allocation was performed; skip lowering.
            let cleanup = cleanup?;
            // SAFETY: every slot of `rust_storage` is initialized at
            // this point (it came straight from a `Vec<T>`), so each
            // `assume_init_read` moves a valid `T` out; the slot is then
            // treated as uninitialized until `take_vec` lifts it back.
            unsafe {
                for item in rust_storage.iter() {
                    let item = item.assume_init_read();
                    lower(item, ptr);
                    ptr = ptr.add(vtable.layout.size());
                }
            }
            Some(cleanup)
        });
        AbiBuffer {
            rust_storage,
            alloc,
            vtable,
            cursor: 0,
        }
    }

    /// Returns the pointer/length pair describing the ABI representation
    /// of the elements not yet advanced past.
    pub(crate) fn abi_ptr_and_len(&self) -> (*const u8, usize) {
        // Without a `lower` callback the Rust storage itself is used as
        // the ABI representation, so point directly into it.
        if self.vtable.lower.is_none() {
            let ptr = unsafe { self.rust_storage.as_ptr().add(self.cursor).cast() };
            let len = self.rust_storage.len() - self.cursor;
            return (ptr, len);
        }

        // Otherwise point into the lowered allocation, offset by the
        // cursor. NOTE(review): `alloc` being `None` here presumably
        // only occurs when nothing was allocated and the remaining
        // length is zero — confirm against `Cleanup::new`.
        let ptr = self
            .alloc
            .as_ref()
            .map(|c| c.ptr.as_ptr())
            .unwrap_or(ptr::null_mut());
        (
            unsafe { ptr.add(self.cursor * self.vtable.layout.size()) },
            self.rust_storage.len() - self.cursor,
        )
    }

    /// Consumes this buffer, returning the elements that were not
    /// transferred.
    pub fn into_vec(mut self) -> Vec<T> {
        self.take_vec()
    }

    /// Number of elements that have not yet been advanced past.
    pub fn remaining(&self) -> usize {
        self.rust_storage.len() - self.cursor
    }

    /// Marks `amt` elements as transferred, running the vtable's
    /// `dealloc_lists` callback (if any) on each element's lowered
    /// representation before moving the cursor forward.
    pub(crate) fn advance(&mut self, amt: usize) {
        assert!(amt + self.cursor <= self.rust_storage.len());
        // No list deallocator: just bump the cursor.
        let Some(dealloc_lists) = self.vtable.dealloc_lists else {
            self.cursor += amt;
            return;
        };
        let (mut ptr, len) = self.abi_ptr_and_len();
        assert!(amt <= len);
        for _ in 0..amt {
            unsafe {
                dealloc_lists(ptr.cast_mut());
                ptr = ptr.add(self.vtable.layout.size());
            }
        }
        self.cursor += amt;
    }

    /// Reconstitutes the untransferred elements as a `Vec<T>`, leaving
    /// this buffer empty.
    fn take_vec(&mut self) -> Vec<T> {
        // If elements were lowered in `new`, lift the not-yet-advanced
        // ones back into `rust_storage`. (`new` asserted that `lower`
        // and `lift` are paired, so lowered elements always have a
        // `lift` available here.)
        if let Some(lift) = self.vtable.lift {
            let (mut ptr, mut len) = self.abi_ptr_and_len();
            unsafe {
                for dst in self.rust_storage[self.cursor..].iter_mut() {
                    dst.write(lift(ptr.cast_mut()));
                    ptr = ptr.add(self.vtable.layout.size());
                    len -= 1;
                }
                // Both counts derive from `len - cursor`, so the lift
                // loop must have consumed the ABI buffer exactly.
                assert_eq!(len, 0);
            }
        }
        // Discard the slots before the cursor; they are `MaybeUninit<T>`
        // so no `T` destructors run for already-transferred elements.
        let mut storage = mem::take(&mut self.rust_storage);
        storage.drain(..self.cursor);
        self.cursor = 0;
        self.alloc = None;
        // SAFETY: every remaining slot is initialized — either it was
        // never moved out (no `lower`) or it was re-initialized by the
        // `lift` loop above — so the storage is a valid `Vec<T>`.
        unsafe {
            let ptr = storage.as_mut_ptr();
            let len = storage.len();
            let cap = storage.capacity();
            mem::forget(storage);
            Vec::<T>::from_raw_parts(ptr.cast(), len, cap)
        }
    }
}
impl<T> Drop for AbiBuffer<T> {
fn drop(&mut self) {
let _ = self.take_vec();
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
    use std::vec;

    // Stub intrinsics for the vtables below. None of these tests reach
    // the raw stream operations, so every stub panics if called.
    extern "C" fn cancel(_: u32) -> u32 {
        todo!()
    }
    extern "C" fn drop(_: u32) {
        todo!()
    }
    extern "C" fn new() -> u64 {
        todo!()
    }
    extern "C" fn start_read(_: u32, _: *mut u8, _: usize) -> u32 {
        todo!()
    }
    extern "C" fn start_write(_: u32, _: *const u8, _: usize) -> u32 {
        todo!()
    }

    /// Vtable with no `lower`/`lift`/`dealloc_lists`: the `u8` elements
    /// are used in place as their own ABI representation.
    static BLANK: StreamVtable<u8> = StreamVtable {
        cancel_read: cancel,
        cancel_write: cancel,
        drop_readable: drop,
        drop_writable: drop,
        dealloc_lists: None,
        lift: None,
        lower: None,
        layout: unsafe { Layout::from_size_align_unchecked(1, 1) },
        new,
        start_read,
        start_write,
    };

    /// Advancing in steps through the whole buffer drains `remaining`
    /// to zero and leaves nothing for `into_vec`.
    #[test]
    fn blank_advance_to_end() {
        let mut buffer = AbiBuffer::new(vec![1, 2, 3, 4], &BLANK);
        assert_eq!(buffer.remaining(), 4);
        buffer.advance(1);
        assert_eq!(buffer.remaining(), 3);
        buffer.advance(2);
        assert_eq!(buffer.remaining(), 1);
        buffer.advance(1);
        assert_eq!(buffer.remaining(), 0);
        assert_eq!(buffer.into_vec(), []);
    }

    /// `into_vec` returns exactly the elements not yet advanced past.
    #[test]
    fn blank_advance_partial() {
        let buffer = AbiBuffer::new(vec![1, 2, 3, 4], &BLANK);
        assert_eq!(buffer.into_vec(), [1, 2, 3, 4]);
        let mut buffer = AbiBuffer::new(vec![1, 2, 3, 4], &BLANK);
        buffer.advance(1);
        assert_eq!(buffer.into_vec(), [2, 3, 4]);
        let mut buffer = AbiBuffer::new(vec![1, 2, 3, 4], &BLANK);
        buffer.advance(1);
        buffer.advance(2);
        assert_eq!(buffer.into_vec(), [4]);
    }

    /// With no `lower`, the ABI pointer must alias the original `Vec`'s
    /// allocation (offset by the cursor), and `into_vec` must hand the
    /// same allocation back.
    #[test]
    fn blank_ptr_eq() {
        let mut buf = vec![1, 2, 3, 4];
        let ptr = buf.as_mut_ptr();
        let mut buffer = AbiBuffer::new(buf, &BLANK);
        let (a, b) = buffer.abi_ptr_and_len();
        assert_eq!(a, ptr);
        assert_eq!(b, 4);
        unsafe {
            assert_eq!(std::slice::from_raw_parts(a, b), [1, 2, 3, 4]);
        }
        buffer.advance(1);
        let (a, b) = buffer.abi_ptr_and_len();
        assert_eq!(a, ptr.wrapping_add(1));
        assert_eq!(b, 3);
        unsafe {
            assert_eq!(std::slice::from_raw_parts(a, b), [2, 3, 4]);
        }
        buffer.advance(2);
        let (a, b) = buffer.abi_ptr_and_len();
        assert_eq!(a, ptr.wrapping_add(3));
        assert_eq!(b, 1);
        unsafe {
            assert_eq!(std::slice::from_raw_parts(a, b), [4]);
        }
        let ret = buffer.into_vec();
        assert_eq!(ret, [4]);
        // Same backing allocation, now reinterpreted as `Vec<T>` again.
        assert_eq!(ret.as_ptr(), ptr);
    }

    /// Non-`Copy` element wrapper used by the lower/lift tests.
    #[derive(PartialEq, Eq, Debug)]
    struct B(u8);

    /// Vtable whose `lower` adds 1 and `lift` subtracts 1, so the
    /// lowered bytes are distinguishable from the Rust-side values.
    static OP: StreamVtable<B> = StreamVtable {
        cancel_read: cancel,
        cancel_write: cancel,
        drop_readable: drop,
        drop_writable: drop,
        dealloc_lists: Some(|_ptr| {}),
        lift: Some(|ptr| unsafe { B(*ptr - 1) }),
        lower: Some(|b, ptr| unsafe {
            *ptr = b.0 + 1;
        }),
        layout: unsafe { Layout::from_size_align_unchecked(1, 1) },
        new,
        start_read,
        start_write,
    };

    /// Same as `blank_advance_to_end`, but through the lower/lift path.
    #[test]
    fn op_advance_to_end() {
        let mut buffer = AbiBuffer::new(vec![B(1), B(2), B(3), B(4)], &OP);
        assert_eq!(buffer.remaining(), 4);
        buffer.advance(1);
        assert_eq!(buffer.remaining(), 3);
        buffer.advance(2);
        assert_eq!(buffer.remaining(), 1);
        buffer.advance(1);
        assert_eq!(buffer.remaining(), 0);
        assert_eq!(buffer.into_vec(), []);
    }

    /// Same as `blank_advance_partial`, but `into_vec` must round-trip
    /// the remaining values through lower then lift unchanged.
    #[test]
    fn op_advance_partial() {
        let buffer = AbiBuffer::new(vec![B(1), B(2), B(3), B(4)], &OP);
        assert_eq!(buffer.into_vec(), [B(1), B(2), B(3), B(4)]);
        let mut buffer = AbiBuffer::new(vec![B(1), B(2), B(3), B(4)], &OP);
        buffer.advance(1);
        assert_eq!(buffer.into_vec(), [B(2), B(3), B(4)]);
        let mut buffer = AbiBuffer::new(vec![B(1), B(2), B(3), B(4)], &OP);
        buffer.advance(1);
        buffer.advance(2);
        assert_eq!(buffer.into_vec(), [B(4)]);
    }

    /// With `lower`, the ABI pointer must NOT alias the original `Vec`
    /// (a separate lowered allocation is used, holding the +1 bytes),
    /// while `into_vec` still returns the original allocation.
    #[test]
    fn op_ptrs() {
        let mut buf = vec![B(1), B(2), B(3), B(4)];
        let ptr = buf.as_mut_ptr().cast::<u8>();
        let mut buffer = AbiBuffer::new(buf, &OP);
        let (a, b) = buffer.abi_ptr_and_len();
        let base = a;
        assert_ne!(a, ptr);
        assert_eq!(b, 4);
        unsafe {
            // Lowered representation: each element incremented by one.
            assert_eq!(std::slice::from_raw_parts(a, b), [2, 3, 4, 5]);
        }
        buffer.advance(1);
        let (a, b) = buffer.abi_ptr_and_len();
        assert_ne!(a, ptr.wrapping_add(1));
        assert_eq!(a, base.wrapping_add(1));
        assert_eq!(b, 3);
        unsafe {
            assert_eq!(std::slice::from_raw_parts(a, b), [3, 4, 5]);
        }
        buffer.advance(2);
        let (a, b) = buffer.abi_ptr_and_len();
        assert_ne!(a, ptr.wrapping_add(3));
        assert_eq!(a, base.wrapping_add(3));
        assert_eq!(b, 1);
        unsafe {
            assert_eq!(std::slice::from_raw_parts(a, b), [5]);
        }
        let ret = buffer.into_vec();
        assert_eq!(ret, [B(4)]);
        assert_eq!(ret.as_ptr(), ptr.cast());
    }

    /// `dealloc_lists` must be invoked exactly once per advanced
    /// element, in order, and never from `abi_ptr_and_len` or
    /// `into_vec`.
    #[test]
    fn dealloc_lists() {
        static DEALLOCS: AtomicUsize = AtomicUsize::new(0);
        // Identity lower/lift so the callback can assert on the raw
        // byte to verify in-order deallocation (values 1..=4).
        static OP: StreamVtable<B> = StreamVtable {
            cancel_read: cancel,
            cancel_write: cancel,
            drop_readable: drop,
            drop_writable: drop,
            dealloc_lists: Some(|ptr| {
                let prev = DEALLOCS.fetch_add(1, Relaxed);
                assert_eq!(unsafe { usize::from(*ptr) }, prev + 1);
            }),
            lift: Some(|ptr| unsafe { B(*ptr) }),
            lower: Some(|b, ptr| unsafe {
                *ptr = b.0;
            }),
            layout: unsafe { Layout::from_size_align_unchecked(1, 1) },
            new,
            start_read,
            start_write,
        };
        assert_eq!(DEALLOCS.load(Relaxed), 0);
        let buf = vec![B(1), B(2), B(3), B(4)];
        let mut buffer = AbiBuffer::new(buf, &OP);
        assert_eq!(DEALLOCS.load(Relaxed), 0);
        buffer.abi_ptr_and_len();
        assert_eq!(DEALLOCS.load(Relaxed), 0);
        buffer.advance(1);
        assert_eq!(DEALLOCS.load(Relaxed), 1);
        buffer.abi_ptr_and_len();
        assert_eq!(DEALLOCS.load(Relaxed), 1);
        buffer.advance(2);
        assert_eq!(DEALLOCS.load(Relaxed), 3);
        buffer.abi_ptr_and_len();
        assert_eq!(DEALLOCS.load(Relaxed), 3);
        buffer.into_vec();
        assert_eq!(DEALLOCS.load(Relaxed), 3);
    }
}