Skip to main content

wpl/eval/runtime/
vm_unit.rs

1use crate::ast::WplPipe;
2use crate::ast::group::{WplGroup, WplGroupType};
3use crate::ast::{WplExpress, WplStatementType};
4use crate::ast::{WplField, WplSep};
5use crate::compat::New3;
6use crate::eval::builtins::{self, PipeLineResult, raw_to_utf8_string};
7use crate::eval::runtime::field::FieldEvalUnit;
8use crate::eval::runtime::field_pipe::PipeEnum;
9use crate::eval::runtime::group::WplEvalGroup;
10use std::borrow::Cow;
11use wp_model_core::raw::RawData;
12use wp_parse_api::{PipeHold, WparseError, WparseReason};
13
14use crate::parser::error::{WplCodeError, WplCodeReason};
15use crate::parser::wpl_rule::wpl_rule;
16use orion_error::conversion::ToStructError;
17use orion_error::{ErrorWith, UvsFrom};
18use wp_log::debug_edata;
19use wp_model_core::model::DataRecord;
20use wp_primitives::Parser;
21use wp_primitives::WResult as ModalResult;
22
23// Internal DataResult for wp-lang usage
24// Plugin developers should use wp_parse_api::DataResult instead
25pub type DataResult = Result<(DataRecord, String), WparseError>;
26pub const OPTIMIZE_TIMES: usize = 10000;
27
28pub trait IntoRawData {
29    fn into_raw(self) -> RawData;
30}
31
32impl IntoRawData for RawData {
33    fn into_raw(self) -> RawData {
34        self
35    }
36}
37
38impl IntoRawData for String {
39    fn into_raw(self) -> RawData {
40        RawData::from_string(self)
41    }
42}
43
44impl IntoRawData for &String {
45    fn into_raw(self) -> RawData {
46        RawData::from_string(self.as_str())
47    }
48}
49
50impl IntoRawData for &str {
51    fn into_raw(self) -> RawData {
52        RawData::from_string(self)
53    }
54}
55
56#[derive(Default, Clone)]
57pub struct WplEvaluator {
58    preorder: Vec<PipeHold>,
59    group_units: Vec<WplEvalGroup>,
60}
61unsafe impl Send for WplEvaluator {}
62
63impl WplEvaluator {
64    pub fn preorder_proc(&self, data: RawData) -> Result<Vec<PipeLineResult>, WparseError> {
65        let mut pipe_obj = Vec::new();
66        let mut target = data;
67        for proc_unit in &self.preorder {
68            target = proc_unit.process(target)?;
69            pipe_obj.push(PipeLineResult {
70                name: proc_unit.name().to_string(),
71                result: raw_to_utf8_string(&target),
72            });
73        }
74        Ok(pipe_obj)
75    }
76
77    fn pipe_proc(&self, e_id: u64, data: RawData) -> Result<RawData, WparseError> {
78        let mut target = data;
79        for proc_unit in &self.preorder {
80            target = proc_unit
81                .process(target)
82                .doing("pipe convert")
83                .with_context(e_id.to_string())
84                .with_context(proc_unit.name())?;
85
86            debug_edata!(
87                e_id,
88                "pipe  {}  out:{}",
89                proc_unit.name(),
90                raw_to_utf8_string(&target)
91            );
92        }
93        Ok(target)
94    }
95
96    /// 从引用解析,仅在 preorder 不为空时 clone 数据。
97    /// 高频调用路径避免每条规则都 clone payload。
98    pub fn proc_ref(&self, e_id: u64, data: &RawData, oth_suc_len: usize) -> DataResult {
99        let owned: Option<RawData>;
100        let working_raw: &RawData = if !self.preorder.is_empty() {
101            owned = Some(self.pipe_proc(e_id, data.clone())?);
102            owned.as_ref().unwrap()
103        } else {
104            data
105        };
106
107        let input_holder: Cow<'_, str> = match working_raw {
108            RawData::String(s) => Cow::Borrowed(s.as_str()),
109            RawData::Bytes(b) => Cow::Owned(String::from_utf8_lossy(b).into_owned()),
110            RawData::ArcBytes(b) => Cow::Owned(String::from_utf8_lossy(b).into_owned()),
111        };
112        let mut input: &str = input_holder.as_ref();
113
114        let ori_len = input.len();
115        match self.parse_groups(e_id, &mut input) {
116            Ok(log) => Ok((log, input.to_string())),
117            Err(e) => {
118                let cur_pos = input.len();
119                let pos = ori_len - cur_pos;
120                if pos >= oth_suc_len {
121                    let preview: String = if input.len() <= 80 {
122                        input.to_string()
123                    } else {
124                        input.chars().take(80).collect()
125                    };
126                    Err(WparseReason::from_data()
127                        .to_err()
128                        .with_detail(format!("{preview} @ {pos}"))
129                        .with_detail(e.to_string()))
130                } else {
131                    Err(WparseError::from(WparseReason::NotMatch))
132                }
133            }
134        }
135    }
136
137    pub fn proc<D>(&self, e_id: u64, data: D, oth_suc_len: usize) -> DataResult
138    where
139        D: IntoRawData,
140    {
141        let raw: RawData = data.into_raw();
142        self.proc_ref(e_id, &raw, oth_suc_len)
143    }
144    pub fn from_code(code: &str) -> Result<Self, WplCodeError> {
145        let mut cur_code = code;
146        let rule = wpl_rule.parse_next(&mut cur_code).map_err(
147            |err| {
148                WplCodeReason::from_data()
149                    .to_err()
150                    .with_detail(cur_code.to_string())
151                    .with_detail(err.to_string())
152            }, //ParseCodeError::new(err.to_string())
153        )?;
154        let WplStatementType::Express(rule_define) = rule.statement;
155        Self::from(&rule_define, None)
156    }
157    pub fn from(dy_lang: &WplExpress, inject: Option<&WplExpress>) -> Result<Self, WplCodeError> {
158        let mut target_dpl = WplEvaluator {
159            ..Default::default()
160        };
161        if let Some(inject) = inject {
162            Self::assemble_ins(inject, &mut target_dpl)?;
163        }
164        Self::assemble_ins(dy_lang, &mut target_dpl)?;
165        Ok(target_dpl)
166    }
167
168    fn assemble_ins(
169        express: &WplExpress,
170        target_dpl: &mut WplEvaluator,
171    ) -> Result<(), WplCodeError> {
172        builtins::ensure_builtin_pipe_units();
173        for proc in &express.pipe_process {
174            if let Some(pipe_unit) = builtins::registry::create_pipe_unit(proc) {
175                target_dpl.preorder.push(pipe_unit);
176            } else {
177                return Err(WplCodeError::builder(WplCodeReason::UnSupport)
178                    .detail(format!("Pipe processor '{}' not registered", proc))
179                    .finish());
180            }
181        }
182        for (i, group) in express.group.iter().enumerate() {
183            let p_group = Self::assemble_group(i + 1, group)?;
184            target_dpl.group_units.push(p_group);
185        }
186        Ok(())
187    }
188    fn assemble_group(index: usize, group: &WplGroup) -> Result<WplEvalGroup, WplCodeError> {
189        let mut p_group =
190            WplEvalGroup::new(index, group.meta.clone(), group.base_group_sep.clone());
191        for (idx, conf) in group.fields.iter().enumerate() {
192            let fpu = Self::assemble_fpu(idx + 1, conf, group.meta.clone())?;
193            p_group.field_units.push(fpu)
194        }
195        Ok(p_group)
196    }
197    fn assemble_pipe(parent_idx: usize, pipe: &WplPipe) -> Result<PipeEnum, WplCodeError> {
198        match pipe {
199            WplPipe::Fun(fun) => Ok(PipeEnum::Fun(fun.clone())),
200            WplPipe::Group(group) => {
201                let mut p_group = WplEvalGroup::new(
202                    parent_idx * 10,
203                    group.meta.clone(),
204                    group.base_group_sep.clone(),
205                );
206                for conf in &group.fields {
207                    let fpu = Self::assemble_fpu(0, conf, group.meta.clone())?;
208                    p_group.field_units.push(fpu)
209                }
210                Ok(PipeEnum::Group(p_group))
211            }
212        }
213    }
214
215    pub fn assemble_fpu(
216        idx: usize,
217        conf: &WplField,
218        grp: WplGroupType,
219    ) -> Result<FieldEvalUnit, WplCodeError> {
220        let mut fpu = Self::build_fpu(idx, &grp, conf)?;
221        if let Some(subs) = conf.sub_fields() {
222            for (k, conf) in subs.conf_items().exact_iter() {
223                let sub_fpu = Self::build_fpu(0, &grp, conf)?;
224                fpu.add_sub_fpu(k.clone(), sub_fpu);
225            }
226            for (k, _, conf) in subs.conf_items().wild_iter() {
227                let sub_fpu = Self::build_fpu(0, &grp, conf)?;
228                fpu.add_sub_fpu(k.clone(), sub_fpu);
229            }
230        }
231        Ok(fpu)
232    }
233
234    fn build_fpu(
235        idx: usize,
236        grp: &WplGroupType,
237        conf: &WplField,
238    ) -> Result<FieldEvalUnit, WplCodeError> {
239        let mut fpu = FieldEvalUnit::create(idx, conf.clone(), grp.clone())?;
240        for pipe_conf in conf.pipe.clone() {
241            let pipe = Self::assemble_pipe(idx, &pipe_conf)?;
242            fpu.add_pipe(pipe);
243        }
244        Ok(fpu)
245    }
246    //pub fn fields_proc(&self, data: &mut &str) -> WparseResult<DataRecord> {
247    pub fn parse_groups(&self, e_id: u64, data: &mut &str) -> ModalResult<DataRecord> {
248        let mut result = Vec::with_capacity(100);
249
250        let sep = WplSep::default();
251        for group_unit in self.group_units.iter() {
252            match group_unit.proc(e_id, &sep, data, &mut result) {
253                Ok(_) => {}
254                Err(e) => {
255                    return Err(e);
256                }
257            }
258        }
259        let storage_items: Vec<_> = result
260            .into_iter()
261            .map(wp_model_core::model::FieldStorage::from_owned)
262            .collect();
263        Ok(DataRecord::from(storage_items))
264    }
265}
266
/// Countdown guard with an optional run budget.
///
/// * `continuous == false`: the watch is always stopped.
/// * `continuous == true` with `run_cnt == Some(n)`: stops once `tag_used` has been called
///   `n` times.
/// * `continuous == true` with `run_cnt == None`: never stops; `allow_try` is `true`.
#[derive(Debug, Clone)]
pub struct StopWatch {
    /// Whether the watch counts at all; when `false` it reports stopped immediately.
    continuous: bool,
    /// Remaining budget; `None` means unlimited.
    run_cnt: Option<usize>,
}

