#[cfg(feature = "vectored")]
use crate::sources::VectoredByteSlices;
use crate::{
database::Database,
error::{CompressionError, VectorscanRuntimeError},
hs,
matchers::stream::StreamMatcher,
sources::ByteSlice,
state::Scratch,
};
use handles::{Handle, Resource};
use std::{ops, ptr};
/// Native stream handle type as exposed by the `hs` bindings.
pub type NativeStream = hs::hs_stream;

/// Owning wrapper around a raw pointer to a stream opened by vectorscan.
///
/// The pointed-to stream is released through [`LiveStream::try_drop`] when this value is
/// dropped.
#[repr(transparent)]
#[derive(Debug)]
pub struct LiveStream(*mut NativeStream);

// SAFETY (as asserted by the original author): the stream pointer is uniquely owned by this
// wrapper and only reached through `&self`/`&mut self`, so moving the wrapper to another
// thread does not create shared mutable access.
// NOTE(review): this also assumes vectorscan stream state is not tied to the thread that
// opened it — confirm.
unsafe impl Send for LiveStream {}
impl LiveStream {
  /// Opens a fresh stream against `db` via `hs_open_stream`.
  ///
  /// # Errors
  /// Returns the error reported by `hs_open_stream` when the stream cannot be opened.
  pub fn open(db: &Database) -> Result<Self, VectorscanRuntimeError> {
    let mut stream_ptr = ptr::null_mut();
    // `flags` argument: no stream flags are requested.
    let status = unsafe { hs::hs_open_stream(db.as_ref_native(), 0, &mut stream_ptr) };
    VectorscanRuntimeError::from_native(status)?;
    // SAFETY: the call succeeded, so `stream_ptr` refers to a newly opened stream that no
    // other wrapper owns.
    Ok(unsafe { Self::from_native(stream_ptr) })
  }

  /// Resets this stream in place via `hs_direct_reset_stream`.
  ///
  /// # Errors
  /// Returns the error reported by the native reset call.
  pub fn reset(&mut self) -> Result<(), VectorscanRuntimeError> {
    let native = self.as_mut_native();
    let status = unsafe { hs::hs_direct_reset_stream(native) };
    VectorscanRuntimeError::from_native(status)
  }

  /// Serializes this stream's state into a [`CompressedStream`].
  ///
  /// `into` selects the output buffer strategy; see [`CompressReserveBehavior`].
  pub fn compress(
    &self,
    into: CompressReserveBehavior,
  ) -> Result<CompressedStream, CompressionError> {
    let source = self;
    CompressedStream::compress(into, source)
  }
}
impl LiveStream {
  /// Wraps a raw native stream pointer, taking ownership of it.
  ///
  /// # Safety
  /// `p` must point to a valid stream allocated by vectorscan that no other `LiveStream`
  /// owns; it will be freed when the returned value is dropped.
  pub const unsafe fn from_native(p: *mut NativeStream) -> Self { Self(p) }

  /// Borrows the underlying native stream.
  pub fn as_ref_native(&self) -> &NativeStream {
    let raw = self.0;
    unsafe { &*raw }
  }

  /// Mutably borrows the underlying native stream.
  pub fn as_mut_native(&mut self) -> &mut NativeStream {
    let raw = self.0;
    unsafe { &mut *raw }
  }

  /// Creates an independent copy of this stream via `hs_copy_stream`.
  ///
  /// # Errors
  /// Returns the error reported by `hs_copy_stream`.
  pub fn try_clone(&self) -> Result<Self, VectorscanRuntimeError> {
    let mut copy_ptr = ptr::null_mut();
    let status = unsafe { hs::hs_copy_stream(&mut copy_ptr, self.as_ref_native()) };
    VectorscanRuntimeError::from_native(status)?;
    // SAFETY: the copy succeeded, so `copy_ptr` is a new stream owned by nobody else.
    Ok(unsafe { Self::from_native(copy_ptr) })
  }

