Skip to main content

wpl/eval/runtime/
vm_unit.rs

1use crate::ast::WplPipe;
2use crate::ast::group::{WplGroup, WplGroupType};
3use crate::ast::{WplExpress, WplStatementType};
4use crate::ast::{WplField, WplSep};
5use crate::compat::New3;
6use crate::eval::builtins::{self, PipeLineResult, raw_to_utf8_string};
7use crate::eval::runtime::field::FieldEvalUnit;
8use crate::eval::runtime::field_pipe::PipeEnum;
9use crate::eval::runtime::group::WplEvalGroup;
10use std::borrow::Cow;
11use wp_model_core::raw::RawData;
12use wp_parse_api::{PipeHold, WparseError, WparseReason};
13
14use crate::parser::error::{WplCodeError, WplCodeReason};
15use crate::parser::wpl_rule::wpl_rule;
16use orion_error::conversion::ToStructError;
17use orion_error::{ErrorWith, UvsFrom};
18use wp_log::debug_edata;
19use wp_model_core::model::DataRecord;
20use wp_primitives::Parser;
21use wp_primitives::WResult as ModalResult;
22
/// Internal result type for wp-lang usage: on success, the parsed record
/// paired with a `String` (in `WplEvaluator::proc_ref` this is the input text
/// left unconsumed after parsing).
///
/// Plugin developers should use `wp_parse_api::DataResult` instead.
pub type DataResult = Result<(DataRecord, String), WparseError>;
// NOTE(review): not referenced in this file; presumably an iteration
// threshold used by callers elsewhere — confirm the intended meaning.
pub const OPTIMIZE_TIMES: usize = 10000;
27
28pub trait IntoRawData {
29    fn into_raw(self) -> RawData;
30}
31
32impl IntoRawData for RawData {
33    fn into_raw(self) -> RawData {
34        self
35    }
36}
37
38impl IntoRawData for String {
39    fn into_raw(self) -> RawData {
40        RawData::from_string(self)
41    }
42}
43
44impl IntoRawData for &String {
45    fn into_raw(self) -> RawData {
46        RawData::from_string(self.as_str())
47    }
48}
49
50impl IntoRawData for &str {
51    fn into_raw(self) -> RawData {
52        RawData::from_string(self)
53    }
54}
55
/// Evaluator assembled from a `WplExpress`: an ordered list of preorder pipe
/// stages plus the group evaluation units that parse the payload text.
#[derive(Default, Clone)]
pub struct WplEvaluator {
    /// Pipe stages applied to the raw payload, in order, before group parsing.
    preorder: Vec<PipeHold>,
    /// Group units run one after another by `parse_groups`.
    group_units: Vec<WplEvalGroup>,
}
// SAFETY: NOTE(review): no justification is recorded here. Soundness depends
// on every `PipeHold` and `WplEvalGroup` being safe to move across threads,
// which is not visible from this file — confirm, or drop this impl if the
// field types are already `Send`.
unsafe impl Send for WplEvaluator {}
62
63impl WplEvaluator {
64    pub fn preorder_proc(&self, data: RawData) -> Result<Vec<PipeLineResult>, WparseError> {
65        let mut pipe_obj = Vec::new();
66        let mut target = data;
67        for proc_unit in &self.preorder {
68            target = proc_unit.process(target)?;
69            pipe_obj.push(PipeLineResult {
70                name: proc_unit.name().to_string(),
71                result: raw_to_utf8_string(&target),
72            });
73        }
74        Ok(pipe_obj)
75    }
76
77    fn pipe_proc(&self, e_id: u64, data: RawData) -> Result<RawData, WparseError> {
78        let mut target = data;
79        for proc_unit in &self.preorder {
80            target = proc_unit
81                .process(target)
82                .doing("pipe convert")
83                .with_context(e_id.to_string())
84                .with_context(proc_unit.name())?;
85
86            debug_edata!(
87                e_id,
88                "pipe  {}  out:{}",
89                proc_unit.name(),
90                raw_to_utf8_string(&target)
91            );
92        }
93        Ok(target)
94    }
95
96    /// 从引用解析,仅在 preorder 不为空时 clone 数据。
97    /// 高频调用路径避免每条规则都 clone payload。
98    pub fn proc_ref(&self, e_id: u64, data: &RawData, oth_suc_len: usize) -> DataResult {
99        let owned: Option<RawData>;
100        let working_raw: &RawData = if !self.preorder.is_empty() {
101            owned = Some(self.pipe_proc(e_id, data.clone())?);
102            owned.as_ref().unwrap()
103        } else {
104            data
105        };
106
107        let input_holder: Cow<'_, str> = match working_raw {
108            RawData::String(s) => Cow::Borrowed(s.as_str()),
109            RawData::Bytes(b) => Cow::Owned(String::from_utf8_lossy(b).into_owned()),
110            RawData::ArcBytes(b) => Cow::Owned(String::from_utf8_lossy(b).into_owned()),
111        };
112        let mut input: &str = input_holder.as_ref();
113
114        let ori_len = input.len();
115        match self.parse_groups(e_id, &mut input) {
116            Ok(log) => Ok((log, input.to_string())),
117            Err(e) => {
118                let cur_pos = input.len();
119                let pos = ori_len - cur_pos;
120                if pos >= oth_suc_len {
121                    let preview: String = if input.len() <= 80 {
122                        input.to_string()
123                    } else {
124                        input.chars().take(80).collect()
125                    };
126                    Err(WparseReason::from_data()
127                        .to_err()
128                        .with_detail(format!("{preview} @ {pos}"))
129                        .with_detail(e.to_string()))
130                } else {
131                    Err(WparseError::from(WparseReason::NotMatch))
132                }
133            }
134        }
135    }
136
137    pub fn proc<D>(&self, e_id: u64, data: D, oth_suc_len: usize) -> DataResult
138    where
139        D: IntoRawData,
140    {
141        let raw: RawData = data.into_raw();
142        self.proc_ref(e_id, &raw, oth_suc_len)
143    }
144    pub fn from_code(code: &str) -> Result<Self, WplCodeError> {
145        let mut cur_code = code;
146        let rule = wpl_rule.parse_next(&mut cur_code).map_err(
147            |err| {
148                WplCodeReason::from_data()
149                    .to_err()
150                    .with_detail(cur_code.to_string())
151                    .with_detail(err.to_string())
152            }, //ParseCodeError::new(err.to_string())
153        )?;
154        let WplStatementType::Express(rule_define) = rule.statement;
155        Self::from(&rule_define, None)
156    }
157    pub fn from(dy_lang: &WplExpress, inject: Option<&WplExpress>) -> Result<Self, WplCodeError> {
158        let mut target_dpl = WplEvaluator {
159            ..Default::default()
160        };
161        if let Some(inject) = inject {
162            Self::assemble_ins(inject, &mut target_dpl)?;
163        }
164        Self::assemble_ins(dy_lang, &mut target_dpl)?;
165        Ok(target_dpl)
166    }
167
168    fn assemble_ins(
169        express: &WplExpress,
170        target_dpl: &mut WplEvaluator,
171    ) -> Result<(), WplCodeError> {
172        builtins::ensure_builtin_pipe_units();
173        for proc in &express.pipe_process {
174            if let Some(pipe_unit) = builtins::registry::create_pipe_unit(proc) {
175                target_dpl.preorder.push(pipe_unit);
176            } else {
177                return Err(WplCodeError::from(WplCodeReason::UnSupport(format!(
178                    "Pipe processor '{}' not registered",
179                    proc
180                ))));
181            }
182        }
183        for (i, group) in express.group.iter().enumerate() {
184            let p_group = Self::assemble_group(i + 1, group)?;
185            target_dpl.group_units.push(p_group);
186        }
187        Ok(())
188    }
189    fn assemble_group(index: usize, group: &WplGroup) -> Result<WplEvalGroup, WplCodeError> {
190        let mut p_group =
191            WplEvalGroup::new(index, group.meta.clone(), group.base_group_sep.clone());
192        for (idx, conf) in group.fields.iter().enumerate() {
193            let fpu = Self::assemble_fpu(idx + 1, conf, group.meta.clone())?;
194            p_group.field_units.push(fpu)
195        }
196        Ok(p_group)
197    }
198    fn assemble_pipe(parent_idx: usize, pipe: &WplPipe) -> Result<PipeEnum, WplCodeError> {
199        match pipe {
200            WplPipe::Fun(fun) => Ok(PipeEnum::Fun(fun.clone())),
201            WplPipe::Group(group) => {
202                let mut p_group = WplEvalGroup::new(
203                    parent_idx * 10,
204                    group.meta.clone(),
205                    group.base_group_sep.clone(),
206                );
207                for conf in &group.fields {
208                    let fpu = Self::assemble_fpu(0, conf, group.meta.clone())?;
209                    p_group.field_units.push(fpu)
210                }
211                Ok(PipeEnum::Group(p_group))
212            }
213        }
214    }
215
216    pub fn assemble_fpu(
217        idx: usize,
218        conf: &WplField,
219        grp: WplGroupType,
220    ) -> Result<FieldEvalUnit, WplCodeError> {
221        let mut fpu = Self::build_fpu(idx, &grp, conf)?;
222        if let Some(subs) = conf.sub_fields() {
223            for (k, conf) in subs.conf_items().exact_iter() {
224                let sub_fpu = Self::build_fpu(0, &grp, conf)?;
225                fpu.add_sub_fpu(k.clone(), sub_fpu);
226            }
227            for (k, _, conf) in subs.conf_items().wild_iter() {
228                let sub_fpu = Self::build_fpu(0, &grp, conf)?;
229                fpu.add_sub_fpu(k.clone(), sub_fpu);
230            }
231        }
232        Ok(fpu)
233    }
234
235    fn build_fpu(
236        idx: usize,
237        grp: &WplGroupType,
238        conf: &WplField,
239    ) -> Result<FieldEvalUnit, WplCodeError> {
240        let mut fpu = FieldEvalUnit::create(idx, conf.clone(), grp.clone())?;
241        for pipe_conf in conf.pipe.clone() {
242            let pipe = Self::assemble_pipe(idx, &pipe_conf)?;
243            fpu.add_pipe(pipe);
244        }
245        Ok(fpu)
246    }
247    //pub fn fields_proc(&self, data: &mut &str) -> WparseResult<DataRecord> {
248    pub fn parse_groups(&self, e_id: u64, data: &mut &str) -> ModalResult<DataRecord> {
249        let mut result = Vec::with_capacity(100);
250
251        let sep = WplSep::default();
252        for group_unit in self.group_units.iter() {
253            match group_unit.proc(e_id, &sep, data, &mut result) {
254                Ok(_) => {}
255                Err(e) => {
256                    return Err(e);
257                }
258            }
259        }
260        let storage_items: Vec<_> = result
261            .into_iter()
262            .map(wp_model_core::model::FieldStorage::from_owned)
263            .collect();
264        Ok(DataRecord::from(storage_items))
265    }
266}
267
/// Tracks whether a repeated operation may keep running.
///
/// * `continuous == false`: the watch always reports stopped.
/// * `continuous == true` with `run_cnt == Some(n)`: `n` uses remain; the
///   watch reports stopped once the count reaches zero.
/// * `continuous == true` with `run_cnt == None`: there is no counter, so the
///   watch never reports stopped and `allow_try` is true.
#[derive(Debug, Clone)]
pub struct StopWatch {
    continuous: bool,
    run_cnt: Option<usize>,
}

