use std::{
collections::VecDeque,
mem::swap,
sync::{Arc, Mutex},
};
use crate::{
error,
handler::Handler,
matcher::Matcher,
path::Path,
streamer::{Streamer, Token},
};
use super::{Output, Strategy};
/// A registered matcher paired with the optional, shareable handler that is
/// notified about the data the matcher selects.
type MatcherItem = (Box<dyn Matcher>, Option<Arc<Mutex<dyn Handler>>>);
/// Strategy which removes the matched parts of the input from the output.
///
/// Data that is not matched is emitted as `Output::Data`; data inside a
/// matched element is passed to the handlers of the matching matchers instead.
pub struct Filter {
/// Sum of the lengths of the inputs whose processing reached `Token::Pending`;
/// reported by `terminate` when the input ends while `level` is not zero.
input_start: usize,
/// Position (in the token index space) of the first byte held in `buffer`.
/// NOTE(review): token indexes are presumably absolute offsets into the
/// whole input stream — confirm against `Streamer`.
buffer_idx: usize,
/// Input bytes which were neither emitted, dropped nor handed to handlers yet.
buffer: VecDeque<u8>,
/// Tokenizer which is fed with every input chunk and read for tokens.
streamer: Streamer,
/// Registered matchers with their optional handlers; the position in this
/// vector is the matcher index passed to the handlers.
matchers: Vec<MatcherItem>,
/// Path and matcher indexes of the element which is currently matched.
/// Set when a `Token::Start` path is matched and cleared by the
/// `Token::End` which arrives while the streamer is at the same path.
matches: Option<(Path, Vec<usize>)>,
/// Streamer path recorded when unmatched data was last emitted; compared
/// with the current path on `Token::Separator` to decide whether the bytes
/// up to and including the separator are dropped from the buffer.
last_streaming_path: Option<Path>,
/// Nesting depth: incremented on `Token::Start`, decremented on `Token::End`.
level: usize,
}
impl Default for Filter {
    /// Builds an empty filter: nothing buffered, no matchers registered,
    /// no active match, nesting level zero and a fresh streamer.
    fn default() -> Self {
        // Fields are listed in the same order as in the struct declaration.
        Self {
            input_start: 0,
            buffer_idx: 0,
            buffer: VecDeque::new(),
            streamer: Streamer::new(),
            matchers: Vec::new(),
            matches: None,
            last_streaming_path: None,
            level: 0,
        }
    }
}
impl Strategy for Filter {
    /// Processes another chunk of input and returns the output produced by it.
    ///
    /// The chunk is fed to the streamer and appended to the internal buffer.
    /// Tokens are then read until the streamer reports `Token::Pending`:
    ///
    /// * data outside of a matched element is emitted as `Output::Data`,
    /// * data inside of a matched element is passed to the handlers of the
    ///   matching matchers and never reaches the output,
    /// * `Output::Start(None)` / `Output::End` are emitted when the nesting
    ///   level leaves / returns to zero.
    ///
    /// # Errors
    ///
    /// Propagates errors returned by the streamer and by the handlers.
    fn process(&mut self, input: &[u8]) -> Result<Vec<Output>, error::General> {
        // The streamer tokenizes the chunk, the buffer keeps its raw bytes.
        self.streamer.feed(input);
        self.buffer.extend(input);

        let mut result = Vec::new();
        loop {
            match self.streamer.read()? {
                Token::Start(idx, kind) => {
                    if self.level == 0 {
                        result.push(Output::Start(None));
                    }
                    self.level += 1;

                    if let Some((path, matched_indexes)) = self.matches.take() {
                        // Nested element of an already matched one: everything
                        // buffered before it belongs to the handlers.
                        let data = self.move_forward(idx);
                        self.feed_handlers(&matched_indexes, data)?;
                        self.matches = Some((path, matched_indexes));
                    } else {
                        // Nothing is matched yet, try to match the current path.
                        let current_path = self.streamer.current_path().clone();
                        let matcher_indexes: Vec<usize> = self
                            .matchers
                            .iter()
                            .enumerate()
                            .filter(|(_, matcher)| matcher.0.match_path(&current_path, kind))
                            .map(|(matcher_idx, _)| matcher_idx)
                            .collect();

                        if !matcher_indexes.is_empty() {
                            self.start_handlers(
                                &current_path,
                                &matcher_indexes,
                                Token::Start(idx, kind),
                            )?;
                            self.matches = Some((current_path, matcher_indexes));
                            // Drop what is buffered in front of the matched
                            // element so that it is not emitted.
                            self.move_forward(idx);
                        } else {
                            // No match: emit the buffered bytes up to and
                            // including position `idx`.
                            self.last_streaming_path = Some(current_path);
                            result.push(Output::Data(
                                self.move_forward(idx + 1).into_iter().collect(),
                            ));
                        }
                    }
                }
                Token::End(idx, kind) => {
                    // NOTE(review): relies on the streamer never emitting an
                    // `End` without a preceding `Start` — confirm.
                    self.level -= 1;

                    if let Some((path, matched_indexes)) = self.matches.take() {
                        let data = self.move_forward(idx);
                        self.feed_handlers(&matched_indexes, data)?;
                        if &path == self.streamer.current_path() {
                            // The matched element itself has ended; `matches`
                            // stays cleared.
                            self.end_handlers(&path, &matched_indexes, Token::End(idx, kind))?;
                        } else {
                            // Only a nested element ended, the match goes on.
                            self.matches = Some((path, matched_indexes));
                        }
                    } else {
                        self.last_streaming_path = Some(self.streamer.current_path().clone());
                        result.push(Output::Data(self.move_forward(idx).into_iter().collect()));
                    }

                    if self.level == 0 {
                        result.push(Output::End);
                    }
                }
                Token::Pending => {
                    // The whole chunk was consumed; more input is needed.
                    self.input_start += input.len();
                    return Ok(result);
                }
                Token::Separator(idx) => {
                    // When the streamer is still at the path recorded for the
                    // last emitted data, the bytes up to and including the
                    // separator are dropped from the buffer.
                    if let Some(path) = self.last_streaming_path.as_ref() {
                        if self.streamer.current_path() == path {
                            self.move_forward(idx + 1);
                        }
                    }
                }
            }
        }
    }

