dm-database-parser-sqllog 0.4.0

一个高性能的达梦数据库 sqllog 日志解析库,提供零分配或低分配的记录切分与解析功能,支持实时监控
Documentation

dm-database-parser-sqllog

Crates.io Documentation License: MIT Coverage

一个高性能的达梦数据库 sqllog 日志解析库,提供零分配或低分配的记录切分与解析功能,以及实时日志监控能力。

主要特点

  • 零分配解析:基于时间戳的记录切分,使用流式 API 避免额外内存分配
  • 实时监控:支持实时监控 SQL 日志文件变化,增量解析新增内容(v0.3.0+)
  • 高效模式匹配:使用双数组 Aho-Corasick(daachorse)进行高效模式匹配
  • 轻量级结构:解析结果使用引用(&str),避免不必要的字符串复制
  • 灵活的 API:提供批量解析、流式解析、实时监控等多种使用方式
  • 详细的错误信息:所有解析错误都包含原始数据,便于调试和问题定位
  • 高性能:适合在高吞吐日志处理场景中使用

安装

Cargo.toml 中添加依赖:

[dependencies]

dm-database-parser-sqllog = "0.3"



# 如果需要实时监控功能,启用 realtime 特性

dm-database-parser-sqllog = { version = "0.3", features = ["realtime"] }

快速开始

基本用法

从字符串解析

use dm_database_parser_sqllog::{parse_records_from_string, parse_sqllogs_from_string};

let log_text = r#"2025-08-12 10:57:09.562 (EP[0] sess:123 thrd:456 user:alice trxid:789 stmt:999 appname:app) SELECT 1"#;

// 方法 1: 解析为 Record 列表(自动跳过无效行)
let records = parse_records_from_string(log_text);
println!("找到 {} 条记录", records.len());

// 方法 2: 解析为 Sqllog 列表(包含成功和失败的)
let results = parse_sqllogs_from_string(log_text);
for result in results {
    match result {
        Ok(sqllog) => {
            println!("用户: {}, 事务ID: {}, SQL: {}",
                sqllog.meta.username, sqllog.meta.trxid, sqllog.body);

            // 获取性能指标(如果有)
            if let Some(time) = sqllog.execute_time() {
                println!("执行时间: {:.2}ms", time);
            }
        }
        Err(e) => eprintln!("解析错误: {}", e),
    }
}

流式处理(回调模式)

use dm_database_parser_sqllog::for_each_sqllog_in_string;

let log_text = r#"..."#; // 大量日志文本

// 对每个 Sqllog 调用回调函数(适合大数据流式处理)
let count = for_each_sqllog_in_string(log_text, |sqllog| {
    println!("EP: {}, 用户: {}, SQL: {}",
        sqllog.meta.ep, sqllog.meta.username, sqllog.body);
}).unwrap();

println!("处理了 {} 条记录", count);

从文件读取

方式一:流式迭代(推荐用于大文件)

对于大文件(> 100MB),推荐使用迭代器模式,内存高效(批量缓冲 + 并行处理):

use dm_database_parser_sqllog::iter_records_from_file;

// 迭代处理 SQL 日志(带性能统计)
let mut total_time = 0.0;
let mut count = 0;
let mut slow_queries = 0;

for result in iter_records_from_file("large_log.sqllog")? {
    match result {
        Ok(sqllog) => {
            if let Some(time) = sqllog.execute_time() {
                total_time += time;
                count += 1;
                if time > 100.0 {
                    slow_queries += 1;
                    println!("慢查询: {:.2}ms - {}", time, sqllog.body);
                }
            }
        }
        Err(e) => eprintln!("解析错误: {}", e),
    }
}

println!("平均执行时间: {:.2}ms", total_time / count as f64);
println!("慢查询数量: {}", slow_queries);

// 使用迭代器组合器(筛选慢查询)
let slow_queries: Vec<_> = iter_records_from_file("large_log.sqllog")?
    .filter_map(Result::ok)  // 忽略解析错误
    .filter(|log| log.execute_time().map_or(false, |t| t > 100.0))
    .take(10)  // 只取前 10 条
    .collect();

println!("找到 {} 条慢查询", slow_queries.len());