impl StopWatch {
    /// Records one use by decrementing the remaining count, if there is one.
    ///
    /// Does nothing when the watch is not continuous or has no counter. The
    /// count saturates at zero: an extra call after exhaustion must not
    /// underflow, which would panic in debug builds and wrap in release
    /// builds, making `is_stop` report `false` again.
    pub fn tag_used(&mut self) {
        if !self.continuous {
            return;
        }
        if let Some(cnt) = self.run_cnt.as_mut() {
            *cnt = cnt.saturating_sub(1);
        }
    }

    /// Returns `true` when the watch is not continuous or its count is exhausted.
    pub fn is_stop(&self) -> bool {
        !self.continuous || self.run_cnt == Some(0)
    }

    /// Returns `true` only for a continuous watch with no counter.
    pub fn allow_try(&self) -> bool {
        self.continuous && self.run_cnt.is_none()
    }

    /// Creates a watch with the given mode and optional remaining-use count.
    pub fn new(continuous: bool, run_cnt: Option<usize>) -> Self {
        Self {
            continuous,
            run_cnt,
        }
    }
}
298
299#[cfg(test)]
300mod tests {
301    use crate::ast::fld_fmt::for_test::{fdc2, fdc2_1, fdc3, fdc4_1};
302    use crate::ast::{WplField, WplFieldFmt};
303    use crate::compat::New1;
304    use crate::eval::builtins::raw_to_utf8_string;
305    use crate::eval::runtime::vm_unit::WplEvaluator;
306    use crate::eval::value::parse_def::Hold;
307    use crate::parser::error::IntoWplCodeError;
308    use crate::parser::error::WplCodeResult;
309    use crate::{WparseResult, WplExpress, register_wpl_pipe};
310    use orion_error::testcase::TestAssert;
311    use smol_str::SmolStr;
312    use wp_model_core::raw::RawData;
313    use wp_parse_api::PipeProcessor;
314
315    #[test]
316    fn log_test_ty() -> WplCodeResult<()> {
317        let mut data = r#"<158> May 15 14:19:16 skyeye SyslogClient[1]: 2023-05-15 14:19:16|10.180.8.8|alarm| {"_origin": 1}"#;
318
319        let conf = WplExpress::new(vec![fdc3("auto", " ", true).map_err(|e| e.into_wpl_err())?]);
320        let ppl = WplEvaluator::from(&conf, None)?;
321
322        let result = ppl.parse_groups(0, &mut data).assert();
323        result.items.iter().for_each(|f| println!("{}", f));
324        Ok(())
325    }
326
327    #[test]
328    fn log_test_ips() -> WplCodeResult<()> {
329        let conf = WplExpress::new(vec![fdc3("auto", " ", true).map_err(|e| e.into_wpl_err())?]);
330        let ppl = WplEvaluator::from(&conf, None)?;
331        let mut data = r#"id=tos time="2023-05-15 09:11:53" fw=OS  pri=5 type=mgmt user=superman src=10.111.233.51 op="Modify pwd of manager" result=0 recorder=manager_so msg="null""#;
332        let result = ppl.parse_groups(0, &mut data).assert();
333        result.items.iter().for_each(|f| println!("{}", f));
334        let mut data = r#"id=tos time="2023-05-15 09:11:53" fw=OS  pri=5 type=mgmt user=superman src=10.111.233.51 op="system admininfo modify name zhaolei new_password QXF5dW53ZWleMDIwNw== privilege config login_type local comment 安全管理员 add" result=0 recorder=config msg="nuid=tos time="2023-05-15 09:11:53" fw=OS  pri=5 type=mgmt user=superman src=10.111.233.51 op="webtr webadmin show" result=-1 recorder=config msg="error -8010 : 无效输入,分析" "#;
335        let result = ppl.parse_groups(0, &mut data).assert();
336        result.items.iter().for_each(|f| println!("{}", f));
337        Ok(())
338    }
339
340    //59.x.x.x - - [06/Aug/2019:12:12:19 +0800] "GET /nginx-logo.png HTTP/1.1" 200 368 "http://119.x.x.x/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36" "-"
341    #[test]
342    fn log_test_nginx() -> WplCodeResult<()> {
343        let conf = WplExpress::new(vec![fdc3("auto", " ", true).map_err(|e| e.into_wpl_err())?]);
344        let ppl = WplEvaluator::from(&conf, None)?;
345        let mut data = r#"192.168.1.2 - - [06/Aug/2019:12:12:19 +0800] "GET /nginx-logo.png HTTP/1.1" 200 368 "http://119.122.1.4/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36" "-""#;
346
347        let result = ppl.parse_groups(0, &mut data).assert();
348        assert_eq!(data, "");
349        result.items.iter().for_each(|f| println!("{}", f));
350        Ok(())
351    }
352
353    #[test]
354    fn test_huawei_default() -> WplCodeResult<()> {
355        let mut data = r#"<190>May 15 2023 07:09:12 KM-KJY-DC-USG12004-B02 %%01POLICY/6/POLICYPERMIT(l):CID=0x814f041e;vsys=CSG_Security, protocol=6, source-ip=10.111.117.49, source-port=34616, destination-ip=10.111.48.230, destination-port=50051, time=2023/5/15 15:09:12, source-zone=untrust, destination-zone=trust, application-name=, line-name=HO202212080377705-1.%"#;
356
357        let conf = WplExpress::new(vec![fdc3("auto", " ", true).map_err(|e| e.into_wpl_err())?]);
358        let ppl = WplEvaluator::from(&conf, None)?;
359        let result = ppl.parse_groups(0, &mut data).assert();
360
361        assert_eq!(data, "");
362        result.items.iter().for_each(|f| println!("{}", f));
363        Ok(())
364    }
365
366    #[test]
367    fn test_huawei_detail() -> WplCodeResult<()> {
368        //*auto chars: auto; *auto,
369        let mut data = r#"<190>May 15 2023 07:09:12 KM-KJY-DC-USG12004-B02 %%01POLICY/6/POLICYPERMIT(l):CID=0x814f041e;vsys=CSG_Security, protocol=6"#;
370        let fmt = WplFieldFmt {
371            //separator: PrioSep::default(),
372            scope_beg: Some("<".to_string()),
373            scope_end: Some(">".to_string()),
374            field_cnt: None,
375            sub_fmt: None,
376        };
377        let conf = WplExpress::new(vec![
378            fdc2_1("digit", fmt).map_err(|e| e.into_wpl_err())?,
379            fdc2("auto", " ").map_err(|e| e.into_wpl_err())?,
380            fdc2("chars", " ").map_err(|e| e.into_wpl_err())?,
381            fdc2("chars", ":").map_err(|e| e.into_wpl_err())?,
382            fdc2("kv", ";").map_err(|e| e.into_wpl_err())?,
383            fdc2("auto", ",").map_err(|e| e.into_wpl_err())?,
384            fdc2("auto", ",").map_err(|e| e.into_wpl_err())?,
385        ]);
386
387        let ppl = WplEvaluator::from(&conf, None)?;
388        let result = ppl.parse_groups(0, &mut data).assert();
389        assert_eq!(data, "");
390        result.items.iter().for_each(|f| println!("{}", f));
391        Ok(())
392    }
393
394    #[test]
395    fn test_huawei_simple() -> WplCodeResult<()> {
396        //*auto chars: auto; *auto,
397        let mut data = r#"<190>May 15 2023 07:09:12 KM-KJY-DC-USG12004-B02 %%01POLICY/6/POLICYPERMIT(l):CID=0x814f041e;vsys=CSG_Security, protocol=6"#;
398        let conf = WplExpress::new(vec![
399            fdc3("auto", " ", true).map_err(|e| e.into_wpl_err())?,
400            fdc2("chars", ":").map_err(|e| e.into_wpl_err())?,
401            fdc3("auto", ";", false).map_err(|e| e.into_wpl_err())?,
402            fdc3("auto", ",", true).map_err(|e| e.into_wpl_err())?,
403        ]);
404        let ppl = WplEvaluator::from(&conf, None)?;
405        let result = ppl.parse_groups(0, &mut data).assert();
406        assert_eq!(data, "");
407        result.items.iter().for_each(|f| println!("{:?}", f));
408        Ok(())
409    }
410
411    #[test]
412    fn test_huawei_simple2() -> WplCodeResult<()> {
413        let mut data = r#"<190>May 15 2023 07:09:12 KM-KJY-DC-USG12004-B02 %%01POLICY/6/POLICYPERMIT(l):CID=0x814f041e;vsys=CSG_Security, protocol=6"#;
414        let conf = WplExpress::new(vec![
415            WplField::try_parse("symbol(<190>)[5]").assert(),
416            fdc3("time", " ", false).map_err(|e| e.into_wpl_err())?,
417            WplField::try_parse("symbol(KM)[2]").assert(),
418            fdc2("chars", ":").map_err(|e| e.into_wpl_err())?,
419            fdc3("auto", ";", false).map_err(|e| e.into_wpl_err())?,
420            fdc3("auto", ",", true).map_err(|e| e.into_wpl_err())?,
421        ]);
422        let ppl = WplEvaluator::from(&conf, None)?;
423        let result = ppl.parse_groups(0, &mut data).assert();
424        assert_eq!(data, "");
425        result.items.iter().for_each(|f| println!("{:?}", f));
426        Ok(())
427    }
428
429    #[test]
430    fn test_gen() -> WplCodeResult<()> {
431        let mut data = r#"2345,2021-7-15 7:50:32,9OPP-MU-JME2-YGUW,chars_740,2022-1-18 19:30:30,jki=BkRzBo0f,138.11.13.43,tEu=GRcCwKkR,chars_493,Mrc=EskxskU3,sYp=jfKkn7th,UBa=eKhcfd9h,nXa=ZQSta6Je"#;
432        let conf = WplExpress::new(vec![
433            fdc3("digit", ",", false).map_err(|e| e.into_wpl_err())?,
434            fdc3("time", ",", false).map_err(|e| e.into_wpl_err())?,
435            fdc3("sn", ",", false).map_err(|e| e.into_wpl_err())?,
436            fdc3("chars", ",", false).map_err(|e| e.into_wpl_err())?,
437            fdc3("time", ",", false).map_err(|e| e.into_wpl_err())?,
438            fdc3("auto", ",", true).map_err(|e| e.into_wpl_err())?,
439        ]);
440        let ppl = WplEvaluator::from(&conf, None)?;
441        let result = ppl.parse_groups(0, &mut data).assert();
442        assert_eq!(data, "");
443        result.items.iter().for_each(|f| println!("{}", f));
444        Ok(())
445    }
446
447    #[test]
448    fn test_gen2() -> WplCodeResult<()> {
449        let mut data = r#"7106,2020-6-10 2:54:9,U5BH-UC-UQVY-MMKU,chars_472,2020-9-22 13:4:6,Emm=LXJDV5DC,22.161.67.67,nsL=LvVRv5uf,chars_1534,DNw=0xCQKTaQ,UFh=dMPbabRG,q29=aMsZTj83,oUi=ywMsKT2G"#;
450        let conf = WplExpress::new(vec![
451            fdc3("digit", ",", false).map_err(|e| e.into_wpl_err())?,
452            fdc3("time", ",", false).map_err(|e| e.into_wpl_err())?,
453            fdc3("sn", ",", false).map_err(|e| e.into_wpl_err())?,
454            fdc3("chars", ",", false).map_err(|e| e.into_wpl_err())?,
455            fdc3("time", ",", false).map_err(|e| e.into_wpl_err())?,
456            fdc3("kv", ",", false).map_err(|e| e.into_wpl_err())?,
457            fdc3("ip", ",", false).map_err(|e| e.into_wpl_err())?,
458            fdc3("kv", ",", false).map_err(|e| e.into_wpl_err())?,
459            fdc3("chars", ",", false).map_err(|e| e.into_wpl_err())?,
460            fdc3("kv", ",", false).map_err(|e| e.into_wpl_err())?,
461            fdc3("kv", ",", false).map_err(|e| e.into_wpl_err())?,
462            fdc3("kv", ",", false).map_err(|e| e.into_wpl_err())?,
463            fdc3("kv", ",", false).map_err(|e| e.into_wpl_err())?,
464        ]);
465        let ppl = WplEvaluator::from(&conf, None)?;
466        let result = ppl.parse_groups(0, &mut data).assert();
467        assert_eq!(data, "");
468        result.items.iter().for_each(|f| println!("{}", f));
469
470        let mut data = r#"1857,2021-4-10 0:46:8,R2IP-IF-06UT-7KUU,chars_1914,2021-4-15 2:19:43,u6s=TNSAlucV,228.211.38.109,k02=doYanSlf,chars_276,SIw=nu8atSqT,84e=e6qUb2k7,aVs=pk8M8rQU,5An=9upLU8aa"#;
471        let result = ppl.parse_groups(0, &mut data).assert();
472        assert_eq!(data, "");
473        result.items.iter().for_each(|f| println!("{}", f));
474        Ok(())
475    }
476
477    #[test]
478    fn preorder_plg_pipe_unit_executes() -> WplCodeResult<()> {
479        #[derive(Debug)]
480        struct MockStage;
481
482        impl PipeProcessor for MockStage {
483            fn process(&self, data: RawData) -> WparseResult<RawData> {
484                let mut value = raw_to_utf8_string(&data);
485                value.push_str("-mock");
486                Ok(RawData::from_string(value))
487            }
488
489            fn name(&self) -> &'static str {
490                "mock_stage"
491            }
492        }
493
494        register_wpl_pipe!("plg_pipe/MOCK-STAGE", || Hold::new(MockStage));
495
496        let mut expr =
497            WplExpress::new(vec![fdc3("auto", " ", true).map_err(|e| e.into_wpl_err())?]);
498        expr.pipe_process = vec![SmolStr::from("plg_pipe/MOCK-STAGE")];
499
500        let evaluator = WplEvaluator::from(&expr, None)?;
501        let results = evaluator
502            .preorder_proc(RawData::from_string("data".to_string()))
503            .map_err(|e| e.into_wpl_err())?;
504        assert_eq!(results.len(), 1);
505        assert_eq!(results[0].result, "data-mock");
506        assert_eq!(results[0].name, "mock_stage");
507        Ok(())
508    }
509
510    #[test]
511    fn test_ignore() -> WplCodeResult<()> {
512        let mut data = r#"2345,2021-7-15 7:50:32,9OPP-MU-JME2-YGUW,chars_740"#;
513        let conf = WplExpress::new(vec![
514            fdc3("_", ",", false).map_err(|e| e.into_wpl_err())?,
515            fdc3("_", ",", false).map_err(|e| e.into_wpl_err())?,
516            fdc3("_", ",", false).map_err(|e| e.into_wpl_err())?,
517            fdc3("_", ",", false).map_err(|e| e.into_wpl_err())?,
518        ]);
519        let ppl = WplEvaluator::from(&conf, None)?;
520        let result = ppl.parse_groups(0, &mut data).assert();
521        assert_eq!(data, "");
522        result.items.iter().for_each(|f| println!("{}", f));
523        Ok(())
524    }
525
526    #[test]
527    fn test_ignore_cnt() -> WplCodeResult<()> {
528        let mut data = r#"2345,2021-7-15 7:50:32,9OPP-MU-JME2-YGUW,chars_740"#;
529        let conf = WplExpress::new(vec![
530            fdc4_1("_", ",", true, 4).map_err(|e| e.into_wpl_err())?,
531        ]);
532        let ppl = WplEvaluator::from(&conf, None)?;
533        let result = ppl.parse_groups(0, &mut data).assert();
534        assert_eq!(data, "");
535        assert_eq!(result.items.len(), 4);
536        result.items.iter().for_each(|f| println!("{}", f));
537
538        let mut data = r#"2345,2021-7-15 7:50:32,9OPP-MU-JME2-YGUW,chars_740"#;
539        let conf = WplExpress::new(vec![
540            fdc4_1("_", ",", true, 3).map_err(|e| e.into_wpl_err())?,
541        ]);
542        let ppl = WplEvaluator::from(&conf, None)?;
543        let result = ppl.parse_groups(0, &mut data).assert();
544        assert_eq!(data, "chars_740");
545        assert_eq!(result.items.len(), 3);
546        Ok(())
547    }
548
549    #[test]
550    fn test_pipe_unit_direct_lookup() -> WplCodeResult<()> {
551        use crate::eval::builtins::raw_to_utf8_string;
552        use crate::{create_preorder_pipe_unit, list_preorder_pipe_units};
553
554        // Define MockStage for this test
555        #[derive(Debug)]
556        struct TestMockStage;
557
558        impl PipeProcessor for TestMockStage {
559            fn process(&self, data: RawData) -> WparseResult<RawData> {
560                let mut value = raw_to_utf8_string(&data);
561                value.push_str("-mock");
562                Ok(RawData::from_string(value))
563            }
564
565            fn name(&self) -> &'static str {
566                "test_mock_stage"
567            }
568        }
569
570        // Test direct lookup after simplification
571        // Register test processors with different naming schemes
572        register_wpl_pipe!("direct-test", || Hold::new(TestMockStage));
573        register_wpl_pipe!("plg_pipe/mock-prefix", || Hold::new(TestMockStage));
574
575        // Test that direct naming works
576        let processor = create_preorder_pipe_unit("direct-test");
577        assert!(processor.is_some(), "Should find direct-test processor");
578
579        // Test that prefixed registration also works (with full name)
580        let processor = create_preorder_pipe_unit("plg_pipe/mock-prefix");
581        assert!(
582            processor.is_some(),
583            "Should find plg_pipe/with-prefix processor"
584        );
585
586        // Test actual processing with different naming
587        let test_data = RawData::from_string("hello".to_string());
588
589        if let Some(processor) = create_preorder_pipe_unit("direct-test") {
590            let result = processor
591                .process(test_data.clone())
592                .map_err(|e| e.into_wpl_err())?;
593            assert_eq!(raw_to_utf8_string(&result), "hello-mock");
594        }
595
596        if let Some(processor) = create_preorder_pipe_unit("plg_pipe/mock-prefix") {
597            let result = processor.process(test_data).map_err(|e| e.into_wpl_err())?;
598            assert_eq!(raw_to_utf8_string(&result), "hello-mock");
599        }
600
601        // List all processors to see what's registered
602        let processors = list_preorder_pipe_units();
603        println!("All registered processors: {:?}", processors);
604
605        // Verify both naming approaches work (names are converted to uppercase)
606        assert!(processors.contains(&"DIRECT-TEST".into()));
607        assert!(processors.contains(&"PLG_PIPE/MOCK-PREFIX".into()));
608
609        Ok(())
610    }
611
612    #[test]
613    fn test_simplified_assemble_ins_logic() -> WplCodeResult<()> {
614        use crate::eval::builtins::raw_to_utf8_string;
615        use crate::{create_preorder_pipe_unit, list_preorder_pipe_units};
616
617        // Define test processor
618        #[derive(Debug)]
619        struct SimplifiedTestStage;
620
621        impl PipeProcessor for SimplifiedTestStage {
622            fn process(&self, data: RawData) -> WparseResult<RawData> {
623                let mut value = raw_to_utf8_string(&data);
624                value.push_str("-simplified");
625                Ok(RawData::from_string(value))
626            }
627
628            fn name(&self) -> &'static str {
629                "simplified_test"
630            }
631        }
632
633        // Register processors with both naming styles
634        register_wpl_pipe!("simple-test", || Hold::new(SimplifiedTestStage));
635        register_wpl_pipe!("plg_pipe/simple-prefix", || Hold::new(SimplifiedTestStage));
636
637        // Test that both can be found directly
638        let processor1 = create_preorder_pipe_unit("simple-test");
639        assert!(processor1.is_some(), "Should find simple-test");
640
641        let processor2 = create_preorder_pipe_unit("plg_pipe/simple-prefix");
642        assert!(processor2.is_some(), "Should find plg_pipe/with-prefix");
643
644        // Test that processors registered with plg_pipe/ prefix can be found without it
645        // This would fail because registration is case-sensitive and stores full name
646        let processor3 = create_preorder_pipe_unit("simple-prefix");
647        assert!(
648            processor3.is_none(),
649            "Should NOT find with-prefix (was registered as plg_pipe/with-prefix)"
650        );
651
652        // Show all registered processors
653        let processors = list_preorder_pipe_units();
654        println!("All processors: {:?}", processors);
655
656        Ok(())
657    }
658}