    /// Finishes the processing; no further output is produced.
    ///
    /// # Errors
    ///
    /// Returns `error::InputTerminated` (carrying the number of input bytes
    /// processed so far) when the input ended inside of an unfinished element,
    /// i.e. the nesting level is not zero.
    fn terminate(&mut self) -> Result<Vec<Output>, error::General> {
        if self.level == 0 {
            Ok(vec![])
        } else {
            Err(error::InputTerminated::new(self.input_start).into())
        }
    }
}
impl Filter {
    /// Creates a new filter without any matchers.
    pub fn new() -> Self {
        Self::default()
    }

    /// Removes the buffered bytes which precede position `idx` and returns them.
    ///
    /// Afterwards the buffer starts at position `idx`.
    ///
    /// # Panics
    ///
    /// `VecDeque::split_off` panics when `idx` lies behind the end of the
    /// buffered data.
    fn move_forward(&mut self, idx: usize) -> VecDeque<u8> {
        // `split_off` keeps the head in `self.buffer` and returns the tail;
        // swapping them leaves the tail buffered and hands out the head.
        let mut head = self.buffer.split_off(idx - self.buffer_idx);
        swap(&mut self.buffer, &mut head);
        self.buffer_idx = idx;
        head
    }

    /// Registers a matcher together with an optional handler.
    ///
    /// Matchers are indexed in the order in which they are added and this
    /// index is what the handlers receive as the matcher index.
    ///
    /// # Arguments
    /// * `matcher` - matcher which selects the data to be filtered out
    /// * `handler` - optional handler notified about the matched data
    pub fn add_matcher(
        &mut self,
        matcher: Box<dyn Matcher>,
        handler: Option<Arc<Mutex<dyn Handler>>>,
    ) {
        self.matchers.push((matcher, handler));
    }