  /// Overwrites this stream with the state of `source` via
  /// `hs_direct_reset_and_copy_stream`.
  ///
  /// # Safety
  /// NOTE(review): the native call presumably requires both streams to belong to the same
  /// database — confirm the exact contract.
  pub unsafe fn try_clone_from(&mut self, source: &Self) -> Result<(), VectorscanRuntimeError> {
    let status = unsafe {
      hs::hs_direct_reset_and_copy_stream(self.as_mut_native(), source.as_ref_native())
    };
    VectorscanRuntimeError::from_native(status)
  }

  /// Frees the native stream via `hs_direct_free_stream`.
  ///
  /// # Safety
  /// The wrapped pointer dangles afterwards and must not be used again.
  /// NOTE(review): `Drop` also calls this, so callers invoking it directly presumably must
  /// suppress the destructor afterwards — confirm.
  pub unsafe fn try_drop(&mut self) -> Result<(), VectorscanRuntimeError> {
    let status = unsafe { hs::hs_direct_free_stream(self.as_mut_native()) };
    VectorscanRuntimeError::from_native(status)
  }
}
impl Clone for LiveStream {
  /// Deep-copies the native stream, panicking if vectorscan cannot copy it.
  ///
  /// Use [`LiveStream::try_clone`] to handle the error instead.
  fn clone(&self) -> Self {
    let copied = self.try_clone();
    copied.unwrap()
  }
}
impl Resource for LiveStream {
type Error = VectorscanRuntimeError;
fn deep_clone(&self) -> Result<Self, Self::Error> { self.try_clone() }
fn deep_boxed_clone(&self) -> Result<Box<dyn Resource<Error=Self::Error>>, Self::Error> {
Ok(Box::new(self.try_clone()?))
}
unsafe fn sync_drop(&mut self) -> Result<(), Self::Error> { self.try_drop() }
}
impl ops::Drop for LiveStream {
  /// Frees the native stream, panicking if vectorscan reports an error.
  fn drop(&mut self) {
    // SAFETY: `self` is being destroyed, so the pointer is never touched after it is freed.
    let outcome = unsafe { self.try_drop() };
    outcome.unwrap();
  }
}
/// Synchronous scanning sink bundling a stream, a match handler, and scratch space.
pub struct ScratchStreamSink<'code> {
  /// Handle to the stream whose state advances as data is scanned.
  pub live: Box<dyn Handle<R=LiveStream>>,
  /// Handler invoked for each match.
  pub matcher: StreamMatcher<'code>,
  /// Handle to the scratch space used during scanning.
  pub scratch: Box<dyn Handle<R=Scratch>>,
}
impl<'code> ScratchStreamSink<'code> {
  /// Boxes the stream and scratch handles and pairs them with `matcher`.
  pub fn new(
    live: impl Handle<R=LiveStream>,
    matcher: StreamMatcher<'code>,
    scratch: impl Handle<R=Scratch>,
  ) -> Self {
    let live: Box<dyn Handle<R=LiveStream>> = Box::new(live);
    let scratch: Box<dyn Handle<R=Scratch>> = Box::new(scratch);
    Self {
      live,
      matcher,
      scratch,
    }
  }

