use super::append::force_append_for_watch_trigger;
use super::offsets;
use super::state::WatchLoopState;
use super::trigger_full::update_status_bar;
use crate::config::Config;
use indicatif::ProgressBar;
use log::warn;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;
pub fn trigger_incremental(
path: &Path,
cfg: &Config,
quiet: bool,
verbose: bool,
interrupted: &Arc<AtomicBool>,
state: &mut WatchLoopState,
pb: &ProgressBar,
) {
if !path.exists() {
warn!(
"watch: triggered path no longer exists, skipping: {}",
path.display()
);
return;
}
let canonical_path = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
let Ok(metadata) = std::fs::metadata(path) else {
warn!("watch: metadata error for {}", path.display());
return;
};
let new_size = metadata.len();
let Some(start_offset) = resolve_incremental_offset(&canonical_path, new_size, state) else {
return; };
let Ok(tmp_file) = read_bytes_to_tempfile(path, start_offset) else {
warn!("watch: read_bytes_to_tempfile error for {}", path.display());
return;
};
run_incremental_handle_run(
path,
&canonical_path,
tmp_file,
new_size,
cfg,
quiet,
verbose,
interrupted,
state,
pb,
);
}
fn resolve_incremental_offset(
canonical_path: &Path,
new_size: u64,
state: &mut WatchLoopState,
) -> Option<u64> {
let Some(&start_offset) = state.file_offsets.get(canonical_path) else {
state
.file_offsets
.insert(canonical_path.to_path_buf(), new_size);
if let Some(ref database_url) = state.sqlite_db_url {
offsets::save_offset(database_url, canonical_path, new_size);
}
return None;
};
if new_size < start_offset {
warn!(
"watch: file shrank ({} → {} bytes), resetting offset for {}",
start_offset,
new_size,
canonical_path.display()
);
state.file_offsets.insert(canonical_path.to_path_buf(), 0);
if let Some(ref database_url) = state.sqlite_db_url {
offsets::save_offset(database_url, canonical_path, 0);
}
return Some(0);
}
if new_size == start_offset {
return None;
}
Some(start_offset)
}
pub(super) fn read_bytes_to_tempfile(
path: &Path,
start_offset: u64,
) -> std::io::Result<tempfile::NamedTempFile> {
use std::io::{Read, Seek, SeekFrom, Write};
let mut source_file = std::fs::File::open(path)?;
source_file.seek(SeekFrom::Start(start_offset))?;
let mut buffer = Vec::new();
source_file.read_to_end(&mut buffer)?;
let mut tmp_file = tempfile::Builder::new()
.prefix("sqllog2db-watch-")
.suffix(".log")
.tempfile()?;
tmp_file.write_all(&buffer)?;
tmp_file.flush()?;
Ok(tmp_file)
}
#[allow(clippy::needless_pass_by_value)]
fn run_incremental_handle_run(
original_path: &Path,
canonical_path: &Path,
tmp_file: tempfile::NamedTempFile,
new_size: u64,
cfg: &Config,
quiet: bool,
verbose: bool,
interrupted: &Arc<AtomicBool>,
state: &mut WatchLoopState,
pb: &ProgressBar,
) {
let tmp_cfg = build_incremental_cfg(cfg, &tmp_file);
match crate::cli::run::handle_run(&tmp_cfg, quiet, verbose, interrupted, None) {
Ok(file_stats) => {
state.total_stats.merge(&file_stats);
let last_elapsed = state.last_trigger_at.map(|t| t.elapsed());
state.trigger_count += 1;
state.last_trigger_at = Some(Instant::now());
state
.file_offsets
.insert(canonical_path.to_path_buf(), new_size);
if let Some(ref database_url) = state.sqlite_db_url {
offsets::save_offset(database_url, canonical_path, new_size);
}
update_status_bar(original_path, state, pb, last_elapsed);
}
Err(crate::error::Error::Interrupted) => interrupted.store(true, Ordering::Release),
Err(e) => warn!("watch trigger error (incremental): {e}"),
}
}
fn build_incremental_cfg(cfg: &Config, tmp_file: &tempfile::NamedTempFile) -> Config {
let mut tmp_cfg = cfg.clone();
tmp_cfg.sqllog.inputs = vec![tmp_file.path().to_string_lossy().into_owned()];
force_append_for_watch_trigger(&mut tmp_cfg);
tmp_cfg
}