#[cfg(feature = "component-model-async-bytes")]
use bytes::{Bytes, BytesMut};
#[cfg(feature = "component-model-async-bytes")]
use std::io::Cursor;
use std::mem::{self, MaybeUninit};
use std::slice;
use std::vec::Vec;
pub use untyped::*;
mod untyped {
use super::WriteBuffer;
use crate::vm::SendSyncPtr;
use std::any::TypeId;
use std::marker;
use std::mem;
use std::ptr::NonNull;
pub struct UntypedWriteBuffer<'a> {
element_type_id: TypeId,
buf: SendSyncPtr<dyn WriteBuffer<()>>,
_marker: marker::PhantomData<&'a mut dyn WriteBuffer<()>>,
}
union ReinterpretWriteBuffer<T> {
typed: *mut dyn WriteBuffer<T>,
untyped: *mut dyn WriteBuffer<()>,
}
impl<'a> UntypedWriteBuffer<'a> {
pub fn new<T: 'static>(buf: &'a mut dyn WriteBuffer<T>) -> UntypedWriteBuffer<'a> {
UntypedWriteBuffer {
element_type_id: TypeId::of::<T>(),
buf: SendSyncPtr::new(
NonNull::new(unsafe {
let r = ReinterpretWriteBuffer { typed: buf };
assert_eq!(mem::size_of_val(&r.typed), mem::size_of_val(&r.untyped));
r.untyped
})
.unwrap(),
),
_marker: marker::PhantomData,
}
}
pub fn get_mut<T: 'static>(&mut self) -> &mut dyn WriteBuffer<T> {
assert_eq!(self.element_type_id, TypeId::of::<T>());
unsafe {
&mut *ReinterpretWriteBuffer {
untyped: self.buf.as_ptr(),
}
.typed
}
}
}
}
pub unsafe trait WriteBuffer<T>: Send + Sync + 'static {
fn remaining(&self) -> &[T];
fn skip(&mut self, count: usize);
fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<T>]));
}
pub trait ReadBuffer<T>: Send + Sync + 'static {
fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I);
fn remaining_capacity(&self) -> usize;
fn move_from(&mut self, input: &mut dyn WriteBuffer<T>, count: usize);
}
pub(super) struct Extender<'a, B>(pub(super) &'a mut B);
impl<T, B: ReadBuffer<T>> Extend<T> for Extender<'_, B> {
fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
self.0.extend(iter)
}
}
unsafe impl<T: Send + Sync + 'static> WriteBuffer<T> for Option<T> {
fn remaining(&self) -> &[T] {
if let Some(me) = self {
slice::from_ref(me)
} else {
&[]
}
}
fn skip(&mut self, count: usize) {
match count {
0 => {}
1 => {
assert!(self.is_some());
*self = None;
}
_ => panic!("cannot skip more than {} item(s)", self.remaining().len()),
}
}
fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<T>])) {
match count {
0 => fun(&mut []),
1 => {
let mut item = MaybeUninit::new(self.take().unwrap());
fun(slice::from_mut(&mut item));
}
_ => panic!("cannot forget more than {} item(s)", self.remaining().len()),
}
}
}
impl<T: Send + Sync + 'static> ReadBuffer<T> for Option<T> {
fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
let mut iter = iter.into_iter();
if self.is_none() {
*self = iter.next();
}
assert!(iter.next().is_none());
}
fn remaining_capacity(&self) -> usize {
if self.is_some() { 0 } else { 1 }
}
fn move_from(&mut self, input: &mut dyn WriteBuffer<T>, count: usize) {
match count {
0 => {}
1 => {
assert!(self.is_none());
input.take(1, &mut |slice| {
unsafe {
*self = Some(slice[0].assume_init_read());
}
});
}
_ => panic!(
"cannot take more than {} item(s)",
self.remaining_capacity()
),
}
}
}
pub struct SliceBuffer {
buffer: Vec<u8>,
offset: usize,
limit: usize,
}
impl SliceBuffer {
pub fn new(buffer: Vec<u8>, offset: usize, limit: usize) -> Self {
assert!(limit <= buffer.len());
Self {
buffer,
offset,
limit,
}
}
pub fn into_parts(self) -> (Vec<u8>, usize, usize) {
(self.buffer, self.offset, self.limit)
}
}
unsafe impl WriteBuffer<u8> for SliceBuffer {
fn remaining(&self) -> &[u8] {
&self.buffer[self.offset..self.limit]
}
fn skip(&mut self, count: usize) {
assert!(self.offset + count <= self.limit);
self.offset += count;
}
fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<u8>])) {
assert!(count <= self.remaining().len());
self.offset += count;
fun(unsafe {
mem::transmute::<&[u8], &[MaybeUninit<u8>]>(
&self.buffer[self.offset - count..self.limit],
)
});
}
}
pub struct VecBuffer<T> {
buffer: Vec<MaybeUninit<T>>,
offset: usize,
}
impl<T> Default for VecBuffer<T> {
fn default() -> Self {
Self::with_capacity(0)
}
}
impl<T> VecBuffer<T> {
pub fn with_capacity(capacity: usize) -> Self {
Self {
buffer: Vec::with_capacity(capacity),
offset: 0,
}
}
pub fn reset(&mut self) {
self.skip_(self.remaining_().len());
self.buffer.clear();
self.offset = 0;
}
fn remaining_(&self) -> &[T] {
unsafe { mem::transmute::<&[MaybeUninit<T>], &[T]>(&self.buffer[self.offset..]) }
}
fn skip_(&mut self, count: usize) {
assert!(count <= self.remaining_().len());
for item in &mut self.buffer[self.offset..][..count] {
self.offset += 1;
unsafe {
item.assume_init_drop();
}
}
}
}
unsafe impl<T: Send + Sync + 'static> WriteBuffer<T> for VecBuffer<T> {
fn remaining(&self) -> &[T] {
self.remaining_()
}
fn skip(&mut self, count: usize) {
self.skip_(count)
}
fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<T>])) {
assert!(count <= self.remaining().len());
self.offset += count;
fun(&self.buffer[self.offset - count..]);
}
}
impl<T> From<Vec<T>> for VecBuffer<T> {
fn from(buffer: Vec<T>) -> Self {
Self {
buffer: unsafe { mem::transmute::<Vec<T>, Vec<MaybeUninit<T>>>(buffer) },
offset: 0,
}
}
}
impl<T> Drop for VecBuffer<T> {
fn drop(&mut self) {
self.reset();
}
}
impl<T: Send + Sync + 'static> ReadBuffer<T> for Vec<T> {
fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
Extend::extend(self, iter)
}
fn remaining_capacity(&self) -> usize {
self.capacity().checked_sub(self.len()).unwrap()
}
fn move_from(&mut self, input: &mut dyn WriteBuffer<T>, count: usize) {
assert!(count <= self.remaining_capacity());
input.take(count, &mut |slice| {
for item in slice {
self.push(unsafe { item.assume_init_read() });
}
});
}
}
#[cfg(feature = "component-model-async-bytes")]
unsafe impl WriteBuffer<u8> for Cursor<Bytes> {
fn remaining(&self) -> &[u8] {
&self.get_ref()[usize::try_from(self.position()).unwrap()..]
}
fn skip(&mut self, count: usize) {
assert!(
count <= self.remaining().len(),
"tried to skip {count} with {} remaining",
self.remaining().len()
);
self.set_position(
self.position()
.checked_add(u64::try_from(count).unwrap())
.unwrap(),
);
}
fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<u8>])) {
assert!(count <= self.remaining().len());
fun(unsafe_byte_slice(self.remaining()));
self.skip(count);
}
}
#[cfg(feature = "component-model-async-bytes")]
unsafe impl WriteBuffer<u8> for Cursor<BytesMut> {
fn remaining(&self) -> &[u8] {
&self.get_ref()[usize::try_from(self.position()).unwrap()..]
}
fn skip(&mut self, count: usize) {
assert!(count <= self.remaining().len());
self.set_position(
self.position()
.checked_add(u64::try_from(count).unwrap())
.unwrap(),
);
}
fn take(&mut self, count: usize, fun: &mut dyn FnMut(&[MaybeUninit<u8>])) {
assert!(count <= self.remaining().len());
fun(unsafe_byte_slice(self.remaining()));
self.skip(count);
}
}
#[cfg(feature = "component-model-async-bytes")]
impl ReadBuffer<u8> for BytesMut {
fn extend<I: IntoIterator<Item = u8>>(&mut self, iter: I) {
Extend::extend(self, iter)
}
fn remaining_capacity(&self) -> usize {
self.capacity().checked_sub(self.len()).unwrap()
}
fn move_from(&mut self, input: &mut dyn WriteBuffer<u8>, count: usize) {
assert!(count <= self.remaining_capacity());
input.take(count, &mut |slice| {
let slice = unsafe { mem::transmute::<&[MaybeUninit<u8>], &[u8]>(slice) };
self.extend_from_slice(slice);
});
}
}
#[cfg(feature = "component-model-async-bytes")]
fn unsafe_byte_slice(slice: &[u8]) -> &[MaybeUninit<u8>] {
unsafe { mem::transmute::<&[u8], &[MaybeUninit<u8>]>(slice) }
}