方式二:批量加载(适合需要多次遍历)

使用批量 API 可以一次性加载所有 SQL 日志,内部自动使用并行处理:

use dm_database_parser_sqllog::parse_records_from_file;

// 一次性加载所有 SQL 日志(自动并行处理)
let (sqllogs, errors) = parse_records_from_file("log.sqllog")?;
println!("成功解析 {} 条 SQL 日志,遇到 {} 个错误", sqllogs.len(), errors.len());````

方式二:批量加载(适合需要一次性获取所有结果)

使用批量 API 可以一次性加载所有 SQL 日志,内部自动使用并行处理:

use dm_database_parser_sqllog::parse_records_from_file;

// 一次性加载所有 SQL 日志(自动并行处理)
let (sqllogs, errors) = parse_records_from_file("log.sqllog")?;
println!("成功解析 {} 条 SQL 日志,遇到 {} 个错误", sqllogs.len(), errors.len());

// 处理解析好的 SQL 日志
for sqllog in sqllogs {
    println!("用户: {}, SQL: {}", sqllog.meta.username, sqllog.body);
    if let Some(exec_time) = sqllog.execute_time() {
        println!("执行时间: {:.2}ms", exec_time);
    }
}

// 处理解析错误
for error in errors {
    eprintln!("解析错误: {}", error);
}

错误处理和调试

所有解析错误都包含详细的原始数据,便于调试和定位问题:

use dm_database_parser_sqllog::{iter_records_from_file, ParseError};

for result in iter_records_from_file("log.sqllog")? {
    match result {
        Ok(sqllog) => {
            // 处理成功的记录
        }
        Err(e) => {
            // 错误信息包含原始数据
            match e {
                ParseError::InvalidRecordStartLine { raw } => {
                    eprintln!("无效的记录起始行: {}", raw);
                }
                ParseError::LineTooShort { length, raw } => {
                    eprintln!("行太短 (长度: {}): {}", length, raw);
                }
                ParseError::InsufficientMetaFields { count, raw } => {
                    eprintln!("字段不足 (只有 {} 个): {}", count, raw);
                }
                ParseError::InvalidEpFormat { value, raw } => {
                    eprintln!("EP 格式错误 '{}' 在: {}", value, raw);
                }
                ParseError::FileNotFound { path } => {
                    eprintln!("文件未找到: {}", path);
                }
                _ => eprintln!("其他错误: {}", e),
            }
        }
    }
}

所有错误类型的 Display 实现都遵循格式:错误描述 | raw: 原始数据,例如:

invalid EP format: EPX0] | raw: EPX0] sess:123 thrd:456 user:alice trxid:0 stmt:999 appname:app

这使得在生产环境中快速定位问题变得更加容易。

实时监控日志文件(v0.3.0+)

使用 realtime 特性可以实时监控 SQL 日志文件的变化:

use dm_database_parser_sqllog::realtime::RealtimeSqllogParser;
use std::time::Duration;

// 从文件末尾开始监控新增日志
let parser = RealtimeSqllogParser::new("sqllog.txt")?;
parser.watch(|sqllog| {
    println!("[{}] 用户: {}, SQL: {}",
        sqllog.ts, sqllog.meta.username, sqllog.body);

    // 检测慢查询
    if let Some(time) = sqllog.execute_time() {
        if time > 1000.0 {
            eprintln!("⚠️  慢查询告警: {:.2}ms", time);
        }
    }
})?;

// 监控指定时长后停止
let parser = RealtimeSqllogParser::new("sqllog.txt")?;
parser.watch_for(Duration::from_secs(60), |sqllog| {
    // 处理日志...
})?;

// 从文件开头开始解析
let parser = RealtimeSqllogParser::new("sqllog.txt")?
    .from_beginning()?;
parser.watch(|sqllog| {
    // 处理所有历史日志...
})?;

详细文档请查看:REALTIME_FEATURE.md

API 对比

API 返回类型 内存占用 性能 适用场景
iter_records_from_file() SqllogIterator<BufReader<File>> 低(批量缓冲) 2.7秒 流式处理、需要提前中断
parse_records_from_file() (Vec<Sqllog>, Vec<ParseError>) 高(一次性) 2.5秒 批量处理、需要多次遍历

