pub struct ScratchStreamSinkChannel<'code> {
pub live: Box<dyn Handle<R = LiveStream> + Send>,
pub hf: Box<dyn FnMut(StreamMatch) -> MatchResult + Send + 'code>,
pub scratch: Box<dyn Handle<R = Scratch> + Send>,
pub rx: UnboundedReceiver<StreamMatch>,
}
Available on crate features `stream` and `async` only.
An `async` version of `super::ScratchStreamSink`.
By holding handles to `Self::live` and `Self::scratch`, the stream
scanning API can be made quite fluent, without as many parameters per
call:
#[cfg(feature = "compiler")]
fn main() -> Result<(), vectorscan::error::VectorscanError> { tokio_test::block_on(async {
use vectorscan::{expression::*, flags::*, stream::channel::*, matchers::*};
use futures_util::StreamExt;
use std::ops::Range;
let expr: Expression = "a+".parse()?;
let db = expr.compile(Flags::SOM_LEFTMOST, Mode::STREAM | Mode::SOM_HORIZON_LARGE)?;
let scratch = db.allocate_scratch()?;
let live = db.allocate_stream()?;
let mut match_fn = |_: &_| MatchResult::Continue;
let mut sink = ScratchStreamSinkChannel::new(live, &mut match_fn, scratch);
sink.scan("aardvark".into()).await?;
sink.flush_eod().await?;
let matches: Vec<Range<usize>> = sink.collect_matches()
.map(|m| m.range.into())
.collect()
.await;
assert_eq!(&matches, &[0..1, 0..2, 5..6]);
Ok(())
})}
Fields§
§live: Box<dyn Handle<R = LiveStream> + Send>
Cloneable handle to a stateful stream.
hf: Box<dyn FnMut(StreamMatch) -> MatchResult + Send + 'code>
A “handler function” synthesized by `Self::new()` which closes over an
`mpsc::UnboundedSender`.
scratch: Box<dyn Handle<R = Scratch> + Send>
Cloneable handle to a scratch space initialized for the same db as
`Self::live`.
rx: UnboundedReceiver<StreamMatch>
The other half of the sender/receiver pair created by `Self::new()`.
Implementations§
Source§impl<'code> ScratchStreamSinkChannel<'code>
impl<'code> ScratchStreamSinkChannel<'code>
Sourcepub fn new(
live: impl Handle<R = LiveStream> + Send,
hf: &'code mut (dyn FnMut(&StreamMatch) -> MatchResult + Send + 'code),
scratch: impl Handle<R = Scratch> + Send,
) -> Self
pub fn new( live: impl Handle<R = LiveStream> + Send, hf: &'code mut (dyn FnMut(&StreamMatch) -> MatchResult + Send + 'code), scratch: impl Handle<R = Scratch> + Send, ) -> Self
Generate an `mpsc::unbounded_channel()` and a wrapper over the
provided handler function `hf` that sends match objects into the
channel as messages.
Sourcepub async fn scan<'data>(
&mut self,
data: ByteSlice<'data>,
) -> Result<(), ScanError>
pub async fn scan<'data>( &mut self, data: ByteSlice<'data>, ) -> Result<(), ScanError>
Write a single contiguous string into the automaton.
#[cfg(feature = "compiler")]
fn main() -> Result<(), vectorscan::error::VectorscanError> { tokio_test::block_on(async {
use vectorscan::{expression::*, flags::*, stream::channel::*, matchers::*};
use futures_util::StreamExt;
use std::ops::Range;
let expr: Expression = "a+".parse()?;
let db = expr.compile(Flags::SOM_LEFTMOST, Mode::STREAM | Mode::SOM_HORIZON_LARGE)?;
let scratch = db.allocate_scratch()?;
let live = db.allocate_stream()?;
let mut match_fn = |_: &_| MatchResult::Continue;
let mut sink = ScratchStreamSinkChannel::new(live, &mut match_fn, scratch);
sink.scan("aardvarka".into()).await?;
sink.scan("a".into()).await?;
sink.flush_eod().await?;
let matches: Vec<Range<usize>> = sink.collect_matches()
.map(|m| m.range.into())
.collect()
.await;
// 8..10 crossed our non-contiguous inputs!
assert_eq!(&matches, &[0..1, 0..2, 5..6, 8..9, 8..10]);
Ok(())
})}
Sourcepub async fn scan_vectored<'data>(
&mut self,
data: VectoredByteSlices<'data, 'data>,
) -> Result<(), ScanError>
Available on crate feature `vectored` only.
pub async fn scan_vectored<'data>( &mut self, data: VectoredByteSlices<'data, 'data>, ) -> Result<(), ScanError>
Available on crate feature `vectored` only.
Write vectored string data into the automaton.
#[cfg(feature = "compiler")]
fn main() -> Result<(), vectorscan::error::VectorscanError> { tokio_test::block_on(async {
use vectorscan::{expression::*, flags::*, stream::channel::*, matchers::*, sources::*};
use futures_util::StreamExt;
use std::ops::Range;
let expr: Expression = "a+".parse()?;
let db = expr.compile(Flags::SOM_LEFTMOST, Mode::STREAM | Mode::SOM_HORIZON_LARGE)?;
let scratch = db.allocate_scratch()?;
let live = db.allocate_stream()?;
let mut match_fn = |_: &_| MatchResult::Continue;
let mut sink = ScratchStreamSinkChannel::new(live, &mut match_fn, scratch);
let input: [ByteSlice; 2] = [
"aardvarka".into(),
"a".into(),
];
sink.scan_vectored(input.as_ref().into()).await?;
sink.flush_eod().await?;
let matches: Vec<Range<usize>> = sink.collect_matches()
.map(|m| m.range.into())
.collect()
.await;
// 8..10 crossed our non-contiguous inputs!
assert_eq!(&matches, &[0..1, 0..2, 5..6, 8..9, 8..10]);
Ok(())
})}
Sourcepub async fn flush_eod(&mut self) -> Result<(), ScanError>
pub async fn flush_eod(&mut self) -> Result<(), ScanError>
Trigger any match callbacks that require matching against the end of data (EOD).
`Expression::info()` returns a `MatchAtEndBehavior`, which can be used to
determine whether this check is necessary. But it typically makes sense
to execute it exactly once at the end of every stream instead of trying
to optimize this away.
Sourcepub fn collect_matches(self) -> impl Stream<Item = StreamMatch>
pub fn collect_matches(self) -> impl Stream<Item = StreamMatch>
Call `mpsc::UnboundedReceiver::close()` on `Self::rx`, then convert
it into an async iterable stream.
Sourcepub fn reset(&mut self) -> Result<(), VectorscanRuntimeError>
pub fn reset(&mut self) -> Result<(), VectorscanRuntimeError>
Reach into `Self::live` and call `LiveStream::reset()`.