  /// Feeds `data` into the stream, reporting matches to `matcher`.
  ///
  /// # Errors
  /// Fails if either handle cannot be made mutable or the scan itself fails.
  #[allow(clippy::needless_lifetimes)]
  pub fn scan<'data>(&mut self, data: ByteSlice<'data>) -> Result<(), VectorscanRuntimeError> {
    // Scratch is acquired before the stream, as in the other scanning methods.
    let scratch = self.scratch.make_mut()?;
    let live = self.live.make_mut()?;
    scratch.scan_sync_stream(live, &mut self.matcher, data)
  }

  /// Feeds several byte slices into the stream as one vectored write.
  ///
  /// # Errors
  /// Fails if either handle cannot be made mutable or the scan itself fails.
  #[cfg(feature = "vectored")]
  #[cfg_attr(docsrs, doc(cfg(feature = "vectored")))]
  pub fn scan_vectored<'data>(
    &mut self,
    data: VectoredByteSlices<'data, 'data>,
  ) -> Result<(), VectorscanRuntimeError> {
    let scratch = self.scratch.make_mut()?;
    let live = self.live.make_mut()?;
    scratch.scan_sync_vectored_stream(live, &mut self.matcher, data)
  }

  /// Signals end-of-data to the stream, reporting any final matches to `matcher`.
  ///
  /// # Errors
  /// Fails if either handle cannot be made mutable or the native call fails.
  pub fn flush_eod(&mut self) -> Result<(), VectorscanRuntimeError> {
    let scratch = self.scratch.make_mut()?;
    let live = self.live.make_mut()?;
    scratch.flush_eod_sync(live, &mut self.matcher)
  }

  /// Resets the underlying stream via [`LiveStream::reset`].
  pub fn reset(&mut self) -> Result<(), VectorscanRuntimeError> {
    let live = self.live.make_mut()?;
    live.reset()
  }
}
pub(crate) mod std_impls {
use super::ScratchStreamSink;
use crate::sources::ByteSlice;
#[cfg(feature = "vectored")]
use crate::sources::VectoredByteSlices;
use std::io;
#[cfg(feature = "vectored")]
use std::mem;
/// [`std::io::Write`] adapter that scans every written byte through a [`ScratchStreamSink`].
pub struct StreamWriter<'code> {
  /// The sink that performs the scanning.
  pub inner: ScratchStreamSink<'code>,
  /// Reusable storage for converting `IoSlice`s in `write_vectored`.
  #[cfg(feature = "vectored")]
  vectored_buf_cache: Vec<mem::MaybeUninit<ByteSlice<'static>>>,
}
impl<'code> StreamWriter<'code> {
  /// Wraps `inner` so it can be driven through [`std::io::Write`].
  pub fn new(inner: ScratchStreamSink<'code>) -> Self {
    Self {
      #[cfg(feature = "vectored")]
      vectored_buf_cache: Vec::new(),
      inner,
    }
  }
}
impl<'code> io::Write for StreamWriter<'code> {
  /// Scans all of `buf`; on success the whole buffer is reported as written.
  fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
    let written = buf.len();
    match self.inner.scan(ByteSlice::from_slice(buf)) {
      Ok(()) => Ok(written),
      Err(e) => Err(io::Error::other(e)),
    }
  }

  /// No-op: `write` scans eagerly, so this writer holds nothing back.
  fn flush(&mut self) -> io::Result<()> { Ok(()) }

  /// Scans all of `bufs` in one vectored call; on success their summed length is reported.
  #[cfg(feature = "vectored")]
  #[cfg_attr(docsrs, doc(cfg(feature = "vectored")))]
  fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
    let slices = VectoredByteSlices::from_io_slices(&mut self.vectored_buf_cache, bufs);
    let total = slices.length_sum();
    match self.inner.scan_vectored(slices) {
      Ok(()) => Ok(total),
      Err(e) => Err(io::Error::other(e)),
    }
  }
}
}
pub use std_impls::StreamWriter;
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub mod channel {
use super::LiveStream;
#[cfg(feature = "vectored")]
use crate::sources::VectoredByteSlices;
use crate::{
error::{ScanError, VectorscanRuntimeError},
matchers::{
stream::{SendStreamMatcher, StreamMatch},
MatchResult,
},
sources::ByteSlice,
state::Scratch,
};
use futures_core::stream::Stream;
use handles::Handle;
use tokio::{sync::mpsc, task};
use std::mem;
/// Async scanning sink whose match handler also forwards matches over a channel.
pub struct ScratchStreamSinkChannel<'code> {
  /// Handle to the stream whose state advances as data is scanned.
  pub live: Box<dyn Handle<R=LiveStream>+Send>,
  /// Match handler invoked for each match (when built via `new`, it also sends the match).
  pub hf: Box<dyn FnMut(StreamMatch) -> MatchResult+Send+'code>,
  /// Handle to the scratch space used during scanning.
  pub scratch: Box<dyn Handle<R=Scratch>+Send>,
  /// Receiving end of the match channel.
  pub rx: mpsc::UnboundedReceiver<StreamMatch>,
}
impl<'code> ScratchStreamSinkChannel<'code> {
  /// Adapts a by-reference match handler into one that also forwards each match to `tx`.
  ///
  /// The wrapped handler's return value is passed through unchanged.
  fn translate_async_sender(
    hf: &'code mut (dyn FnMut(&StreamMatch) -> MatchResult+Send+'code),
    tx: mpsc::UnboundedSender<StreamMatch>,
  ) -> Box<dyn FnMut(StreamMatch) -> MatchResult+Send+'code> {
    Box::new(move |m| {
      let result = hf(&m);
      // `send` only fails after the receiver has been closed or dropped (e.g. by
      // `collect_matches`, or if a scan future was abandoned while its blocking task keeps
      // running). Nobody can observe the match then, and panicking here would happen inside
      // the match callback of an in-progress scan, so the match is discarded instead.
      let _ = tx.send(m);
      result
    })
  }

  /// Creates a sink whose matches go to `hf` and are also queued on an internal channel
  /// that [`Self::collect_matches`] drains.
  pub fn new(
    live: impl Handle<R=LiveStream>+Send,
    hf: &'code mut (dyn FnMut(&StreamMatch) -> MatchResult+Send+'code),
    scratch: impl Handle<R=Scratch>+Send,
  ) -> Self {
    let (tx, rx) = mpsc::unbounded_channel();
    let hf = Self::translate_async_sender(hf, tx);
    Self {
      live: Box::new(live),
      hf,
      scratch: Box::new(scratch),
      rx,
    }
  }

  /// Scans `data` on tokio's blocking thread pool.
  ///
  /// # Errors
  /// Fails if a handle cannot be made mutable, the blocking task fails to join, or the scan
  /// itself reports an error.
  ///
  /// NOTE(review): the borrows of `self` and `data` are transmuted to `'static` so they can
  /// be moved into `spawn_blocking`. Tokio does not abort blocking tasks, so if this future is
  /// dropped before completion the task may outlive those borrows — drive it to completion.
  pub async fn scan<'data>(&mut self, data: ByteSlice<'data>) -> Result<(), ScanError> {
    let Self {
      live, hf, scratch, ..
    } = self;
    let live: &'static mut LiveStream = unsafe { mem::transmute(live.make_mut()?) };
    let scratch: &'static mut Scratch = unsafe { mem::transmute(scratch.make_mut()?) };
    let data: ByteSlice<'static> = unsafe { mem::transmute(data) };
    let hf: &mut (dyn FnMut(StreamMatch) -> MatchResult+Send+'code) = hf;
    let matcher = SendStreamMatcher::new(hf);
    let mut matcher: SendStreamMatcher<'static> = unsafe { mem::transmute(matcher) };
    task::spawn_blocking(move || scratch.scan_sync_stream(live, matcher.as_mut_basic(), data))
      .await??;
    Ok(())
  }

  /// Vectored counterpart of [`Self::scan`]; the same NOTE about dropping the future applies.
  #[cfg(feature = "vectored")]
  #[cfg_attr(docsrs, doc(cfg(feature = "vectored")))]
  pub async fn scan_vectored<'data>(
    &mut self,
    data: VectoredByteSlices<'data, 'data>,
  ) -> Result<(), ScanError> {
    let Self {
      live, hf, scratch, ..
    } = self;
    let live: &'static mut LiveStream = unsafe { mem::transmute(live.make_mut()?) };
    let scratch: &'static mut Scratch = unsafe { mem::transmute(scratch.make_mut()?) };
    let data: VectoredByteSlices<'static, 'static> = unsafe { mem::transmute(data) };
    let hf: &mut (dyn FnMut(StreamMatch) -> MatchResult+Send+'code) = hf;
    let matcher = SendStreamMatcher::new(hf);
    let mut matcher: SendStreamMatcher<'static> = unsafe { mem::transmute(matcher) };
    task::spawn_blocking(move || {
      scratch.scan_sync_vectored_stream(live, matcher.as_mut_basic(), data)
    })
    .await??;
    Ok(())
  }

  /// Signals end-of-data on the blocking thread pool, reporting any final matches.
  ///
  /// The same NOTE about dropping the future as on [`Self::scan`] applies.
  pub async fn flush_eod(&mut self) -> Result<(), ScanError> {
    let Self {
      live, hf, scratch, ..
    } = self;
    let live: &'static mut LiveStream = unsafe { mem::transmute(live.make_mut()?) };
    let scratch: &'static mut Scratch = unsafe { mem::transmute(scratch.make_mut()?) };
    let hf: &mut (dyn FnMut(StreamMatch) -> MatchResult+Send+'code) = hf;
    let matcher = SendStreamMatcher::new(hf);
    let mut matcher: SendStreamMatcher<'static> = unsafe { mem::transmute(matcher) };
    task::spawn_blocking(move || scratch.flush_eod_sync(live, matcher.as_mut_basic())).await??;
    Ok(())
  }

  /// Closes the match channel to further sends and returns a stream of the matches already
  /// queued on it.
  pub fn collect_matches(mut self) -> impl Stream<Item=StreamMatch> {
    self.rx.close();
    crate::async_utils::UnboundedReceiverStream(self.rx)
  }

  /// Resets the underlying stream via [`LiveStream::reset`].
  pub fn reset(&mut self) -> Result<(), VectorscanRuntimeError> { self.live.make_mut()?.reset() }
}
#[cfg(feature = "tokio-impls")]
pub(crate) mod tokio_impls {
use super::ScratchStreamSinkChannel;
use crate::sources::ByteSlice;
#[cfg(feature = "vectored")]
use crate::sources::VectoredByteSlices;
use futures_util::TryFutureExt;
use tokio::io;
#[cfg(feature = "vectored")]
use std::io::IoSlice;
use std::{
future::Future,
mem,
pin::Pin,
task::{ready, Context, Poll},
};
/// Async-write adapter that feeds written bytes into a [`ScratchStreamSinkChannel`].
pub struct AsyncStreamWriter<'code> {
  /// The sink that performs the scanning.
  pub inner: ScratchStreamSinkChannel<'code>,
  /// Reusable storage for converting `IoSlice`s in `poll_write_vectored`.
  #[cfg(feature = "vectored")]
  vectored_buf_cache: Vec<mem::MaybeUninit<ByteSlice<'static>>>,
  /// In-flight write started by `poll_write`/`poll_write_vectored`, if any.
  write_future: Option<Pin<Box<dyn Future<Output=io::Result<usize>>+'code>>>,
  /// In-flight end-of-data flush started by `poll_shutdown`, if any.
  shutdown_future: Option<Pin<Box<dyn Future<Output=io::Result<()>>+'code>>>,
}
impl<'code> AsyncStreamWriter<'code> {
  /// Wraps `inner` with no write or shutdown in flight.
  pub fn new(inner: ScratchStreamSinkChannel<'code>) -> Self {
    Self {
      write_future: None,
      shutdown_future: None,
      #[cfg(feature = "vectored")]
      vectored_buf_cache: Vec::new(),
      inner,
    }
  }
}

// NOTE(review): asserted manually because the boxed futures are not declared `Send`. This
// relies on those futures, and the `'code` data they borrow, being safe to move across
// threads — confirm.
unsafe impl<'code> Send for AsyncStreamWriter<'code> {}
impl<'code> io::AsyncWrite for AsyncStreamWriter<'code> {
// Scans `buf` via `inner.scan` and, on success, reports all of `buf` as written.
// At most one write is in flight: it lives in `write_future`, and while it is set, later
// calls only poll that stored future (the `buf` passed to them is not looked at).
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
if self.write_future.is_some() {
// Resume the write begun by an earlier call; clear it once it finishes.
let ret = ready!(self
.as_mut()
.write_future
.as_mut()
.unwrap()
.as_mut()
.poll(cx));
self.write_future = None;
Poll::Ready(ret)
} else {
let mut fut: Pin<Box<dyn Future<Output=io::Result<usize>>+'code>> = {
// NOTE(review): both transmutes stretch borrows to `'code` so the future can be stored
// inside `self` across polls. The compiler no longer checks that `buf` outlives the
// future, so the caller must keep the same buffer alive until this write completes.
let s: &'code mut Self = unsafe { mem::transmute(self.as_mut().get_mut()) };
let buf: &'code [u8] = unsafe { mem::transmute(buf) };
let buf_len = buf.len();
let buf = ByteSlice::from_slice(buf);
Box::pin(
s.inner
.scan(buf)
.map_ok(move |()| buf_len)
.map_err(io::Error::other),
)
};
// Poll once right away; the future is only stored if it is still pending.
if let Poll::Ready(ret) = fut.as_mut().poll(cx) {
return Poll::Ready(ret);
}
self.write_future = Some(fut);
Poll::Pending
}
}
// Always completes immediately with `Ok(())`.
// NOTE(review): this does not wait for a pending `write_future` — confirm that is intended.
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
// Runs `inner.flush_eod()`, keeping the in-flight future in `shutdown_future` across polls
// with the same poll-once-then-store pattern as `poll_write`.
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
if self.shutdown_future.is_some() {
// Resume the shutdown begun by an earlier call; clear it once it finishes.
let ret = ready!(self
.as_mut()
.shutdown_future
.as_mut()
.unwrap()
.as_mut()
.poll(cx));
self.shutdown_future = None;
Poll::Ready(ret)
} else {
let mut fut: Pin<Box<dyn Future<Output=io::Result<()>>+'code>> = {
// NOTE(review): the borrow of `self` is stretched to `'code` so the future can be
// stored inside `self`; see the note in `poll_write`.
let s: &'code mut Self = unsafe { mem::transmute(self.as_mut().get_mut()) };
Box::pin(s.inner.flush_eod().map_err(io::Error::other))
};
// Poll once right away; the future is only stored if it is still pending.
if let Poll::Ready(ret) = fut.as_mut().poll(cx) {
return Poll::Ready(ret);
}
self.shutdown_future = Some(fut);
Poll::Pending
}
}
// Advertises native support for vectored writes (see `poll_write_vectored`).
#[cfg(feature = "vectored")]
#[cfg_attr(docsrs, doc(cfg(feature = "vectored")))]
fn is_write_vectored(&self) -> bool { true }
// Vectored counterpart of `poll_write`: scans all of `bufs` through `inner.scan_vectored`
// and reports their summed length. It shares `write_future` with `poll_write`.
#[cfg(feature = "vectored")]
#[cfg_attr(docsrs, doc(cfg(feature = "vectored")))]
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
if self.write_future.is_some() {
// Resume the write begun by an earlier call; clear it once it finishes.
let ret = ready!(self
.as_mut()
.write_future
.as_mut()
.unwrap()
.as_mut()
.poll(cx));
self.write_future = None;
Poll::Ready(ret)
} else {
let mut fut: Pin<Box<dyn Future<Output=io::Result<usize>>+'code>> = {
// NOTE(review): as in `poll_write`, these transmutes stretch borrows to `'code`; the
// caller must keep `bufs` alive until the write completes.
let s: &'code mut Self = unsafe { mem::transmute(self.as_mut().get_mut()) };
let bufs: &'code [IoSlice<'code>] = unsafe { mem::transmute(bufs) };
let bufs = VectoredByteSlices::from_io_slices(&mut s.vectored_buf_cache, bufs);
let len = bufs.length_sum();
Box::pin(
s.inner
.scan_vectored(bufs)
.map_ok(move |()| len)
.map_err(io::Error::other),
)
};
// Poll once right away; the future is only stored if it is still pending.
if let Poll::Ready(ret) = fut.as_mut().poll(cx) {
return Poll::Ready(ret);
}
self.write_future = Some(fut);
Poll::Pending
}
}
}
}
#[cfg(feature = "tokio-impls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-impls")))]
pub use tokio_impls::AsyncStreamWriter;
}
pub(crate) mod compress {
use super::{LiveStream, NativeStream};
use crate::{
database::Database,
error::{CompressionError, VectorscanRuntimeError},
hs,
};
use std::{mem, ptr};
/// Strategy for obtaining the output buffer used by [`CompressedStream::compress`].
pub enum CompressReserveBehavior {
  /// Allocate a brand-new buffer sized to fit the compressed stream.
  NewBuf,
  /// Reuse the given buffer, growing its capacity when it is too small.
  ExpandBuf(Vec<u8>),
  /// Reuse the given buffer only if its existing capacity suffices; never grow it.
  FixedSizeBuf(Vec<u8>),
}

impl CompressReserveBehavior {
  /// Returns the caller-supplied buffer, if this strategy carries one.
  pub fn current_buf(&mut self) -> Option<&mut Vec<u8>> {
    match self {
      Self::NewBuf => None,
      Self::ExpandBuf(buf) | Self::FixedSizeBuf(buf) => Some(buf),
    }
  }
}

/// Outcome of [`CompressReserveBehavior::reserve`]; both variants hand the buffer back.
enum ReserveResponse {
  /// The buffer's capacity is at least the requested size.
  MadeSpace(Vec<u8>),
  /// The buffer is smaller than requested and the strategy forbids growing it.
  NoSpace(Vec<u8>),
}

impl CompressReserveBehavior {
  /// Produces a buffer whose capacity is at least `n` bytes, if the strategy allows it.
  ///
  /// Existing buffer contents are not cleared.
  fn reserve(self, n: usize) -> ReserveResponse {
    match self {
      Self::NewBuf => ReserveResponse::MadeSpace(Vec::with_capacity(n)),
      Self::ExpandBuf(mut buf) => {
        // `Vec::reserve(k)` guarantees `capacity >= len + k`, so the shortfall must be
        // measured from `len` (not `capacity`) to end up with `capacity >= n`.
        // `n > capacity >= len` here, so the subtraction cannot underflow.
        if n > buf.capacity() {
          buf.reserve(n - buf.len());
        }
        ReserveResponse::MadeSpace(buf)
      },
      Self::FixedSizeBuf(buf) => {
        // A buffer whose capacity is exactly `n` is large enough.
        if buf.capacity() < n {
          ReserveResponse::NoSpace(buf)
        } else {
          ReserveResponse::MadeSpace(buf)
        }
      },
    }
  }
}
/// Owned serialized snapshot of a [`LiveStream`]'s state, produced by `hs_compress_stream`.
pub struct CompressedStream {
  buf: Box<[u8]>,
}

impl CompressedStream {
  /// Returns the serialized bytes.
  pub fn into_buf(self) -> Box<[u8]> { self.buf }

