dm_database_sqllog2db/cli/watch/
trigger_incremental.rs1use super::append::force_append_for_watch_trigger;
4use super::offsets;
5use super::state::WatchLoopState;
6use super::trigger_full::update_status_bar;
7use crate::config::Config;
8use indicatif::ProgressBar;
9use log::warn;
10use std::path::Path;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::time::Instant;
14
15pub fn trigger_incremental(
17 path: &Path,
18 cfg: &Config,
19 quiet: bool,
20 verbose: bool,
21 interrupted: &Arc<AtomicBool>,
22 state: &mut WatchLoopState,
23 pb: &ProgressBar,
24) {
25 if !path.exists() {
26 warn!(
27 "watch: triggered path no longer exists, skipping: {}",
28 path.display()
29 );
30 return;
31 }
32 let canonical_path = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
33 let Ok(metadata) = std::fs::metadata(path) else {
34 warn!("watch: metadata error for {}", path.display());
35 return;
36 };
37 let new_size = metadata.len();
38 let Some(start_offset) = resolve_incremental_offset(&canonical_path, new_size, state) else {
39 return; };
41 let Ok(tmp_file) = read_bytes_to_tempfile(path, start_offset) else {
42 warn!("watch: read_bytes_to_tempfile error for {}", path.display());
43 return;
44 };
45 run_incremental_handle_run(
46 path,
47 &canonical_path,
48 tmp_file,
49 new_size,
50 cfg,
51 quiet,
52 verbose,
53 interrupted,
54 state,
55 pb,
56 );
57}
58
59fn resolve_incremental_offset(
62 canonical_path: &Path,
63 new_size: u64,
64 state: &mut WatchLoopState,
65) -> Option<u64> {
66 let Some(&start_offset) = state.file_offsets.get(canonical_path) else {
67 state
69 .file_offsets
70 .insert(canonical_path.to_path_buf(), new_size);
71 if let Some(ref database_url) = state.sqlite_db_url {
72 offsets::save_offset(database_url, canonical_path, new_size);
73 }
74 return None;
75 };
76 if new_size < start_offset {
77 warn!(
79 "watch: file shrank ({} → {} bytes), resetting offset for {}",
80 start_offset,
81 new_size,
82 canonical_path.display()
83 );
84 state.file_offsets.insert(canonical_path.to_path_buf(), 0);
85 if let Some(ref database_url) = state.sqlite_db_url {
86 offsets::save_offset(database_url, canonical_path, 0);
87 }
88 return Some(0);
89 }
90
91 if new_size == start_offset {
93 return None;
94 }
95
96 Some(start_offset)
97}
98
99pub(super) fn read_bytes_to_tempfile(
101 path: &Path,
102 start_offset: u64,
103) -> std::io::Result<tempfile::NamedTempFile> {
104 use std::io::{Read, Seek, SeekFrom, Write};
105 let mut source_file = std::fs::File::open(path)?;
106 source_file.seek(SeekFrom::Start(start_offset))?;
107 let mut buffer = Vec::new();
108 source_file.read_to_end(&mut buffer)?;
109 let mut tmp_file = tempfile::Builder::new()
110 .prefix("sqllog2db-watch-")
111 .suffix(".log")
112 .tempfile()?;
113 tmp_file.write_all(&buffer)?;
114 tmp_file.flush()?;
115 Ok(tmp_file)
116}
117
118#[allow(clippy::needless_pass_by_value)]
121fn run_incremental_handle_run(
122 original_path: &Path,
123 canonical_path: &Path,
124 tmp_file: tempfile::NamedTempFile,
125 new_size: u64,
126 cfg: &Config,
127 quiet: bool,
128 verbose: bool,
129 interrupted: &Arc<AtomicBool>,
130 state: &mut WatchLoopState,
131 pb: &ProgressBar,
132) {
133 let tmp_cfg = build_incremental_cfg(cfg, &tmp_file);
134 match crate::cli::run::handle_run(&tmp_cfg, quiet, verbose, interrupted, None) {
135 Ok(file_stats) => {
136 state.total_stats.merge(&file_stats);
137 let last_elapsed = state.last_trigger_at.map(|t| t.elapsed());
138 state.trigger_count += 1;
139 state.last_trigger_at = Some(Instant::now());
140 state
142 .file_offsets
143 .insert(canonical_path.to_path_buf(), new_size);
144 if let Some(ref database_url) = state.sqlite_db_url {
145 offsets::save_offset(database_url, canonical_path, new_size);
146 }
147 update_status_bar(original_path, state, pb, last_elapsed);
148 }
149 Err(crate::error::Error::Interrupted) => interrupted.store(true, Ordering::Release),
150 Err(e) => warn!("watch trigger error (incremental): {e}"),
151 }
152}
153
154fn build_incremental_cfg(cfg: &Config, tmp_file: &tempfile::NamedTempFile) -> Config {
156 let mut tmp_cfg = cfg.clone();
157 tmp_cfg.sqllog.inputs = vec![tmp_file.path().to_string_lossy().into_owned()];
158 force_append_for_watch_trigger(&mut tmp_cfg);
160 tmp_cfg
161}