use super::append::force_append_for_watch_trigger;
use super::offsets;
use super::state::WatchLoopState;
use super::status::render_active_status;
use crate::config::Config;
use indicatif::ProgressBar;
use log::warn;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
pub fn trigger_full_file(
path: &Path,
cfg: &Config,
quiet: bool,
verbose: bool,
interrupted: &Arc<AtomicBool>,
state: &mut WatchLoopState,
pb: &ProgressBar,
) {
if !path.exists() {
warn!(
"watch: triggered path no longer exists, skipping: {}",
path.display()
);
return;
}
let mut tmp_cfg = cfg.clone();
tmp_cfg.sqllog.inputs = vec![path.to_string_lossy().into_owned()];
force_append_for_watch_trigger(&mut tmp_cfg);
match crate::cli::run::handle_run(&tmp_cfg, quiet, verbose, interrupted, None) {
Ok(file_stats) => {
state.total_stats.merge(&file_stats);
let last_elapsed = state.last_trigger_at.map(|t| t.elapsed());
state.trigger_count += 1;
state.last_trigger_at = Some(Instant::now());
record_offset_after_trigger(path, state);
update_status_bar(path, state, pb, last_elapsed);
}
Err(crate::error::Error::Interrupted) => interrupted.store(true, Ordering::Release),
Err(e) => warn!("watch trigger error (full): {e}"),
}
}
/// Records the current on-disk size of `path` as its offset after a trigger.
///
/// The size is stored in `state.file_offsets`, keyed by the canonicalized path
/// (falling back to `path` as given when canonicalization fails). When
/// `state.sqlite_db_url` is set, the same offset is also handed to
/// `offsets::save_offset` after an attempt to ensure the offset table exists.
///
/// If the file's metadata cannot be read, a warning is logged and no offset is
/// updated.
fn record_offset_after_trigger(path: &Path, state: &mut WatchLoopState) {
    let meta = match std::fs::metadata(path) {
        Ok(meta) => meta,
        Err(e) => {
            warn!(
                "watch: metadata failed after trigger, offset not updated for {}: {e}",
                path.display()
            );
            return;
        }
    };

    let new_size = meta.len();
    let key = match path.canonicalize() {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    };
    state.file_offsets.insert(key.clone(), new_size);

    if let Some(database_url) = &state.sqlite_db_url {
        // A failure to ensure the table is only logged; the save is still attempted.
        if let Err(e) = offsets::ensure_offset_table(database_url) {
            warn!("watch: ensure_offset_table in record_offset_after_trigger: {e}");
        }
        offsets::save_offset(database_url, &key, new_size);
    }
}
/// Refreshes the progress bar message with the current watch status.
///
/// The message is built by [`render_active_status`] from the parent directory
/// of `path` (an empty string when there is no parent), the trigger count and
/// exported-record total held in `state`, and `last_elapsed` (a zero duration
/// when `None`).
pub(super) fn update_status_bar(
    path: &Path,
    state: &WatchLoopState,
    pb: &ProgressBar,
    last_elapsed: Option<Duration>,
) {
    let dir = match path.parent() {
        Some(parent) => parent.display().to_string(),
        None => String::new(),
    };
    let elapsed = last_elapsed.unwrap_or_default();

    let message = render_active_status(
        &dir,
        state.trigger_count,
        state.total_stats.records_exported,
        elapsed,
    );
    pb.set_message(message);
}