选择建议

  • 迭代器模式 (iter_*):一次只处理一条记录,支持 GB 级大文件,可使用 .filter(), .take() 等组合器
  • 一次性加载 (parse_*):简单直接,适合需要多次遍历或随机访问的场景

更多示例

查看 examples/ 目录获取更多使用示例:

  • parse_example.rs - 基本解析示例
  • iterator_mode.rs - 迭代器模式示例(推荐用于大文件)
  • parse_from_file.rs - 从文件读取和解析
  • stream_processing.rs - 流式处理示例
  • using_parsers.rs - 直接使用 RecordParser 和 SqllogParser
  • error_messages.rs - 错误处理示例
  • parse_records.rs - Record 解析示例
  • performance_demo.rs - 性能演示

运行示例:

cargo run --example parse_example

cargo run --example iterator_mode

cargo run --example parse_from_file

cargo run --example stream_processing

API 文档

完整的 API 文档请查看 docs.rs

主要类型

  • [Sqllog] - 解析后的 SQL 日志结构体(包含时间戳、元数据、SQL 文本、性能指标等)
  • [Record] - 原始日志记录结构(包含起始行和总行数)
  • [ParseError] - 解析错误类型

核心解析器

  • [RecordParser] - 记录解析迭代器,将日志文本按时间戳切分为记录
  • [SqllogParser] - SQL 日志解析迭代器,将记录解析为 Sqllog 结构体

字符串解析 API

文件解析 API(推荐)

  • [iter_records_from_file] - 从文件流式读取 SQL 日志,返回 SqllogIterator(内存高效,批量缓冲 + 并行处理)
  • [parse_records_from_file] - 从文件批量加载 SQL 日志,返回 (Vec<Sqllog>, Vec<ParseError>)(自动并行处理)

核心类型

  • [Sqllog] - SQL 日志结构体(包含时间戳、元数据、SQL 正文等)
  • [ParseError] - 解析错误类型(包含详细错误信息)
  • [Record] - 原始记录结构(内部使用,一般不需要直接操作)

设计与注意事项

  • 所有 API 都直接返回解析好的 Sqllog,无需手动调用解析方法
  • 自动使用批量缓冲 + 并行处理优化性能
  • 适合处理大型日志文件(1GB 文件约 2.5 秒)
  • 流式 API 内存占用低,适合超大文件或需要提前中断的场景

构建与测试

# 构建

cargo build


# 运行测试

cargo test


# 运行所有 benchmark

cargo bench


# 运行特定 benchmark

cargo bench --bench api_bench

cargo bench --bench parse_functions_bench

cargo bench --bench record_bench

cargo bench --bench record_parser_bench

cargo bench --bench tools_bench


# 查看性能报告

# HTML 可视化报告: target/criterion/report/index.html

# Benchmark 文档: BENCHMARKS.md


# 运行示例

cargo run --example basic


# 生成文档

cargo doc --open

性能

🚀 性能亮点

经过深度优化,本库在处理大型日志文件时表现出色:

1GB 日志文件(301 万条记录)处理性能:

处理模式 耗时 速度 说明
记录识别 ~1.6秒 188万条/秒 仅识别和切分记录
完整解析 ~4.5秒 67万条/秒 识别 + 完整解析(包含 meta、body、indicators)
原始版本(未优化) ~8秒 38万条/秒 优化前性能

性能提升:

  • ✅ 记录识别速度提升 5倍(8秒 → 1.6秒,降低 80%)
  • ✅ 完整解析速度提升 44%(8秒 → 4.5秒)
  • ✅ 远超性能目标(3-4秒),达到生产级性能要求

性能优化技术

  1. 字节级操作(贡献 40-50%)

    • 使用 &[u8] 直接内存比较代替字符串操作
    • 避免 UTF-8 验证和字符边界检查
    • 支持 CPU SIMD 向量化优化
  2. 零拷贝内存管理(贡献 20-30%)

    • std::mem::take 移动所有权避免克隆
    • 原地修改代替创建新字符串
    • 从 900万次内存分配减少到 <100次
  3. 早期退出策略(贡献 10-15%)

    • 分层验证:长度 → 时间戳 → 括号 → 字段
    • 70% 的续行在前 2 步就被过滤
    • CPU 缓存友好的热点代码
  4. 函数内联优化(贡献 5-10%)

    • 关键路径使用 #[inline(always)]
    • 消除 301万次函数调用开销
    • 支持编译器跨函数优化
  5. 一次扫描解析(贡献 10-15%)

    • 手工解析比正则表达式快 10-20倍
    • 零回溯、零临时分配
    • CPU 缓存局部性好

