dm-database-sqllog2db 1.16.0

高性能 CLI 工具:流式解析达梦数据库 SQL 日志并导出到 CSV 或 SQLite
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
use super::collector;
use super::orchestrator::handle_run;
use crate::config::Config;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

/// With `include_performance_metrics = false` under `[exporter.csv]`, the CSV header
/// must not mention `exec_time_ms`, `row_count` or `exec_id`, but must still
/// mention `sql`.
#[test]
fn test_include_performance_metrics_false_csv_excludes_pm_columns() {
    // Renders a path with every backslash replaced by `/` before it is
    // interpolated into the TOML text.
    fn slashed(p: &std::path::Path) -> String {
        p.to_string_lossy().replace('\\', "/")
    }

    let workdir = tempfile::TempDir::new().unwrap();
    let root = workdir.path();

    // Input directory content: one log file holding a single SEL record.
    std::fs::write(
        root.join("t.log"),
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();

    let csv_path = root.join("out.csv");
    let logdir = slashed(root);
    let errlog = slashed(&root.join("errors.log"));
    let applog = slashed(&root.join("app.log"));
    let csv = slashed(&csv_path);
    let config_text = format!(
        "[sqllog]\ninputs = [\"{logdir}\"]\n[error]\nfile = \"{errlog}\"\n[logging]\nfile = \"{applog}\"\nlevel = \"warn\"\nretention_days = 1\n[exporter.csv]\nfile = \"{csv}\"\noverwrite = true\nappend = false\ninclude_performance_metrics = false\n"
    );
    let cfg: Config = toml::from_str(&config_text).unwrap();

    let flag = Arc::new(AtomicBool::new(false));
    handle_run(&cfg, true, false, &flag, None).unwrap();

    // Only the first line (the header) of the produced CSV is inspected.
    let exported = std::fs::read_to_string(&csv_path).unwrap();
    let header = exported.lines().next().unwrap();
    assert!(
        !header.contains("exec_time_ms"),
        "header should skip exec_time_ms: {header}"
    );
    assert!(
        !header.contains("row_count"),
        "header should skip row_count: {header}"
    );
    assert!(
        !header.contains("exec_id"),
        "header should skip exec_id: {header}"
    );
    assert!(header.contains("sql"), "sql column should remain: {header}");
}

/// `handle_run` must return `Ok` for a CSV exporter configuration that sets only
/// `file`, `overwrite` and `append`.
#[test]
fn test_handle_run_default_config_succeeds() {
    // Renders a path with every backslash replaced by `/` before it is
    // interpolated into the TOML text.
    fn slashed(p: &std::path::Path) -> String {
        p.to_string_lossy().replace('\\', "/")
    }

    let workdir = tempfile::TempDir::new().unwrap();
    let root = workdir.path();

    // Input directory content: one log file holding a single SEL record.
    std::fs::write(
        root.join("t.log"),
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();

    let logdir = slashed(root);
    let errlog = slashed(&root.join("errors.log"));
    let applog = slashed(&root.join("app.log"));
    let csv = slashed(&root.join("out.csv"));
    let config_text = format!(
        "[sqllog]\ninputs = [\"{logdir}\"]\n[error]\nfile = \"{errlog}\"\n[logging]\nfile = \"{applog}\"\nlevel = \"warn\"\nretention_days = 1\n[exporter.csv]\nfile = \"{csv}\"\noverwrite = true\nappend = false\n"
    );
    let cfg: Config = toml::from_str(&config_text).unwrap();

    let flag = Arc::new(AtomicBool::new(false));
    let result = handle_run(&cfg, true, false, &flag, None);
    assert!(result.is_ok(), "handle_run 应在默认配置时成功: {result:?}");
}

/// With a `[filter]` section that sets `enable = true` and `usernames = ["U"]`,
/// the record logged by user `U` must still show up in the CSV output.
#[test]
fn test_filter_path() {
    // Renders a path with every backslash replaced by `/` before it is
    // interpolated into the TOML text.
    fn slashed(p: &std::path::Path) -> String {
        p.to_string_lossy().replace('\\', "/")
    }

    let workdir = tempfile::TempDir::new().unwrap();
    let root = workdir.path();

    // Input directory content: one log file holding a single SEL record of user `U`.
    std::fs::write(
        root.join("t.log"),
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();

    let csv_path = root.join("out.csv");
    let logdir = slashed(root);
    let errlog = slashed(&root.join("errors.log"));
    let applog = slashed(&root.join("app.log"));
    let csv = slashed(&csv_path);
    let config_text = format!(
        "[sqllog]\ninputs = [\"{logdir}\"]\n[error]\nfile = \"{errlog}\"\n[logging]\nfile = \"{applog}\"\nlevel = \"warn\"\nretention_days = 1\n[filter]\nenable = true\nusernames = [\"U\"]\n[exporter.csv]\nfile = \"{csv}\"\noverwrite = true\nappend = false\n"
    );
    let cfg: Config = toml::from_str(&config_text).unwrap();

    let flag = Arc::new(AtomicBool::new(false));
    handle_run(&cfg, true, false, &flag, None).unwrap();

    let content = std::fs::read_to_string(&csv_path).unwrap();
    assert!(
        content.contains("SELECT 1"),
        "filtered record should appear in output: {content}"
    );
}

/// Exports the same log line to CSV twice: once from a directory holding one file
/// (last `handle_run` argument `None`) and once from a directory holding two files
/// (last argument `Some(2)`). The second CSV must have exactly one more line than
/// the first.
#[test]
fn test_parallel_merge_consistent() {
    // Renders a path with every backslash replaced by `/` before it is
    // interpolated into the TOML text.
    fn slashed(p: &std::path::Path) -> String {
        p.to_string_lossy().replace('\\', "/")
    }

    let workdir = tempfile::TempDir::new().unwrap();
    let root = workdir.path();
    let log_line = "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT id FROM orders WHERE user_id = 42. EXECTIME: 5(ms) ROWCOUNT: 3(rows) EXEC_ID: 1.\n";
    let errlog = slashed(&root.join("errors.log"));
    let applog = slashed(&root.join("app.log"));

    // Builds a CSV-exporter config reading from `input_dir` and writing to `csv`.
    let build_config = |input_dir: &std::path::Path, csv: &str| {
        let logdir = slashed(input_dir);
        let text = format!(
            "[sqllog]\ninputs = [\"{logdir}\"]\n[error]\nfile = \"{errlog}\"\n[logging]\nfile = \"{applog}\"\nlevel = \"warn\"\nretention_days = 1\n[exporter.csv]\nfile = \"{csv}\"\noverwrite = true\nappend = false\n"
        );
        toml::from_str::<Config>(&text).unwrap()
    };

    // Sequential case: a directory with exactly one log file. Per the original
    // author's note, a single input file never triggers the parallel path,
    // whatever the machine's available parallelism is (same pattern as
    // test_sqlite_parallel_matches_sequential).
    let seq_dir = root.join("seq");
    std::fs::create_dir(&seq_dir).unwrap();
    std::fs::write(seq_dir.join("only.log"), log_line).unwrap();
    let csv_seq = slashed(&root.join("out_seq.csv"));
    let flag_seq = Arc::new(AtomicBool::new(false));
    let result_seq = handle_run(
        &build_config(&seq_dir, &csv_seq),
        true,
        false,
        &flag_seq,
        None,
    );
    assert!(result_seq.is_ok(), "顺序路径应成功: {result_seq:?}");

    // Parallel case: two log files; the original note says this is what triggers
    // the multi-file parallel path on multi-core machines.
    let par_dir = root.join("par");
    std::fs::create_dir(&par_dir).unwrap();
    std::fs::write(par_dir.join("a.log"), log_line).unwrap();
    std::fs::write(par_dir.join("b.log"), log_line).unwrap();
    let csv_par = slashed(&root.join("out_par.csv"));
    let flag_par = Arc::new(AtomicBool::new(false));
    let result_par = handle_run(
        &build_config(&par_dir, &csv_par),
        true,
        false,
        &flag_par,
        Some(2),
    );
    assert!(result_par.is_ok(), "并行路径应成功: {result_par:?}");

    // One input file yields header + 1 data row; two input files yield header + 2.
    let count_lines = |csv_file: &str| std::fs::read_to_string(csv_file).unwrap().lines().count();
    let seq_lines = count_lines(&csv_seq);
    let par_lines = count_lines(&csv_par);
    assert_eq!(
        par_lines,
        seq_lines + 1,
        "并行路径(2 个文件)应比顺序路径(1 个文件)多 1 条数据行"
    );
}

/// Exports three log fragments to SQLite twice: once as three separate files in
/// one directory and once concatenated into a single file. The
/// `(exec_id, sql, normalized_sql)` rows of both databases must be equal, and the
/// multi-file database must be non-empty on disk.
#[test]
fn test_sqlite_parallel_matches_sequential() {
    // Renders a path with every backslash replaced by `/` before it is
    // interpolated into the TOML text.
    fn slashed(p: &std::path::Path) -> String {
        p.to_string_lossy().replace('\\', "/")
    }

    // Loads (exec_id, sql, normalized_sql) from table `sqllog`, in a stable order.
    fn load_rows(db_file: &str) -> Vec<(Option<i64>, String, Option<String>)> {
        let conn = rusqlite::Connection::open(db_file).unwrap();
        let mut stmt = conn
            .prepare("SELECT exec_id, sql, normalized_sql FROM sqllog ORDER BY exec_id, sql")
            .unwrap();
        let mapped = stmt
            .query_map([], |row| {
                let exec_id = row.get::<_, Option<i64>>(0)?;
                let sql = row.get::<_, String>(1)?;
                let normalized = row.get::<_, Option<String>>(2)?;
                Ok((exec_id, sql, normalized))
            })
            .unwrap();
        mapped.collect::<rusqlite::Result<Vec<_>>>().unwrap()
    }

    let workdir = tempfile::TempDir::new().unwrap();
    let root = workdir.path();
    let errlog = slashed(&root.join("errors.log"));
    let applog = slashed(&root.join("app.log"));

    // Each fragment: a plain INS, a PARAMS line (same sess/stmt as the following
    // statement), then a parametrized INS using those PARAMS.
    // Per the original note, the PARAMS line deliberately has no trailing '.' so
    // that parse_params succeeds (it carries no EXEC_ID/ROWCOUNT).
    let log_a = "\
2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [INS] INSERT INTO t VALUES (1). EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 100.\n\
2025-01-15 10:30:28.010 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x2 appname:A ip:10.0.0.1) PARAMS(SEQNO, TYPE, DATA)={(0, VARCHAR, 'alice')}\n\
2025-01-15 10:30:28.011 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x2 appname:A ip:10.0.0.1) [INS] INSERT INTO t(name) VALUES (?). EXECTIME: 2(ms) ROWCOUNT: 1(rows) EXEC_ID: 101.\n";
    let log_b = "\
2025-01-15 10:30:28.001 (EP[0] sess:0x0002 user:U trxid:2 stmt:0x3 appname:A ip:10.0.0.1) [INS] INSERT INTO t VALUES (2). EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 200.\n\
2025-01-15 10:30:28.010 (EP[0] sess:0x0002 user:U trxid:2 stmt:0x4 appname:A ip:10.0.0.1) PARAMS(SEQNO, TYPE, DATA)={(0, VARCHAR, 'bob')}\n\
2025-01-15 10:30:28.011 (EP[0] sess:0x0002 user:U trxid:2 stmt:0x4 appname:A ip:10.0.0.1) [INS] INSERT INTO t(name) VALUES (?). EXECTIME: 2(ms) ROWCOUNT: 1(rows) EXEC_ID: 201.\n";
    let log_c = "\
2025-01-15 10:30:28.001 (EP[0] sess:0x0003 user:U trxid:3 stmt:0x5 appname:A ip:10.0.0.1) [INS] INSERT INTO t VALUES (3). EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 300.\n\
2025-01-15 10:30:28.010 (EP[0] sess:0x0003 user:U trxid:3 stmt:0x6 appname:A ip:10.0.0.1) PARAMS(SEQNO, TYPE, DATA)={(0, VARCHAR, 'carol')}\n\
2025-01-15 10:30:28.011 (EP[0] sess:0x0003 user:U trxid:3 stmt:0x6 appname:A ip:10.0.0.1) [INS] INSERT INTO t(name) VALUES (?). EXECTIME: 2(ms) ROWCOUNT: 1(rows) EXEC_ID: 301.\n";

    // Multi-file input: one fragment per file.
    let par_dir = root.join("par");
    std::fs::create_dir(&par_dir).unwrap();
    std::fs::write(par_dir.join("a.log"), log_a).unwrap();
    std::fs::write(par_dir.join("b.log"), log_b).unwrap();
    std::fs::write(par_dir.join("c.log"), log_c).unwrap();

    // Single-file input: all fragments concatenated.
    let seq_dir = root.join("seq");
    std::fs::create_dir(&seq_dir).unwrap();
    std::fs::write(seq_dir.join("all.log"), format!("{log_a}{log_b}{log_c}")).unwrap();

    // Builds a SQLite-exporter config reading from `input_dir` and writing to `db`.
    let build_config = |input_dir: &std::path::Path, db: &str| {
        let logdir = slashed(input_dir);
        let text = format!(
            "[sqllog]\ninputs = [\"{logdir}\"]\n[error]\nfile = \"{errlog}\"\n[logging]\nfile = \"{applog}\"\nlevel = \"warn\"\nretention_days = 1\n[exporter.sqlite]\ndatabase_url = \"{db}\"\ntable_name = \"sqllog\"\noverwrite = true\nappend = false\nbatch_size = 1000\n"
        );
        toml::from_str::<Config>(&text).unwrap()
    };

    let seq_db = slashed(&root.join("seq.db"));
    let par_db = slashed(&root.join("par.db"));

    let seq_flag = Arc::new(AtomicBool::new(false));
    handle_run(&build_config(&seq_dir, &seq_db), true, false, &seq_flag, None).unwrap();
    let par_flag = Arc::new(AtomicBool::new(false));
    handle_run(&build_config(&par_dir, &par_db), true, false, &par_flag, None).unwrap();

    let seq_rows = load_rows(&seq_db);
    let par_rows = load_rows(&par_db);

    assert_eq!(
        seq_rows, par_rows,
        "并行 SQLite 输出与顺序模式记录集合应一致"
    );
    assert!(std::fs::metadata(&par_db).unwrap().len() > 0);
}

// ── Gap 1: normalize_and_export with passes=false + do_normalize=true ──────────
//
// Required behavior: when passes=false, do_normalize=true and record.tag is None,
// the function updates params_buffer (without exporting), returns
// ExportAction::Continue, and leaves records_in_file at 0.
#[test]
fn test_normalize_and_export_filtered_params_updates_buffer() {
    use super::processor::{ExportAction, normalize_and_export};
    use crate::error::ErrorStats;
    use crate::exporter::CsvExporter;
    use crate::exporter::ExporterManager;
    use crate::pipeline::normalizer::ParamBuffer;
    use dm_database_parser_sqllog::Sqllog;

    let workdir = tempfile::TempDir::new().unwrap();
    let csv_path = workdir.path().join("out.csv");

    let mut exporters = ExporterManager::from_csv(CsvExporter::new(&csv_path));
    exporters.initialize().unwrap();

    // A PARAMS record: tag is None and the sql field carries the PARAMS(…) syntax.
    let params_record = Sqllog {
        ts: "2024-01-01 00:00:00.000".to_string(),
        tag: None,
        ep: 0,
        sess_id: "sess_gap1".to_string(),
        thrd_id: "t1".to_string(),
        username: "usr".to_string(),
        trxid: "tx1".to_string(),
        statement: "stmt_gap1".to_string(),
        appname: "app".to_string(),
        client_ip: "127.0.0.1".to_string(),
        sql: "PARAMS(SEQNO, TYPE, DATA)={(0, VARCHAR, 'hello')}".to_string(),
        exectime: 0.0,
        rowcount: 0,
        exec_id: 0,
    };

    let mut params_buffer = ParamBuffer::new();
    let mut scratch = Vec::<u8>::new();
    let mut records_in_file = 0usize;
    let mut stats = ErrorStats::default();

    let outcome = normalize_and_export(
        &params_record,
        &mut exporters,
        true, // include_pm
        true, // do_normalize: the PARAMS buffer is expected to be updated
        &mut params_buffer,
        None, // placeholder_override
        &mut scratch,
        None, // remaining (no quota)
        &mut records_in_file,
        &mut stats,
        "test_file.log",
        false, // passes=false → nothing is exported
    );

    // The result must be Continue (neither BreakQuota nor BreakFatal).
    assert!(
        matches!(outcome, ExportAction::Continue),
        "passes=false 路径应返回 Continue"
    );
    // No record may have been exported.
    assert_eq!(
        records_in_file, 0,
        "passes=false 时 records_in_file 应保持为 0,实际为 {records_in_file}"
    );
    // The PARAMS record must have landed in the buffer under (sess_id, statement).
    let expected_key = ("sess_gap1".to_string(), "stmt_gap1".to_string());
    assert!(
        params_buffer.contains_key(&expected_key),
        "passes=false+do_normalize=true 下 PARAMS 记录应写入 params_buffer,\
         但 key ({:?}) 不存在; buffer keys={:?}",
        expected_key,
        params_buffer.keys().collect::<Vec<_>>()
    );
}

// ── Gap 2: normalize_and_export BreakQuota path ─────────────────────────────
//
// Required behavior: when passes=true and remaining=Some(0) (quota exhausted),
// the function returns ExportAction::BreakQuota and records_in_file stays at 0
// (nothing is exported).
#[test]
fn test_normalize_and_export_quota_hit_returns_break_quota() {
    use super::processor::{ExportAction, normalize_and_export};
    use crate::error::ErrorStats;
    use crate::exporter::CsvExporter;
    use crate::exporter::ExporterManager;
    use crate::pipeline::normalizer::ParamBuffer;
    use dm_database_parser_sqllog::Sqllog;

    let workdir = tempfile::TempDir::new().unwrap();
    let csv_path = workdir.path().join("out.csv");

    let csv_exporter = CsvExporter::new(&csv_path);
    let mut exporters = ExporterManager::from_csv(csv_exporter);
    exporters.initialize().unwrap();

    // An ordinary SEL record, processed with passes=true.
    let select_record = Sqllog {
        ts: "2024-01-01 00:00:00.000".to_string(),
        tag: Some("SEL".to_string()),
        ep: 0,
        sess_id: "sess_gap2".to_string(),
        thrd_id: "t2".to_string(),
        username: "usr".to_string(),
        trxid: "tx2".to_string(),
        statement: "stmt_gap2".to_string(),
        appname: "app".to_string(),
        client_ip: "127.0.0.1".to_string(),
        sql: "SELECT 1".to_string(),
        exectime: 1.0,
        rowcount: 1,
        exec_id: 42,
    };

    let mut param_buf = ParamBuffer::new();
    let mut scratch = Vec::<u8>::new();
    let mut records_in_file = 0usize;
    let mut stats = ErrorStats::default();

    // remaining=Some(0) means the quota is already used up
    // (records_in_file=0 >= remaining=0).
    let outcome = normalize_and_export(
        &select_record,
        &mut exporters,
        true,  // include_pm
        false, // do_normalize
        &mut param_buf,
        None,
        &mut scratch,
        Some(0), // remaining=0 → quota exhausted
        &mut records_in_file,
        &mut stats,
        "test_file.log",
        true, // passes=true → otherwise the call would simply Continue
    );

    // The result must be BreakQuota.
    assert!(
        matches!(outcome, ExportAction::BreakQuota),
        "remaining=Some(0) 且 passes=true 应返回 BreakQuota,\
         但得到了 Continue 或 BreakFatal"
    );
    // No record may have been exported.
    assert_eq!(
        records_in_file, 0,
        "BreakQuota 路径下 records_in_file 应保持为 0,实际为 {records_in_file}"
    );
}

// ── PROG-01/02: 进度条模板单元测试 ────────────────────────────────────────────

/// Checks that `make_progress_bar(true, 3)` yields `Some(pb)` with
/// `pb.length() == Some(3)` and `pb.position() == 0`, and that using the bar
/// afterwards does not panic.
#[test]
fn test_progress_bar_template() {
    let Some(bar) = super::input::make_progress_bar(true, 3) else {
        panic!("show_progress=true 应返回 Some(ProgressBar)");
    };
    assert_eq!(
        bar.length(),
        Some(3),
        "length() 应为 Some(3),实际为 {:?}",
        bar.length()
    );
    assert_eq!(
        bar.position(),
        0,
        "初始 position() 应为 0,实际为 {}",
        bar.position()
    );
    // Smoke check that the template is usable: set_message must not panic.
    bar.set_message("test message");
    bar.finish_and_clear();
}

/// Checks that `make_progress_bar(false, 3)` yields `None`.
#[test]
fn test_progress_bar_disabled() {
    let disabled = super::input::make_progress_bar(false, 3);
    assert!(
        disabled.is_none(),
        "show_progress=false 应返回 None,实际返回了 Some"
    );
}

// ── PROG-03/DIAG-03: error log 写出 + hint + 摘要扩展 ───────────────────────

/// Checks that `handle_run` writes the error log file when parsing fails.
///
/// The invalid line is placed first in the input. According to the original note,
/// the parser treats a `\n20` timestamp prefix as the record boundary, so a leading
/// line without a timestamp is reported on its own as `InvalidFormat`, which
/// populates `parse_error_records`.
#[test]
fn test_error_log_written() {
    // Renders a path with every backslash replaced by `/` before it is
    // interpolated into the TOML text.
    fn slashed(p: &std::path::Path) -> String {
        p.to_string_lossy().replace('\\', "/")
    }

    let workdir = tempfile::TempDir::new().unwrap();
    let root = workdir.path();

    // Invalid line first (a record of its own), followed by a valid SEL record.
    std::fs::write(
        root.join("t.log"),
        "garbage line that cannot be parsed\n2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:U trxid:1 stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT 1. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();

    let error_log = root.join("errors.log");
    let logdir = slashed(root);
    let errlog = slashed(&error_log);
    let applog = slashed(&root.join("app.log"));
    let csv = slashed(&root.join("out.csv"));
    let config_text = format!(
        "[sqllog]\ninputs = [\"{logdir}\"]\n[error]\nfile = \"{errlog}\"\n[logging]\nfile = \"{applog}\"\nlevel = \"warn\"\nretention_days = 1\n[exporter.csv]\nfile = \"{csv}\"\noverwrite = true\nappend = false\n"
    );
    let cfg: Config = toml::from_str(&config_text).unwrap();

    let flag = Arc::new(AtomicBool::new(false));
    handle_run(&cfg, true, false, &flag, None).unwrap();

    assert!(
        error_log.exists(),
        "error log 文件应在有解析错误时被写出,但未找到: {}",
        error_log.display()
    );
    let content = std::fs::read_to_string(&error_log).unwrap();
    assert!(
        content.contains("[ERROR] line "),
        "error log 应含有 '[ERROR] line ',实际内容: {content}"
    );
    assert!(
        content.contains("reason:"),
        "error log 应含有 'reason:',实际内容: {content}"
    );
}

/// Checks that `print_run_summary` does not panic when handed an `ErrorStats`
/// containing `EncodingError` entries.
///
/// Per the original note, the hint itself goes to stderr and is verified manually
/// at the integration level; this test only guards compilation and no-panic.
#[test]
fn test_hint_output() {
    use crate::error::{ErrorKind, ErrorStats};

    let encoding_errors = std::collections::HashMap::from([(ErrorKind::EncodingError, 3u64)]);
    let stats = ErrorStats {
        total_errors: 3,
        parse_errors: 3,
        by_type: encoding_errors,
        ..Default::default()
    };

    // Sanity-check the constructed stats (hint behavior itself is verified manually).
    let recorded = stats
        .by_type
        .get(&ErrorKind::EncodingError)
        .copied()
        .unwrap_or_default();
    assert_eq!(recorded, 3, "by_type[EncodingError] 应为 3");

    // The call itself must not panic.
    super::summary::print_run_summary(false, false, false, 0.1, &[], 0, 0, &stats);
}

// WATCH-08: the run path still overwrites (regression guard for append_error_log=false).
/// Checks that with `append_error_log = false`, `write_error_log` replaces the
/// previous file content instead of keeping it.
#[test]
fn test_write_error_log_run_still_truncates() {
    use crate::config::ErrorLogConfig;
    use crate::error::{ErrorKind, ErrorStats, ParseErrorRecord};

    let scratch_file = tempfile::NamedTempFile::new().expect("failed to create tempfile");
    let log_file = scratch_file.path().to_string_lossy().into_owned();

    // Seed the file with content that must be gone after the call.
    std::fs::write(&log_file, b"OLD CONTENT\n").expect("failed to write old content");

    let error_section = ErrorLogConfig {
        file: log_file.clone(),
    };
    let cfg = Config {
        error: Some(error_section),
        append_error_log: false, // run path: overwrite
        ..Default::default()
    };

    let bad_record = ParseErrorRecord {
        line_number: 1,
        raw_truncated: "bad line".to_string(),
        kind: ErrorKind::ParseFailed,
    };
    let stats = ErrorStats {
        parse_errors: 1,
        total_errors: 1,
        parse_error_records: vec![bad_record],
        ..Default::default()
    };

    super::error_log::write_error_log(&cfg, &stats);

    let content = std::fs::read_to_string(&log_file).expect("failed to read error log");
    assert!(
        !content.contains("OLD CONTENT"),
        "append_error_log=false 时旧内容应被截断,实际内容: {content}"
    );
    assert!(
        content.contains("[ERROR] line "),
        "error log 应含有新写入的 [ERROR] 行,实际内容: {content}"
    );
}

// WATCH-08: the watch path appends (append_error_log=true), so old content must survive.
/// Checks that with `append_error_log = true`, `write_error_log` keeps the
/// previous file content and adds the new error line.
#[test]
fn test_write_error_log_watch_appends() {
    use crate::config::ErrorLogConfig;
    use crate::error::{ErrorKind, ErrorStats, ParseErrorRecord};

    let scratch_file = tempfile::NamedTempFile::new().expect("failed to create tempfile");
    let log_file = scratch_file.path().to_string_lossy().into_owned();

    // Seed the file with content that must still be present after the call.
    std::fs::write(&log_file, b"EXISTING\n").expect("failed to write existing content");

    let error_section = ErrorLogConfig {
        file: log_file.clone(),
    };
    let cfg = Config {
        error: Some(error_section),
        append_error_log: true, // watch path: append
        ..Default::default()
    };

    let bad_record = ParseErrorRecord {
        line_number: 1,
        raw_truncated: "bad".to_string(),
        kind: ErrorKind::ParseFailed,
    };
    let stats = ErrorStats {
        parse_errors: 1,
        total_errors: 1,
        parse_error_records: vec![bad_record],
        ..Default::default()
    };

    super::error_log::write_error_log(&cfg, &stats);

    let content = std::fs::read_to_string(&log_file).expect("failed to read error log");
    assert!(
        content.contains("EXISTING"),
        "append_error_log=true 时旧内容应被保留(追加模式),实际内容: {content}"
    );
    assert!(
        content.contains("[ERROR] line "),
        "新错误行应追加到文件末尾,实际内容: {content}"
    );
}

/// Checks that `print_run_summary` does not panic when handed an `ErrorStats`
/// with a non-zero `filtered_out` (regression guard).
#[test]
fn test_run_summary() {
    use crate::error::{ErrorKind, ErrorStats};

    let encoding_errors = std::collections::HashMap::from([(ErrorKind::EncodingError, 2u64)]);
    let stats = ErrorStats {
        total_errors: 2,
        parse_errors: 2,
        filtered_out: 5,
        by_type: encoding_errors,
        ..Default::default()
    };

    // The call itself must not panic.
    super::summary::print_run_summary(false, false, false, 1.5, &[], 10, 0, &stats);
}
// ── Group 1-4: collector.rs 全分支单元测试 ──────────────────────────────────

/// Test-only `LogProcessor` whose `process` returns `false` for every record.
#[derive(Debug)]
struct AlwaysFail;

impl crate::pipeline::LogProcessor for AlwaysFail {
    /// Returns `false` regardless of the record passed in.
    fn process(&self, _record: &dm_database_parser_sqllog::Sqllog) -> bool {
        false
    }
}

// Group 1 — InvalidPath error path (collector.rs lines 26-34)
// A path that does not exist must yield Err(Error::Parser(ParserError::InvalidPath { .. })).
#[test]
fn test_collector_invalid_path_returns_error() {
    use crate::error::{Error, ParserError};
    use crate::pipeline::Pipeline;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    let missing = std::path::Path::new("/nonexistent/absolutely/not/there.log");
    let no_filters = Pipeline::default();
    let interrupted = Arc::new(AtomicBool::new(false));

    let result = collector::collect_log_file(missing, &no_filters, false, None, &interrupted);

    assert!(result.is_err(), "不存在路径应返回 Err,实际: {result:?}");
    let err = result.unwrap_err();
    assert!(
        matches!(err, Error::Parser(ParserError::InvalidPath { .. })),
        "应匹配 ParserError::InvalidPath"
    );
}

// Group 2 — parse-error accumulation loop (collector.rs lines 41-63)
// A log file made only of invalid lines must bump parse_errors and yield no rows.
#[test]
fn test_collector_parse_error_accumulation() {
    use crate::pipeline::Pipeline;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    let workdir = tempfile::TempDir::new().unwrap();
    let bad_log = workdir.path().join("bad.log");
    std::fs::write(&bad_log, "not a valid log line\nalso invalid\n").unwrap();

    let no_filters = Pipeline::default();
    let interrupted = Arc::new(AtomicBool::new(false));
    let collected = collector::collect_log_file(&bad_log, &no_filters, false, None, &interrupted);
    let (rows, stats) = collected.expect("collect_log_file 应不返回 Err");

    assert!(
        rows.is_empty(),
        "全部非法行应不产生记录,实际 rows.len()={}",
        rows.len()
    );
    assert!(
        stats.parse_errors > 0,
        "应记录至少 1 条解析错误,实际 parse_errors={}",
        stats.parse_errors
    );
}

// Group 3 — the !needs_processing filter branch (collector.rs lines 74-76)
// AlwaysFail processor + a DML record (tag.is_some()) + do_normalize=false
// gives passes=false and needs_processing=false, which hits the early return.
#[test]
fn test_collector_not_needed_filtering() {
    use crate::pipeline::Pipeline;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    let workdir = tempfile::TempDir::new().unwrap();
    let dml_log = workdir.path().join("dml.log");
    std::fs::write(
        &dml_log,
        "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:TESTUSER trxid:1 stmt:0x1 appname:App ip:10.0.0.1) [SEL] SELECT id FROM t. EXECTIME: 5(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n",
    )
    .unwrap();

    // A pipeline whose only processor rejects every record.
    let mut reject_all = Pipeline::new();
    reject_all.add(Box::new(AlwaysFail));
    assert!(!reject_all.is_empty(), "AlwaysFail 添加后 pipeline 应非空");

    let interrupted = Arc::new(AtomicBool::new(false));
    let collected = collector::collect_log_file(&dml_log, &reject_all, false, None, &interrupted);
    let (rows, _stats) = collected.expect("collect_log_file 应 Ok");

    assert!(
        rows.is_empty(),
        "AlwaysFail 过滤所有记录,rows 应为空,实际 {}",
        rows.len()
    );
}

// Group 4 — the else branch for a filtered-out PARAMS record (collector.rs lines 91-100)
// AlwaysFail processor + a PARAMS record (tag.is_none()) + do_normalize=true
// gives passes=false but needs_processing=true: compute_normalized updates
// params_buf, yet nothing is pushed to rows.
#[test]
fn test_collector_filtered_params_normalize() {
    use crate::pipeline::Pipeline;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    let workdir = tempfile::TempDir::new().unwrap();
    let params_log = workdir.path().join("params.log");
    // PARAMS line (tag=None), in the same format as the samples used by
    // test_sqlite_parallel_matches_sequential.
    std::fs::write(
        &params_log,
        "2025-01-15 10:30:28.010 (EP[0] sess:0x0001 user:TESTUSER trxid:1 stmt:0x2 appname:App ip:10.0.0.1) PARAMS(SEQNO, TYPE, DATA)={(0, VARCHAR, 'testvalue')}\n",
    )
    .unwrap();

    // A pipeline whose only processor rejects every record.
    let mut reject_all = Pipeline::new();
    reject_all.add(Box::new(AlwaysFail));

    let interrupted = Arc::new(AtomicBool::new(false));
    // do_normalize=true sends the filtered PARAMS line through the compute_normalized branch.
    let collected = collector::collect_log_file(&params_log, &reject_all, true, None, &interrupted);
    let (rows, _stats) = collected.expect("collect_log_file 应 Ok");

    assert!(
        rows.is_empty(),
        "AlwaysFail 过滤 PARAMS 记录,rows 应为空,实际 {}",
        rows.len()
    );
}

// interrupted=true hits the break branch before the first record (collector.rs line 42-44)
/// With the interruption flag already set, `collect_log_file` must return `Ok`
/// with no rows even though the input file holds a valid record.
#[test]
fn test_collector_interrupted_returns_empty() {
    use crate::pipeline::Pipeline;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    let dir = tempfile::TempDir::new().unwrap();
    let log_path = dir.path().join("data.log");
    let valid_dml = "2025-01-15 10:30:28.001 (EP[0] sess:0x0001 user:TESTUSER trxid:1 stmt:0x1 appname:App ip:10.0.0.1) [SEL] SELECT id FROM t. EXECTIME: 5(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.\n";
    std::fs::write(&log_path, valid_dml).unwrap();
    let pipeline = Pipeline::default();
    // The flag is created already set; the former follow-up `store(true, ..)` was a
    // no-op on the same thread and has been dropped together with its import.
    let interrupted = Arc::new(AtomicBool::new(true));
    let (rows, _stats) =
        collector::collect_log_file(&log_path, &pipeline, false, None, &interrupted)
            .expect("collect_log_file 应 Ok");
    assert!(
        rows.is_empty(),
        "interrupted=true 应使循环立即 break,rows 应为空"
    );
}

// ── prescan: build_indicator_filters / build_sql_*_filters 单元测试 ─────────

// min_row_count = Some(0) with every other indicator left at its default must
// yield exactly one filter from build_indicator_filters.
#[test]
fn test_build_indicator_filters_min_row_count_zero() {
    use super::prescan::build_indicator_filters;
    use crate::pipeline::filters::IndicatorFilters;

    let zero_threshold = IndicatorFilters {
        min_row_count: Some(0),
        ..Default::default()
    };

    let built = build_indicator_filters(&zero_threshold);
    let filter_count = built.len();
    assert_eq!(
        filter_count, 1,
        "min_row_count=0 应构建一个全匹配 Filter(FilterBuilder::new().build() 分支)"
    );
}

// min_row_count = Some(5) with every other indicator left at its default must
// yield exactly one filter from build_indicator_filters.
#[test]
fn test_build_indicator_filters_min_row_count_positive() {
    use super::prescan::build_indicator_filters;
    use crate::pipeline::filters::IndicatorFilters;

    let positive_threshold = IndicatorFilters {
        min_row_count: Some(5),
        ..Default::default()
    };

    let built = build_indicator_filters(&positive_threshold);
    let filter_count = built.len();
    assert_eq!(
        filter_count, 1,
        "min_row_count=5 应构建一个带 rowcount_gt(4) 约束的 Filter"
    );
}

// A default IndicatorFilters must produce no filters at all.
#[test]
fn test_build_indicator_filters_empty_returns_empty() {
    use super::prescan::build_indicator_filters;
    use crate::pipeline::filters::IndicatorFilters;

    let no_indicators = IndicatorFilters::default();
    let built = build_indicator_filters(&no_indicators);

    let filter_count = built.len();
    assert_eq!(filter_count, 0, "所有字段均为 None 时应返回空 Vec<Filter>");
}

// Three exclude patterns (and no include patterns) must produce three filters
// from build_sql_exclude_filters.
#[test]
fn test_build_sql_exclude_filters_multiple_returns_correct_count() {
    use super::prescan::build_sql_exclude_filters;
    use crate::pipeline::filters::SqlFilters;

    let exclude_patterns = ["SELECT 1", "DROP", "DELETE FROM x"];
    let sql_filters = SqlFilters {
        excludes: Some(exclude_patterns.iter().copied().map(Into::into).collect()),
        includes: None,
    };

    let built = build_sql_exclude_filters(&sql_filters);
    let filter_count = built.len();
    assert_eq!(
        filter_count, 3,
        "3 个 exclude 模式应构建 3 个 Filter(非空 excludes 分支)"
    );
}

// A default SqlFilters must produce no exclude filters.
#[test]
fn test_build_sql_exclude_filters_none_returns_empty() {
    use super::prescan::build_sql_exclude_filters;
    use crate::pipeline::filters::SqlFilters;

    let default_filters = SqlFilters::default();
    let built = build_sql_exclude_filters(&default_filters);

    let filter_count = built.len();
    assert_eq!(
        filter_count, 0,
        "excludes=None 应通过 unwrap_or(&[]) 返回空 Vec<Filter>"
    );
}

// Two include patterns (and no exclude patterns) must produce two filters
// from build_sql_include_filters.
#[test]
fn test_build_sql_include_filters_multiple() {
    use super::prescan::build_sql_include_filters;
    use crate::pipeline::filters::SqlFilters;

    let include_patterns = ["SELECT", "UPDATE"];
    let sql_filters = SqlFilters {
        includes: Some(include_patterns.iter().copied().map(Into::into).collect()),
        excludes: None,
    };

    let built = build_sql_include_filters(&sql_filters);
    let filter_count = built.len();
    assert_eq!(filter_count, 2, "2 个 include 模式应构建 2 个 Filter");
}

// Three distinct exec_ids (all other indicators default) must produce three
// filters from build_indicator_filters.
#[test]
fn test_build_indicator_filters_exec_ids_multiple() {
    use super::prescan::build_indicator_filters;
    use crate::pipeline::filters::IndicatorFilters;
    use std::collections::HashSet;

    let wanted_ids: HashSet<i64> = [1_i64, 2, 42].iter().copied().collect();
    let by_exec_id = IndicatorFilters {
        exec_ids: Some(wanted_ids),
        ..Default::default()
    };

    let built = build_indicator_filters(&by_exec_id);
    let filter_count = built.len();
    assert_eq!(
        filter_count, 3,
        "3 个 exec_ids 应产生 3 个独立的 Filter(每个 ID 一个)"
    );
}

// Writes three [SEL] records (ROWCOUNT 0, 1 and 2) to a temporary log and checks
// that scan_log_file_for_matches with min_row_count = Some(0) reports all three.
#[test]
fn test_min_row_count_zero_matches_all_records() {
    use crate::pipeline::FiltersFeature;
    use crate::pipeline::filters::IndicatorFilters;
    use std::fmt::Write as _;

    let tmp = tempfile::TempDir::new().unwrap();
    let logfile = tmp.path().join("test.log");

    // One line per index; the index doubles as session, trxid, row count and exec id.
    let mut contents = String::new();
    (0..3_usize)
        .try_for_each(|i| {
            writeln!(
                contents,
                "2025-01-15 10:30:28.001 (EP[0] sess:0x{i:04x} user:U trxid:{i} stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT {i}. EXECTIME: 1(ms) ROWCOUNT: {i}(rows) EXEC_ID: {i}.",
            )
        })
        .unwrap();
    std::fs::write(&logfile, &contents).unwrap();

    // Filtering enabled, with min_row_count as the only configured indicator.
    let indicators = IndicatorFilters {
        min_row_count: Some(0),
        ..Default::default()
    };
    let feature = FiltersFeature {
        enable: true,
        indicators,
        ..Default::default()
    };
    let cfg = Config {
        filter: Some(feature),
        ..Default::default()
    };

    let log_str = logfile.to_str().unwrap();
    let matched = super::prescan::scan_log_file_for_matches(log_str, &cfg);
    let match_count = matched.len();
    assert_eq!(
        match_count, 3,
        "min_row_count=0 应匹配所有记录(全匹配 Filter),实际匹配: {matched:?}",
    );
}

// Two log files share trxid 1 (a.log holds 0 and 1, b.log holds 1 and 2). After
// sorting, scan_for_trxids_by_transaction_filters must return each trxid once.
#[test]
fn test_scan_for_trxids_by_transaction_filters_dedup_across_files() {
    use crate::pipeline::FiltersFeature;
    use crate::pipeline::filters::IndicatorFilters;
    use std::fmt::Write as _;

    let tmp = tempfile::TempDir::new().unwrap();

    // Creates `name` inside the temp dir with one [SEL] record per id and returns its path.
    let make_log = |name: &str, ids: &[usize]| {
        let mut contents = String::new();
        ids.iter()
            .try_for_each(|&i| {
                writeln!(
                    contents,
                    "2025-01-15 10:30:28.001 (EP[0] sess:0x{i:04x} user:U trxid:{i} stmt:0x1 appname:A ip:10.0.0.1) [SEL] SELECT {i}. EXECTIME: 1(ms) ROWCOUNT: 1(rows) EXEC_ID: 1.",
                )
            })
            .unwrap();
        let target = tmp.path().join(name);
        std::fs::write(&target, &contents).unwrap();
        target
    };

    let first_log = make_log("a.log", &[0, 1]);
    let second_log = make_log("b.log", &[1, 2]);

    // Filtering enabled, with min_row_count as the only configured indicator.
    let indicators = IndicatorFilters {
        min_row_count: Some(0),
        ..Default::default()
    };
    let feature = FiltersFeature {
        enable: true,
        indicators,
        ..Default::default()
    };
    let cfg = Config {
        filter: Some(feature),
        ..Default::default()
    };

    let scan_result =
        super::prescan::scan_for_trxids_by_transaction_filters(&[first_log, second_log], &cfg, 2);
    let mut matched = scan_result.unwrap();
    // Sort so the comparison does not depend on the order the scan returns ids in.
    matched.sort();

    let expected = vec!["0".to_string(), "1".to_string(), "2".to_string()];
    assert_eq!(
        matched, expected,
        "跨文件应返回去重后的 trxid 列表,实际: {matched:?}"
    );
}