  /// Serializes `live` into a buffer obtained according to `into`.
  ///
  /// If `into` carries a buffer, compression is first attempted directly into its current
  /// capacity. Otherwise, or if that is too small, the required size is obtained from
  /// vectorscan, the buffer is reserved per the strategy, and compression is retried.
  ///
  /// # Errors
  /// - [`CompressionError::NoSpace`] with the required size and the caller's buffer when the
  ///   strategy cannot provide enough capacity.
  /// - Any other error reported by `hs_compress_stream`, converted into [`CompressionError`].
  pub fn compress(
    mut into: CompressReserveBehavior,
    live: &LiveStream,
  ) -> Result<Self, CompressionError> {
    let mut required_space: usize = 0;
    if let Some(buf) = into.current_buf() {
      // Fast path: try the caller's buffer as-is.
      match VectorscanRuntimeError::from_native(unsafe {
        hs::hs_compress_stream(
          live.as_ref_native(),
          buf.as_mut_ptr().cast(),
          buf.capacity(),
          &mut required_space,
        )
      }) {
        // Too small: `required_space` now holds the needed size; fall through to reserve.
        Err(VectorscanRuntimeError::InsufficientSpace) => (),
        Err(e) => return Err(e.into()),
        Ok(()) => {
          debug_assert!(buf.capacity() >= required_space);
          // SAFETY: vectorscan wrote `required_space` bytes, which fit within capacity.
          unsafe {
            buf.set_len(required_space);
          }
          return Ok(Self {
            buf: mem::take(buf).into_boxed_slice(),
          });
        },
      }
    } else {
      // Size query: a null buffer with zero space asks vectorscan for the required size.
      // Unexpected native errors are propagated rather than panicking.
      match VectorscanRuntimeError::from_native(unsafe {
        hs::hs_compress_stream(
          live.as_ref_native(),
          ptr::null_mut(),
          0,
          &mut required_space,
        )
      }) {
        Err(VectorscanRuntimeError::InsufficientSpace) | Ok(()) => (),
        Err(e) => return Err(e.into()),
      }
    }
    let mut buf = match into.reserve(required_space) {
      ReserveResponse::NoSpace(buf) => {
        debug_assert!(required_space > buf.capacity());
        return Err(CompressionError::NoSpace(required_space, buf));
      },
      ReserveResponse::MadeSpace(buf) => buf,
    };
    let mut allocated_space: usize = 0;
    VectorscanRuntimeError::from_native(unsafe {
      hs::hs_compress_stream(
        live.as_ref_native(),
        buf.as_mut_ptr().cast(),
        buf.capacity(),
        &mut allocated_space,
      )
    })?;
    debug_assert_eq!(required_space, allocated_space);
    debug_assert!(allocated_space <= buf.capacity());
    // SAFETY: vectorscan wrote `allocated_space` bytes, which fit within capacity.
    unsafe {
      buf.set_len(allocated_space);
    }
    Ok(Self {
      buf: buf.into_boxed_slice(),
    })
  }