详细性能分析请查看:PERFORMANCE_ANALYSIS.md

API 性能对比

对外公开的主要 API 性能对比(使用真实日志文件测试):

API 1GB文件耗时 580MB文件耗时 适用场景
iter_records_from_file ~2.7秒 ~200ms 流式处理,内存高效
parse_records_from_file ~2.5秒 ~185ms 批量处理,性能最佳

性能特性

  • 两个 API 都直接返回 Sqllog,无需手动调用解析
  • 内部自动使用批量并行处理(每批 10,000 条记录)
  • 1GB 文件包含约 302 万条记录,解析速度达 112 万条/秒
  • 批量 API 略快于流式 API(~8% 优势)

选择建议

  • 优先使用 parse_records_from_file:代码简洁,性能最佳
  • 大文件或需要提前中断时使用 iter_records_from_file:内存友好

详细的 API 性能测试报告请查看:docs/BENCHMARK_API.md

测试

本项目包含了全面的测试套件:

  • 107 个测试用例: 78 个集成测试 + 29 个单元测试
  • 50+ 个基准场景: 使用 Criterion.rs 进行性能基准测试
  • 100% 通过率: 所有测试当前状态均为通过
  • 94.69% 代码覆盖率: 行覆盖率,函数覆盖率达 98.80%

运行测试

# 运行所有测试

cargo test


# 运行特定测试文件

cargo test --test api

cargo test --test parse_functions


# 运行所有 benchmark

cargo bench


# 运行特定 benchmark

cargo bench --bench api_bench           # API 性能测试

cargo bench --bench parse_functions_bench  # 解析函数性能测试

cargo bench --bench record_bench         # Record 结构性能测试

cargo bench --bench record_parser_bench  # RecordParser 性能测试

cargo bench --bench tools_bench          # 工具函数性能测试


# 生成覆盖率报告

cargo llvm-cov --html --ignore-filename-regex='target|tests'

# 报告位于: target/llvm-cov/html/index.html

测试结构

集成测试 (tests/ 目录):

  • api.rs - API 函数测试 (14 tests)
  • record.rs - Record 结构测试 (9 tests)
  • record_parser.rs - RecordParser 迭代器测试 (9 tests)
  • parse_functions.rs - 核心解析函数测试 (46 tests)

单元测试 (源码中):

  • sqllog.rs - Sqllog 结构体测试 (8 tests)
  • tools.rs - 工具函数测试 (21 tests)

Benchmark 测试 (benches/ 目录):

  • api_bench.rs - API 函数性能测试
  • parse_functions_bench.rs - 解析函数性能测试 (8 组测试)
  • record_bench.rs - Record 结构性能测试 (6 组测试)
  • record_parser_bench.rs - RecordParser 性能测试 (6 组测试)
  • tools_bench.rs - 工具函数性能测试 (7 组测试)

详细说明:

测试覆盖率

当前覆盖率: 94.69%

模块 行覆盖率 函数覆盖率
parser/api.rs 89.66% 100.00%
parser/parse_functions.rs 90.71% 95.65%
parser/record.rs 100.00% 100.00%
parser/record_parser.rs 96.72% 100.00%
sqllog.rs 100.00% 100.00%
tools.rs 96.07% 100.00%

覆盖功能:

  • ✅ 所有解析函数(parse_record, parse_meta, parse_indicators)
  • ✅ 所有错误路径和边界情况
  • ✅ 流式和批量 API
  • ✅ 多行记录处理
  • ✅ Windows/Unix 换行符兼容
  • ✅ 并行处理正确性
  • ✅ 大数据集处理(1GB+ 文件)

许可证

MIT License - 详见 LICENSE 文件

相关链接