    /// Iterates over `(matcher index, handler)` for those of the given
    /// matchers which have a handler attached.
    fn handlers_for<'a>(
        &'a self,
        matched_indexes: &'a [usize],
    ) -> impl Iterator<Item = (usize, &'a Arc<Mutex<dyn Handler>>)> + 'a {
        matched_indexes.iter().filter_map(move |&matcher_idx| {
            self.matchers[matcher_idx]
                .1
                .as_ref()
                .map(|handler| (matcher_idx, handler))
        })
    }

    /// Calls `start` on the handler of every matched matcher.
    ///
    /// # Errors
    ///
    /// Stops at and returns the first error reported by a handler.
    ///
    /// # Panics
    ///
    /// Panics when a handler mutex is poisoned.
    fn start_handlers(
        &self,
        path: &Path,
        matched_indexes: &[usize],
        token: Token,
    ) -> Result<(), error::General> {
        for (matcher_idx, handler) in self.handlers_for(matched_indexes) {
            let mut guard = handler.lock().unwrap();
            guard.start(path, matcher_idx, token.clone())?;
        }
        Ok(())
    }

    /// Feeds `data` to the handler of every matched matcher.
    ///
    /// The deque may be stored as two separate slices, hence every handler
    /// is fed twice — first with the front and then with the back slice.
    ///
    /// # Errors
    ///
    /// Stops at and returns the first error reported by a handler.
    ///
    /// # Panics
    ///
    /// Panics when a handler mutex is poisoned.
    fn feed_handlers(
        &self,
        matched_indexes: &[usize],
        data: VecDeque<u8>,
    ) -> Result<(), error::General> {
        let (first, second) = data.as_slices();
        for (matcher_idx, handler) in self.handlers_for(matched_indexes) {
            let mut guard = handler.lock().unwrap();
            guard.feed(first, matcher_idx)?;
            guard.feed(second, matcher_idx)?;
        }
        Ok(())
    }

    /// Calls `end` on the handler of every matched matcher.
    ///
    /// # Errors
    ///
    /// Stops at and returns the first error reported by a handler.
    ///
    /// # Panics
    ///
    /// Panics when a handler mutex is poisoned.
    fn end_handlers(
        &self,
        path: &Path,
        matched_indexes: &[usize],
        token: Token,
    ) -> Result<(), error::General> {
        for (matcher_idx, handler) in self.handlers_for(matched_indexes) {
            let mut guard = handler.lock().unwrap();
            guard.end(path, matcher_idx, token.clone())?;
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::{Filter, Strategy};
    use crate::{
        matcher::{Combinator, Simple},
        strategy::OutputConverter,
        test::{Single, Splitter, Window},
    };
    use rstest::*;

    /// JSON document shared by all the tests.
    fn get_input() -> Vec<u8> {
        br#"{"users": [{"uid": 1}, {"uid": 2}, {"uid": 3}], "groups": [{"gid": 1}, {"gid": 2}], "void": {}}"#
            .to_vec()
    }

    /// Creates a filter with a single handler-less `Simple` matcher.
    fn simple_filter(path_expr: &str) -> Filter {
        let mut filter = Filter::new();
        filter.add_matcher(Box::new(Simple::new(path_expr).unwrap()), None);
        filter
    }

    /// Pushes one chunk through the filter and gathers the converted output bytes.
    fn process_chunk(
        converter: &mut OutputConverter,
        filter: &mut Filter,
        chunk: &[u8],
    ) -> Vec<u8> {
        converter
            .convert(&filter.process(chunk).unwrap())
            .into_iter()
            .flat_map(|e| e.1)
            .collect()
    }

    /// Filters the whole shared input at once with a single `Simple` matcher
    /// and returns the output as a string.
    fn filter_input(path_expr: &str) -> String {
        let mut filter = simple_filter(path_expr);
        let bytes = process_chunk(&mut OutputConverter::new(), &mut filter, &get_input());
        String::from_utf8(bytes).unwrap()
    }

    #[test]
    fn single_matcher_no_match() {
        let input = get_input();
        let mut filter = simple_filter(r#"{"no-existing"}[]{"uid"}"#);
        let output = process_chunk(&mut OutputConverter::new(), &mut filter, &input);
        assert_eq!(output, input);
    }

    #[test]
    fn single_matcher_array_first() {
        assert_eq!(
            filter_input(r#"{"users"}[0]"#),
            r#"{"users": [ {"uid": 2}, {"uid": 3}], "groups": [{"gid": 1}, {"gid": 2}], "void": {}}"#
        );
    }

    #[test]
    fn single_matcher_array_last() {
        assert_eq!(
            filter_input(r#"{"users"}[2]"#),
            r#"{"users": [{"uid": 1}, {"uid": 2}], "groups": [{"gid": 1}, {"gid": 2}], "void": {}}"#
        );
    }

    #[test]
    fn single_matcher_array_middle() {
        assert_eq!(
            filter_input(r#"{"users"}[1]"#),
            r#"{"users": [{"uid": 1}, {"uid": 3}], "groups": [{"gid": 1}, {"gid": 2}], "void": {}}"#
        );
    }

    #[test]
    fn single_matcher_array_all() {
        assert_eq!(
            filter_input(r#"{"users"}[]"#),
            r#"{"users": [], "groups": [{"gid": 1}, {"gid": 2}], "void": {}}"#
        );
    }

    #[test]
    fn single_matcher_object_first() {
        assert_eq!(
            filter_input(r#"{"users"}"#),
            r#"{ "groups": [{"gid": 1}, {"gid": 2}], "void": {}}"#
        );
    }

    #[test]
    fn single_matcher_object_last() {
        assert_eq!(
            filter_input(r#"{"void"}"#),
            r#"{"users": [{"uid": 1}, {"uid": 2}, {"uid": 3}], "groups": [{"gid": 1}, {"gid": 2}]}"#
        );
    }

    #[test]
    fn single_matcher_object_middle() {
        assert_eq!(
            filter_input(r#"{"groups"}"#),
            r#"{"users": [{"uid": 1}, {"uid": 2}, {"uid": 3}], "void": {}}"#
        );
    }

    #[test]
    fn single_matcher_object_all() {
        assert_eq!(filter_input(r#"{}"#), r#"{}"#);
    }

    /// The result must not depend on how the input is split into chunks.
    #[rstest(
        splitter,
        case::single(Box::new(Single::new())),
        case::window1(Box::new(Window::new(1))),
        case::window5(Box::new(Window::new(5))),
        case::window100(Box::new(Window::new(100)))
    )]
    fn combinator_slices(splitter: Box<dyn Splitter>) {
        let input = get_input();
        for parts in splitter.split(input) {
            let users = Combinator::new(Simple::new(r#"{"users"}"#).unwrap());
            let void = Combinator::new(Simple::new(r#"{"void"}"#).unwrap());

            let mut filter = Filter::new();
            filter.add_matcher(Box::new(users | void), None);

            // One converter is shared by all the chunks of a single split.
            let mut converter = OutputConverter::new();
            let mut result: Vec<u8> = Vec::new();
            for part in parts {
                result.extend(process_chunk(&mut converter, &mut filter, &part));
            }

            assert_eq!(
                String::from_utf8(result).unwrap(),
                r#"{ "groups": [{"gid": 1}, {"gid": 2}]}"#
            );
        }
    }
}