Skip to main content

dm_database_sqllog2db/cli/watch/
trigger_incremental.rs

1//! 增量触发:对内容追加的 `.log` 文件仅读取新增字节段执行 `handle_run`。
2
3use super::append::force_append_for_watch_trigger;
4use super::offsets;
5use super::state::WatchLoopState;
6use super::trigger_full::update_status_bar;
7use crate::config::Config;
8use indicatif::ProgressBar;
9use log::warn;
10use std::path::Path;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::time::Instant;
14
15/// 对内容追加的 `.log` 文件执行增量处理,仅读取 `[start_offset, new_size)` 字节(per D-01/D-02)。
16pub fn trigger_incremental(
17    path: &Path,
18    cfg: &Config,
19    quiet: bool,
20    verbose: bool,
21    interrupted: &Arc<AtomicBool>,
22    state: &mut WatchLoopState,
23    pb: &ProgressBar,
24) {
25    if !path.exists() {
26        warn!(
27            "watch: triggered path no longer exists, skipping: {}",
28            path.display()
29        );
30        return;
31    }
32    let canonical_path = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
33    let Ok(metadata) = std::fs::metadata(path) else {
34        warn!("watch: metadata error for {}", path.display());
35        return;
36    };
37    let new_size = metadata.len();
38    let Some(start_offset) = resolve_incremental_offset(&canonical_path, new_size, state) else {
39        return; // 首次见到或无新字节——已处理基线,直接跳过
40    };
41    let Ok(tmp_file) = read_bytes_to_tempfile(path, start_offset) else {
42        warn!("watch: read_bytes_to_tempfile error for {}", path.display());
43        return;
44    };
45    run_incremental_handle_run(
46        path,
47        &canonical_path,
48        tmp_file,
49        new_size,
50        cfg,
51        quiet,
52        verbose,
53        interrupted,
54        state,
55        pb,
56    );
57}
58
59/// 返回增量处理的 `start_offset`;`None` 表示应跳过(首次见到或无新字节)。
60/// 首次见到时会将当前文件大小写入 `state.file_offsets` 作为基线(per D-04)。
61fn resolve_incremental_offset(
62    canonical_path: &Path,
63    new_size: u64,
64    state: &mut WatchLoopState,
65) -> Option<u64> {
66    let Some(&start_offset) = state.file_offsets.get(canonical_path) else {
67        // D-04: 首次 Modify 无记录 → 记录基线 = 当前大小,跳过处理
68        state
69            .file_offsets
70            .insert(canonical_path.to_path_buf(), new_size);
71        if let Some(ref database_url) = state.sqlite_db_url {
72            offsets::save_offset(database_url, canonical_path, new_size);
73        }
74        return None;
75    };
76    if new_size < start_offset {
77        // 文件缩小——日志轮转(copytruncate)。重置 offset 从头处理,避免读取错误偏移内容。
78        warn!(
79            "watch: file shrank ({} → {} bytes), resetting offset for {}",
80            start_offset,
81            new_size,
82            canonical_path.display()
83        );
84        state.file_offsets.insert(canonical_path.to_path_buf(), 0);
85        if let Some(ref database_url) = state.sqlite_db_url {
86            offsets::save_offset(database_url, canonical_path, 0);
87        }
88        return Some(0);
89    }
90
91    // D-02: 无新字节快速跳过
92    if new_size == start_offset {
93        return None;
94    }
95
96    Some(start_offset)
97}
98
99/// 从文件 `start_offset` 处读取剩余字节,写入 `NamedTempFile` 并返回(per D-01)。
100pub(super) fn read_bytes_to_tempfile(
101    path: &Path,
102    start_offset: u64,
103) -> std::io::Result<tempfile::NamedTempFile> {
104    use std::io::{Read, Seek, SeekFrom, Write};
105    let mut source_file = std::fs::File::open(path)?;
106    source_file.seek(SeekFrom::Start(start_offset))?;
107    let mut buffer = Vec::new();
108    source_file.read_to_end(&mut buffer)?;
109    let mut tmp_file = tempfile::Builder::new()
110        .prefix("sqllog2db-watch-")
111        .suffix(".log")
112        .tempfile()?;
113    tmp_file.write_all(&buffer)?;
114    tmp_file.flush()?;
115    Ok(tmp_file)
116}
117
118/// 使用临时文件路径调用 `handle_run`,更新 state,并持久化新 offset(per D-07/D-09)。
119// tmp_file 按值传入确保函数返回时 NamedTempFile 自动删除临时文件
120#[allow(clippy::needless_pass_by_value)]
121fn run_incremental_handle_run(
122    original_path: &Path,
123    canonical_path: &Path,
124    tmp_file: tempfile::NamedTempFile,
125    new_size: u64,
126    cfg: &Config,
127    quiet: bool,
128    verbose: bool,
129    interrupted: &Arc<AtomicBool>,
130    state: &mut WatchLoopState,
131    pb: &ProgressBar,
132) {
133    let tmp_cfg = build_incremental_cfg(cfg, &tmp_file);
134    match crate::cli::run::handle_run(&tmp_cfg, quiet, verbose, interrupted, None) {
135        Ok(file_stats) => {
136            state.total_stats.merge(&file_stats);
137            let last_elapsed = state.last_trigger_at.map(|t| t.elapsed());
138            state.trigger_count += 1;
139            state.last_trigger_at = Some(Instant::now());
140            // handle_run 返回后 SqliteExporter 已 drop(per Pitfall 4),安全写 offset
141            state
142                .file_offsets
143                .insert(canonical_path.to_path_buf(), new_size);
144            if let Some(ref database_url) = state.sqlite_db_url {
145                offsets::save_offset(database_url, canonical_path, new_size);
146            }
147            update_status_bar(original_path, state, pb, last_elapsed);
148        }
149        Err(crate::error::Error::Interrupted) => interrupted.store(true, Ordering::Release),
150        Err(e) => warn!("watch trigger error (incremental): {e}"),
151    }
152}
153
154/// 构造增量处理用的临时 `Config`:指向 `tmp_file` 路径,强制 append=true(per D-09)。
155fn build_incremental_cfg(cfg: &Config, tmp_file: &tempfile::NamedTempFile) -> Config {
156    let mut tmp_cfg = cfg.clone();
157    tmp_cfg.sqllog.inputs = vec![tmp_file.path().to_string_lossy().into_owned()];
158    // WATCH-07 (D-01): 增量路径强制 CSV + SQLite 追加;WATCH-08 (D-05): error log 追加模式
159    force_append_for_watch_trigger(&mut tmp_cfg);
160    tmp_cfg
161}