  /// Reconstructs a new [`LiveStream`] for `db` from the serialized bytes.
  ///
  /// # Errors
  /// Returns the error reported by `hs_expand_stream`.
  pub fn expand(&self, db: &Database) -> Result<LiveStream, VectorscanRuntimeError> {
    let mut inner = ptr::null_mut();
    VectorscanRuntimeError::from_native(unsafe {
      hs::hs_expand_stream(
        db.as_ref_native(),
        &mut inner,
        self.buf.as_ptr().cast(),
        self.buf.len(),
      )
    })?;
    // SAFETY: on success `hs_expand_stream` allocated a new stream that nothing else owns.
    Ok(unsafe { LiveStream::from_native(inner) })
  }

  /// Overwrites the existing stream `to` with the serialized state.
  ///
  /// # Safety
  /// NOTE(review): `to` presumably must belong to the same database the state was compressed
  /// from — confirm the contract of `hs_direct_expand_into`.
  pub unsafe fn expand_into(&self, to: &mut LiveStream) -> Result<(), VectorscanRuntimeError> {
    VectorscanRuntimeError::from_native(unsafe {
      hs::hs_direct_expand_into(
        to.as_mut_native(),
        self.buf.as_ptr().cast(),
        self.buf.len(),
      )
    })
  }

  /// Expands the serialized state into caller-provided stream storage at `to`.
  ///
  /// # Safety
  /// NOTE(review): `to` presumably must point to writable storage large enough for a stream
  /// of `db` — confirm the contract of `hs_expand_stream_at`.
  pub unsafe fn expand_into_at(
    &self,
    db: &Database,
    to: *mut NativeStream,
  ) -> Result<(), VectorscanRuntimeError> {
    VectorscanRuntimeError::from_native(unsafe {
      hs::hs_expand_stream_at(
        db.as_ref_native(),
        self.buf.as_ptr().cast(),
        self.buf.len(),
        to,
      )
    })
  }
}
}
pub use compress::{CompressReserveBehavior, CompressedStream};