#![feature(allocator_api)]
#![feature(negative_impls)]
use std::sync::Arc;
use futures::stream::{Stream, FusedStream, StreamExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
pub mod async_pipeline;
use async_pipeline::BoxReceiver;
pub struct PartBuffer(Bytes);
unsafe impl Send for PartBuffer {}
unsafe impl Sync for PartBuffer {}
impl AsRef<[u8]> for PartBuffer {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}
impl Buf for PartBuffer {
fn remaining(&self) -> usize {
self.0.remaining()
}
fn chunk(&self) -> &[u8] {
self.0.chunk()
}
fn advance(&mut self, cnt: usize) {
self.0.advance(cnt);
}
}
impl PartBuffer {
#[inline]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
#[inline]
pub fn len(&self) -> usize {
self.0.len()
}
#[inline]
pub fn get_isize(&mut self) -> isize {
self.0.get_int(isize::BITS as usize / 8) as isize
}
#[inline]
pub fn get_isize_le(&mut self) -> isize {
self.0.get_int_le(isize::BITS as usize / 8) as isize
}
#[inline]
pub fn get_usize(&mut self) -> usize {
self.0.get_uint(usize::BITS as usize / 8) as usize
}
#[inline]
pub fn get_usize_le(&mut self) -> usize {
self.0.get_uint_le(usize::BITS as usize / 8) as usize
}
#[inline]
pub fn get(&mut self, len: usize) -> Self {
PartBuffer(self.0.copy_to_bytes(len))
}
}
pub struct ByteBuffer {
stream: BoxReceiver<'static, Arc<Vec<u8>>>, buf: Option<BytesMut>, readed: usize, }
unsafe impl Send for ByteBuffer {}
impl !Sync for ByteBuffer {}
impl AsRef<[u8]> for ByteBuffer {
fn as_ref(&self) -> &[u8] {
self
.buf
.as_ref()
.unwrap()
.as_ref()
}
}
impl ByteBuffer {
pub fn new(stream: BoxReceiver<'static, Arc<Vec<u8>>>) -> Self {
let buf = match stream.size_hint() {
(0, _) => Some(BytesMut::default()),
(size, _) => Some(BytesMut::with_capacity(size)),
};
ByteBuffer {
stream,
buf,
readed: 0,
}
}
#[inline]
pub fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
#[inline]
pub fn is_empty(&self) -> bool {
self
.buf
.as_ref()
.unwrap()
.is_empty()
}
#[inline]
pub fn len(&self) -> usize {
self
.buf
.as_ref()
.unwrap()
.len()
}
#[inline]
pub fn remaining(&self) -> usize {
self
.buf
.as_ref()
.unwrap()
.remaining()
}
#[inline]
pub fn unreceived(&self) -> Option<usize> {
self.stream.current_len()
}
#[inline]
pub fn readed(&self) -> usize {
self.readed
}
pub async fn get_i8(&mut self) -> Option<i8> {
if !try_fill_buffer(self, 1).await {
return None;
}
self.readed += 1;
Some(self.buf.as_mut().unwrap().get_i8())
}
pub async fn get_u8(&mut self) -> Option<u8> {
if !try_fill_buffer(self, 1).await {
return None;
}
self.readed += 1;
Some(self.buf.as_mut().unwrap().get_u8())
}
pub async fn get_i16(&mut self) -> Option<i16> {
if !try_fill_buffer(self, 2).await {
return None;
}
self.readed += 2;
Some(self.buf.as_mut().unwrap().get_i16())
}
pub async fn get_i16_le(&mut self) -> Option<i16> {
if !try_fill_buffer(self, 2).await {
return None;
}
self.readed += 2;
Some(self.buf.as_mut().unwrap().get_i16_le())
}
pub async fn get_u16(&mut self) -> Option<u16> {
if !try_fill_buffer(self, 2).await {
return None;
}
self.readed += 2;
Some(self.buf.as_mut().unwrap().get_u16())
}
pub async fn get_u16_le(&mut self) -> Option<u16> {
if !try_fill_buffer(self, 2).await {
return None;
}
self.readed += 2;
Some(self.buf.as_mut().unwrap().get_u16_le())
}
pub async fn get_i32(&mut self) -> Option<i32> {
if !try_fill_buffer(self, 4).await {
return None;
}
self.readed += 4;
Some(self.buf.as_mut().unwrap().get_i32())
}
pub async fn get_i32_le(&mut self) -> Option<i32> {
if !try_fill_buffer(self, 4).await {
return None;
}
self.readed += 4;
Some(self.buf.as_mut().unwrap().get_i32_le())
}
pub async fn get_u32(&mut self) -> Option<u32> {
if !try_fill_buffer(self, 4).await {
return None;
}
self.readed += 4;
Some(self.buf.as_mut().unwrap().get_u32())
}
pub async fn get_u32_le(&mut self) -> Option<u32> {
if !try_fill_buffer(self, 4).await {
return None;
}
self.readed += 4;
Some(self.buf.as_mut().unwrap().get_u32_le())
}
pub async fn get_i64(&mut self) -> Option<i64> {
if !try_fill_buffer(self, 8).await {
return None;
}
self.readed += 8;
Some(self.buf.as_mut().unwrap().get_i64())
}
pub async fn get_i64_le(&mut self) -> Option<i64> {
if !try_fill_buffer(self, 8).await {
return None;
}
self.readed += 8;
Some(self.buf.as_mut().unwrap().get_i64_le())
}
pub async fn get_u64(&mut self) -> Option<u64> {
if !try_fill_buffer(self, 8).await {
return None;
}
self.readed += 8;
Some(self.buf.as_mut().unwrap().get_u64())
}
pub async fn get_u64_le(&mut self) -> Option<u64> {
if !try_fill_buffer(self, 8).await {
return None;
}
self.readed += 8;
Some(self.buf.as_mut().unwrap().get_u64_le())
}
pub async fn get_i128(&mut self) -> Option<i128> {
if !try_fill_buffer(self, 16).await {
return None;
}
self.readed += 16;
Some(self.buf.as_mut().unwrap().get_i128())
}
pub async fn get_i128_le(&mut self) -> Option<i128> {
if !try_fill_buffer(self, 16).await {
return None;
}
self.readed += 16;
Some(self.buf.as_mut().unwrap().get_i128_le())
}
pub async fn get_u128(&mut self) -> Option<u128> {
if !try_fill_buffer(self, 16).await {
return None;
}
self.readed += 16;
Some(self.buf.as_mut().unwrap().get_u128())
}
pub async fn get_u128_le(&mut self) -> Option<u128> {
if !try_fill_buffer(self, 16).await {
return None;
}
self.readed += 16;
Some(self.buf.as_mut().unwrap().get_u128_le())
}
pub async fn get_isize(&mut self) -> Option<isize> {
let require = isize::BITS as usize / 8;
if !try_fill_buffer(self, require).await {
return None;
}
self.readed += isize::BITS as usize / 8;
Some(self.buf.as_mut().unwrap().get_int(require) as isize)
}
pub async fn get_isize_le(&mut self) -> Option<isize> {
let require = isize::BITS as usize / 8;
if !try_fill_buffer(self, require).await {
return None;
}
self.readed += isize::BITS as usize / 8;
Some(self.buf.as_mut().unwrap().get_int_le(require) as isize)
}
pub async fn get_usize(&mut self) -> Option<usize> {
let require = usize::BITS as usize / 8;
if !try_fill_buffer(self, require).await {
return None;
}
self.readed += usize::BITS as usize / 8;
Some(self.buf.as_mut().unwrap().get_uint(require) as usize)
}
pub async fn get_usize_le(&mut self) -> Option<usize> {
let require = usize::BITS as usize / 8;
if !try_fill_buffer(self, require).await {
return None;
}
self.readed += usize::BITS as usize / 8;
Some(self.buf.as_mut().unwrap().get_uint_le(require) as usize)
}
pub async fn get_f32(&mut self) -> Option<f32> {
if !try_fill_buffer(self, 4).await {
return None;
}
self.readed += 4;
Some(self.buf.as_mut().unwrap().get_f32())
}
pub async fn get_f32_le(&mut self) -> Option<f32> {
if !try_fill_buffer(self, 4).await {
return None;
}
self.readed += 4;
Some(self.buf.as_mut().unwrap().get_f32_le())
}
pub async fn get_f64(&mut self) -> Option<f64> {
if !try_fill_buffer(self, 8).await {
return None;
}
self.readed += 8;
Some(self.buf.as_mut().unwrap().get_f64())
}
pub async fn get_f64_le(&mut self) -> Option<f64> {
if !try_fill_buffer(self, 8).await {
return None;
}
self.readed += 8;
Some(self.buf.as_mut().unwrap().get_f64_le())
}
pub async fn get(&mut self, len: usize) -> Option<PartBuffer> {
if !try_fill_buffer(self, len).await {
return None;
}
self.readed += len; Some(PartBuffer(self.buf.as_mut().unwrap().copy_to_bytes(len)))
}
pub async fn try_get(&mut self, len: usize) -> Option<PartBuffer> {
let mut remaining = self.remaining();
if (remaining == 0) && !try_fill_buffer_by_non_blocking(self, len).await {
return None;
}
remaining = self.remaining(); if remaining == 0 {
Some(PartBuffer(Bytes::new()))
} else if remaining < len {
self.readed += remaining; Some(PartBuffer(self.buf.as_mut().unwrap().copy_to_bytes(remaining)))
} else {
let len = if len == 0 {
remaining
} else {
len
};
self.readed += len; Some(PartBuffer(self.buf.as_mut().unwrap().copy_to_bytes(len)))
}
}
pub async fn try_fill(&mut self) -> usize {
try_fill_buffer_by_non_blocking(self, 0).await;
self.remaining()
}
pub fn truncate(&mut self) -> usize {
let old_buf = self.buf.take().unwrap();
let mut new_buf = BytesMut::with_capacity(old_buf.remaining());
new_buf.put(old_buf);
self.buf = Some(new_buf);
let result = self.readed();
self.readed = 0;
result
}
pub fn clear(&mut self) {
let _ = self.buf.take().unwrap();
self.buf = Some(BytesMut::new());
self.readed = 0;
}
}
#[inline]
async fn try_fill_buffer(buffer: &mut ByteBuffer,
require: usize) -> bool {
let mut ready_len = buffer.remaining(); while ready_len < require {
match buffer.stream.next().await {
None => {
return false;
},
Some(bin) => {
buffer
.buf
.as_mut()
.unwrap()
.put_slice(bin.as_ref()); ready_len += bin.len(); },
}
}
true
}
#[inline]
async fn try_fill_buffer_by_non_blocking(buffer: &mut ByteBuffer,
require: usize) -> bool {
let mut ready_len = buffer.remaining(); while (buffer.stream.current_len().unwrap() > 0)
&& (require == 0 || ready_len < require) {
match buffer.stream.next().await {
None => {
return false;
},
Some(bin) => {
buffer
.buf
.as_mut()
.unwrap()
.put_slice(bin.as_ref()); ready_len += bin.len(); },
}
}
true
}