impl StopWatch {
    /// Creates a watch with the given mode and optional remaining budget.
    pub fn new(continuous: bool, run_cnt: Option<usize>) -> Self {
        Self {
            continuous,
            run_cnt,
        }
    }

    /// Consumes one unit of the budget, if the watch is continuous and has a budget.
    ///
    /// The counter saturates at zero: calling this after the budget is exhausted keeps the
    /// watch stopped instead of underflowing (a debug-build panic, or a release-build
    /// wrap-around that would make `is_stop` report `false` again).
    pub fn tag_used(&mut self) {
        if !self.continuous {
            return;
        }
        if let Some(cnt) = self.run_cnt.as_mut() {
            *cnt = cnt.saturating_sub(1);
        }
    }

    /// Returns `true` when the watch is not continuous or its budget has reached zero.
    pub fn is_stop(&self) -> bool {
        !self.continuous || self.run_cnt == Some(0)
    }

    /// Returns `true` only for a continuous watch without a budget.
    pub fn allow_try(&self) -> bool {
        self.continuous && self.run_cnt.is_none()
    }
}
297
298#[cfg(test)]
299mod tests {
300    use crate::ast::fld_fmt::for_test::{fdc2, fdc2_1, fdc3, fdc4_1};
301    use crate::ast::{WplField, WplFieldFmt};
302    use crate::compat::New1;
303    use crate::eval::builtins::raw_to_utf8_string;
304    use crate::eval::runtime::vm_unit::WplEvaluator;
305    use crate::eval::value::parse_def::Hold;
306    use crate::parser::error::IntoWplCodeError;
307    use crate::parser::error::WplCodeResult;
308    use crate::{WparseResult, WplExpress, register_wpl_pipe};
309    use orion_error::testcase::TestAssert;
310    use smol_str::SmolStr;
311    use wp_model_core::raw::RawData;
312    use wp_parse_api::PipeProcessor;
313
314    #[test]
315    fn log_test_ty() -> WplCodeResult<()> {
316        let mut data = r#"<158> May 15 14:19:16 skyeye SyslogClient[1]: 2023-05-15 14:19:16|10.180.8.8|alarm| {"_origin": 1}"#;
317
318        let conf = WplExpress::new(vec![fdc3("auto", " ", true).map_err(|e| e.into_wpl_err())?]);
319        let ppl = WplEvaluator::from(&conf, None)?;
320
321        let result = ppl.parse_groups(0, &mut data).assert();
322        result.items.iter().for_each(|f| println!("{}", f));
323        Ok(())
324    }
325
326    #[test]
327    fn log_test_ips() -> WplCodeResult<()> {
328        let conf = WplExpress::new(vec![fdc3("auto", " ", true).map_err(|e| e.into_wpl_err())?]);
329        let ppl = WplEvaluator::from(&conf, None)?;
330        let mut data = r#"id=tos time="2023-05-15 09:11:53" fw=OS  pri=5 type=mgmt user=superman src=10.111.233.51 op="Modify pwd of manager" result=0 recorder=manager_so msg="null""#;
331        let result = ppl.parse_groups(0, &mut data).assert();
332        result.items.iter().for_each(|f| println!("{}", f));
333        let mut data = r#"id=tos time="2023-05-15 09:11:53" fw=OS  pri=5 type=mgmt user=superman src=10.111.233.51 op="system admininfo modify name zhaolei new_password QXF5dW53ZWleMDIwNw== privilege config login_type local comment 安全管理员 add" result=0 recorder=config msg="nuid=tos time="2023-05-15 09:11:53" fw=OS  pri=5 type=mgmt user=superman src=10.111.233.51 op="webtr webadmin show" result=-1 recorder=config msg="error -8010 : 无效输入,分析" "#;
334        let result = ppl.parse_groups(0, &mut data).assert();
335        result.items.iter().for_each(|f| println!("{}", f));
336        Ok(())
337    }
338
339    //59.x.x.x - - [06/Aug/2019:12:12:19 +0800] "GET /nginx-logo.png HTTP/1.1" 200 368 "http://119.x.x.x/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36" "-"
340    #[test]
341    fn log_test_nginx() -> WplCodeResult<()> {
342        let conf = WplExpress::new(vec![fdc3("auto", " ", true).map_err(|e| e.into_wpl_err())?]);
343        let ppl = WplEvaluator::from(&conf, None)?;
344        let mut data = r#"192.168.1.2 - - [06/Aug/2019:12:12:19 +0800] "GET /nginx-logo.png HTTP/1.1" 200 368 "http://119.122.1.4/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36" "-""#;
345
346        let result = ppl.parse_groups(0, &mut data).assert();
347        assert_eq!(data, "");
348        result.items.iter().for_each(|f| println!("{}", f));
349        Ok(())
350    }
351
352    #[test]
353    fn test_huawei_default() -> WplCodeResult<()> {
354        let mut data = r#"<190>May 15 2023 07:09:12 KM-KJY-DC-USG12004-B02 %%01POLICY/6/POLICYPERMIT(l):CID=0x814f041e;vsys=CSG_Security, protocol=6, source-ip=10.111.117.49, source-port=34616, destination-ip=10.111.48.230, destination-port=50051, time=2023/5/15 15:09:12, source-zone=untrust, destination-zone=trust, application-name=, line-name=HO202212080377705-1.%"#;
355
356        let conf = WplExpress::new(vec![fdc3("auto", " ", true).map_err(|e| e.into_wpl_err())?]);
357        let ppl = WplEvaluator::from(&conf, None)?;
358        let result = ppl.parse_groups(0, &mut data).assert();
359
360        assert_eq!(data, "");
361        result.items.iter().for_each(|f| println!("{}", f));
362        Ok(())
363    }
364
365    #[test]
366    fn test_huawei_detail() -> WplCodeResult<()> {
367        //*auto chars: auto; *auto,
368        let mut data = r#"<190>May 15 2023 07:09:12 KM-KJY-DC-USG12004-B02 %%01POLICY/6/POLICYPERMIT(l):CID=0x814f041e;vsys=CSG_Security, protocol=6"#;
369        let fmt = WplFieldFmt {
370            //separator: PrioSep::default(),
371            scope_beg: Some("<".to_string()),
372            scope_end: Some(">".to_string()),
373            field_cnt: None,
374            sub_fmt: None,
375        };
376        let conf = WplExpress::new(vec![
377            fdc2_1("digit", fmt).map_err(|e| e.into_wpl_err())?,
378            fdc2("auto", " ").map_err(|e| e.into_wpl_err())?,
379            fdc2("chars", " ").map_err(|e| e.into_wpl_err())?,
380            fdc2("chars", ":").map_err(|e| e.into_wpl_err())?,
381            fdc2("kv", ";").map_err(|e| e.into_wpl_err())?,
382            fdc2("auto", ",").map_err(|e| e.into_wpl_err())?,
383            fdc2("auto", ",").map_err(|e| e.into_wpl_err())?,
384        ]);
385
386        let ppl = WplEvaluator::from(&conf, None)?;
387        let result = ppl.parse_groups(0, &mut data).assert();
388        assert_eq!(data, "");
389        result.items.iter().for_each(|f| println!("{}", f));
390        Ok(())
391    }
392
393    #[test]
394    fn test_huawei_simple() -> WplCodeResult<()> {
395        //*auto chars: auto; *auto,
396        let mut data = r#"<190>May 15 2023 07:09:12 KM-KJY-DC-USG12004-B02 %%01POLICY/6/POLICYPERMIT(l):CID=0x814f041e;vsys=CSG_Security, protocol=6"#;
397        let conf = WplExpress::new(vec![
398            fdc3("auto", " ", true).map_err(|e| e.into_wpl_err())?,
399            fdc2("chars", ":").map_err(|e| e.into_wpl_err())?,
400            fdc3("auto", ";", false).map_err(|e| e.into_wpl_err())?,
401            fdc3("auto", ",", true).map_err(|e| e.into_wpl_err())?,
402        ]);
403        let ppl = WplEvaluator::from(&conf, None)?;
404        let result = ppl.parse_groups(0, &mut data).assert();
405        assert_eq!(data, "");
406        result.items.iter().for_each(|f| println!("{:?}", f));
407        Ok(())
408    }
409
410    #[test]
411    fn test_huawei_simple2() -> WplCodeResult<()> {
412        let mut data = r#"<190>May 15 2023 07:09:12 KM-KJY-DC-USG12004-B02 %%01POLICY/6/POLICYPERMIT(l):CID=0x814f041e;vsys=CSG_Security, protocol=6"#;
413        let conf = WplExpress::new(vec![
414            WplField::try_parse("symbol(<190>)[5]").assert(),
415            fdc3("time", " ", false).map_err(|e| e.into_wpl_err())?,
416            WplField::try_parse("symbol(KM)[2]").assert(),
417            fdc2("chars", ":").map_err(|e| e.into_wpl_err())?,
418            fdc3("auto", ";", false).map_err(|e| e.into_wpl_err())?,
419            fdc3("auto", ",", true).map_err(|e| e.into_wpl_err())?,
420        ]);
421        let ppl = WplEvaluator::from(&conf, None)?;
422        let result = ppl.parse_groups(0, &mut data).assert();
423        assert_eq!(data, "");
424        result.items.iter().for_each(|f| println!("{:?}", f));
425        Ok(())
426    }
427
428    #[test]
429    fn test_gen() -> WplCodeResult<()> {
430        let mut data = r#"2345,2021-7-15 7:50:32,9OPP-MU-JME2-YGUW,chars_740,2022-1-18 19:30:30,jki=BkRzBo0f,138.11.13.43,tEu=GRcCwKkR,chars_493,Mrc=EskxskU3,sYp=jfKkn7th,UBa=eKhcfd9h,nXa=ZQSta6Je"#;
431        let conf = WplExpress::new(vec![
432            fdc3("digit", ",", false).map_err(|e| e.into_wpl_err())?,
433            fdc3("time", ",", false).map_err(|e| e.into_wpl_err())?,
434            fdc3("sn", ",", false).map_err(|e| e.into_wpl_err())?,
435            fdc3("chars", ",", false).map_err(|e| e.into_wpl_err())?,
436            fdc3("time", ",", false).map_err(|e| e.into_wpl_err())?,
437            fdc3("auto", ",", true).map_err(|e| e.into_wpl_err())?,
438        ]);
439        let ppl = WplEvaluator::from(&conf, None)?;
440        let result = ppl.parse_groups(0, &mut data).assert();
441        assert_eq!(data, "");
442        result.items.iter().for_each(|f| println!("{}", f));
443        Ok(())
444    }
445
446    #[test]
447    fn test_gen2() -> WplCodeResult<()> {
448        let mut data = r#"7106,2020-6-10 2:54:9,U5BH-UC-UQVY-MMKU,chars_472,2020-9-22 13:4:6,Emm=LXJDV5DC,22.161.67.67,nsL=LvVRv5uf,chars_1534,DNw=0xCQKTaQ,UFh=dMPbabRG,q29=aMsZTj83,oUi=ywMsKT2G"#;
449        let conf = WplExpress::new(vec![
450            fdc3("digit", ",", false).map_err(|e| e.into_wpl_err())?,
451            fdc3("time", ",", false).map_err(|e| e.into_wpl_err())?,
452            fdc3("sn", ",", false).map_err(|e| e.into_wpl_err())?,
453            fdc3("chars", ",", false).map_err(|e| e.into_wpl_err())?,
454            fdc3("time", ",", false).map_err(|e| e.into_wpl_err())?,
455            fdc3("kv", ",", false).map_err(|e| e.into_wpl_err())?,
456            fdc3("ip", ",", false).map_err(|e| e.into_wpl_err())?,
457            fdc3("kv", ",", false).map_err(|e| e.into_wpl_err())?,
458            fdc3("chars", ",", false).map_err(|e| e.into_wpl_err())?,
459            fdc3("kv", ",", false).map_err(|e| e.into_wpl_err())?,
460            fdc3("kv", ",", false).map_err(|e| e.into_wpl_err())?,
461            fdc3("kv", ",", false).map_err(|e| e.into_wpl_err())?,
462            fdc3("kv", ",", false).map_err(|e| e.into_wpl_err())?,
463        ]);
464        let ppl = WplEvaluator::from(&conf, None)?;
465        let result = ppl.parse_groups(0, &mut data).assert();
466        assert_eq!(data, "");
467        result.items.iter().for_each(|f| println!("{}", f));
468
469        let mut data = r#"1857,2021-4-10 0:46:8,R2IP-IF-06UT-7KUU,chars_1914,2021-4-15 2:19:43,u6s=TNSAlucV,228.211.38.109,k02=doYanSlf,chars_276,SIw=nu8atSqT,84e=e6qUb2k7,aVs=pk8M8rQU,5An=9upLU8aa"#;
470        let result = ppl.parse_groups(0, &mut data).assert();
471        assert_eq!(data, "");
472        result.items.iter().for_each(|f| println!("{}", f));
473        Ok(())
474    }
475
476    #[test]
477    fn preorder_plg_pipe_unit_executes() -> WplCodeResult<()> {
478        #[derive(Debug)]
479        struct MockStage;
480
481        impl PipeProcessor for MockStage {
482            fn process(&self, data: RawData) -> WparseResult<RawData> {
483                let mut value = raw_to_utf8_string(&data);
484                value.push_str("-mock");
485                Ok(RawData::from_string(value))
486            }
487
488            fn name(&self) -> &'static str {
489                "mock_stage"
490            }
491        }
492
493        register_wpl_pipe!("plg_pipe/MOCK-STAGE", || Hold::new(MockStage));
494
495        let mut expr =
496            WplExpress::new(vec![fdc3("auto", " ", true).map_err(|e| e.into_wpl_err())?]);
497        expr.pipe_process = vec![SmolStr::from("plg_pipe/MOCK-STAGE")];
498
499        let evaluator = WplEvaluator::from(&expr, None)?;
500        let results = evaluator
501            .preorder_proc(RawData::from_string("data".to_string()))
502            .map_err(|e| e.into_wpl_err())?;
503        assert_eq!(results.len(), 1);
504        assert_eq!(results[0].result, "data-mock");
505        assert_eq!(results[0].name, "mock_stage");
506        Ok(())
507    }
508
509    #[test]
510    fn test_ignore() -> WplCodeResult<()> {
511        let mut data = r#"2345,2021-7-15 7:50:32,9OPP-MU-JME2-YGUW,chars_740"#;
512        let conf = WplExpress::new(vec![
513            fdc3("_", ",", false).map_err(|e| e.into_wpl_err())?,
514            fdc3("_", ",", false).map_err(|e| e.into_wpl_err())?,
515            fdc3("_", ",", false).map_err(|e| e.into_wpl_err())?,
516            fdc3("_", ",", false).map_err(|e| e.into_wpl_err())?,
517        ]);
518        let ppl = WplEvaluator::from(&conf, None)?;
519        let result = ppl.parse_groups(0, &mut data).assert();
520        assert_eq!(data, "");
521        result.items.iter().for_each(|f| println!("{}", f));
522        Ok(())
523    }
524
525    #[test]
526    fn test_ignore_cnt() -> WplCodeResult<()> {
527        let mut data = r#"2345,2021-7-15 7:50:32,9OPP-MU-JME2-YGUW,chars_740"#;
528        let conf = WplExpress::new(vec![
529            fdc4_1("_", ",", true, 4).map_err(|e| e.into_wpl_err())?,
530        ]);
531        let ppl = WplEvaluator::from(&conf, None)?;
532        let result = ppl.parse_groups(0, &mut data).assert();
533        assert_eq!(data, "");
534        assert_eq!(result.items.len(), 4);
535        result.items.iter().for_each(|f| println!("{}", f));
536
537        let mut data = r#"2345,2021-7-15 7:50:32,9OPP-MU-JME2-YGUW,chars_740"#;
538        let conf = WplExpress::new(vec![
539            fdc4_1("_", ",", true, 3).map_err(|e| e.into_wpl_err())?,
540        ]);
541        let ppl = WplEvaluator::from(&conf, None)?;
542        let result = ppl.parse_groups(0, &mut data).assert();
543        assert_eq!(data, "chars_740");
544        assert_eq!(result.items.len(), 3);
545        Ok(())
546    }
547
548    #[test]
549    fn test_pipe_unit_direct_lookup() -> WplCodeResult<()> {
550        use crate::eval::builtins::raw_to_utf8_string;
551        use crate::{create_preorder_pipe_unit, list_preorder_pipe_units};
552
553        // Define MockStage for this test
554        #[derive(Debug)]
555        struct TestMockStage;
556
557        impl PipeProcessor for TestMockStage {
558            fn process(&self, data: RawData) -> WparseResult<RawData> {
559                let mut value = raw_to_utf8_string(&data);
560                value.push_str("-mock");
561                Ok(RawData::from_string(value))
562            }
563
564            fn name(&self) -> &'static str {
565                "test_mock_stage"
566            }
567        }
568
569        // Test direct lookup after simplification
570        // Register test processors with different naming schemes
571        register_wpl_pipe!("direct-test", || Hold::new(TestMockStage));
572        register_wpl_pipe!("plg_pipe/mock-prefix", || Hold::new(TestMockStage));
573
574        // Test that direct naming works
575        let processor = create_preorder_pipe_unit("direct-test");
576        assert!(processor.is_some(), "Should find direct-test processor");
577
578        // Test that prefixed registration also works (with full name)
579        let processor = create_preorder_pipe_unit("plg_pipe/mock-prefix");
580        assert!(
581            processor.is_some(),
582            "Should find plg_pipe/with-prefix processor"
583        );
584
585        // Test actual processing with different naming
586        let test_data = RawData::from_string("hello".to_string());
587
588        if let Some(processor) = create_preorder_pipe_unit("direct-test") {
589            let result = processor
590                .process(test_data.clone())
591                .map_err(|e| e.into_wpl_err())?;
592            assert_eq!(raw_to_utf8_string(&result), "hello-mock");
593        }
594
595        if let Some(processor) = create_preorder_pipe_unit("plg_pipe/mock-prefix") {
596            let result = processor.process(test_data).map_err(|e| e.into_wpl_err())?;
597            assert_eq!(raw_to_utf8_string(&result), "hello-mock");
598        }
599
600        // List all processors to see what's registered
601        let processors = list_preorder_pipe_units();
602        println!("All registered processors: {:?}", processors);
603
604        // Verify both naming approaches work (names are converted to uppercase)
605        assert!(processors.contains(&"DIRECT-TEST".into()));
606        assert!(processors.contains(&"PLG_PIPE/MOCK-PREFIX".into()));
607
608        Ok(())
609    }
610
611    #[test]
612    fn test_simplified_assemble_ins_logic() -> WplCodeResult<()> {
613        use crate::eval::builtins::raw_to_utf8_string;
614        use crate::{create_preorder_pipe_unit, list_preorder_pipe_units};
615
616        // Define test processor
617        #[derive(Debug)]
618        struct SimplifiedTestStage;
619
620        impl PipeProcessor for SimplifiedTestStage {
621            fn process(&self, data: RawData) -> WparseResult<RawData> {
622                let mut value = raw_to_utf8_string(&data);
623                value.push_str("-simplified");
624                Ok(RawData::from_string(value))
625            }
626
627            fn name(&self) -> &'static str {
628                "simplified_test"
629            }
630        }
631
632        // Register processors with both naming styles
633        register_wpl_pipe!("simple-test", || Hold::new(SimplifiedTestStage));
634        register_wpl_pipe!("plg_pipe/simple-prefix", || Hold::new(SimplifiedTestStage));
635
636        // Test that both can be found directly
637        let processor1 = create_preorder_pipe_unit("simple-test");
638        assert!(processor1.is_some(), "Should find simple-test");
639
640        let processor2 = create_preorder_pipe_unit("plg_pipe/simple-prefix");
641        assert!(processor2.is_some(), "Should find plg_pipe/with-prefix");
642
643        // Test that processors registered with plg_pipe/ prefix can be found without it
644        // This would fail because registration is case-sensitive and stores full name
645        let processor3 = create_preorder_pipe_unit("simple-prefix");
646        assert!(
647            processor3.is_none(),
648            "Should NOT find with-prefix (was registered as plg_pipe/with-prefix)"
649        );
650
651        // Show all registered processors
652        let processors = list_preorder_pipe_units();
653        println!("All processors: {:?}", processors);
654
655        Ok(())
656    }
657}