use anyhow::Result;
use colored::*;
use memmap2::Mmap;
use notify::event::{ModifyKind, RenameMode};
use notify::{EventKind, RecursiveMode};
use notify_debouncer_full::{new_debouncer, DebounceEventResult};
use once_cell::sync::Lazy;
use std::fs::File;
use std::path::{Component, Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex as StdMutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use crate::crdt::{Operation, OperationType, Position};
use tracing::{debug, error};
use crate::storage::OperationLog;
use crate::sync::{SyncManager, GLOBAL_CLOCK};
use crate::watcher_legacy::cache_warmer;
use dashmap::DashMap;
use std::sync::Arc as StdArc;
use uuid::Uuid;
/// Memoized `Path` -> display-string conversions, filled and read by `path_to_string`.
/// Nothing visible in this file evicts entries from this map.
static PATH_STRING_CACHE: Lazy<DashMap<PathBuf, String>> = Lazy::new(|| DashMap::new());
/// Per-path tuple of three `u64`s. `file_definitely_changed` stores `(0, mtime_secs, size)`,
/// while `detect_rapid_change` stores `(0, sequence, 0)` under the same key.
// NOTE(review): the two writers use different meanings for the middle/last slot — confirm this is intended.
static FILE_HASH_CACHE: Lazy<DashMap<PathBuf, (u64, u64, u64)>> = Lazy::new(|| DashMap::new());
/// Millisecond timestamp (since the Unix epoch) of the last log line `print_operation` emitted per path.
static RECENT_LOGS: Lazy<DashMap<PathBuf, u128>> = Lazy::new(|| DashMap::new());
/// Minimum gap, in milliseconds, between two log lines for the same path in `print_operation`.
const LOG_THROTTLE_MS: u128 = 5_000;
/// Returns the `Display` rendering of `path`, memoized in `PATH_STRING_CACHE`.
///
/// On a cache miss the rendered string is stored under an owned copy of
/// `path` before being returned.
#[inline(always)]
fn path_to_string(path: &Path) -> String {
    // Copy the cached value out in its own statement so the map guard is
    // released before any insertion happens.
    let memoized = PATH_STRING_CACHE.get(path).map(|hit| hit.value().clone());
    memoized.unwrap_or_else(|| {
        let rendered = path.display().to_string();
        PATH_STRING_CACHE.insert(path.to_path_buf(), rendered.clone());
        rendered
    })
}
/// Cheap change check based on file size and modification time (whole seconds).
///
/// Returns `false` only when `FILE_HASH_CACHE` already holds the same
/// `(mtime, size)` pair for `path`. Any failure to read the metadata or the
/// modification time is reported as "changed" (`true`). When a change is
/// reported after a successful stat, the cache entry is refreshed.
#[inline(always)]
#[allow(dead_code)]
fn file_definitely_changed(path: &Path) -> bool {
    let Ok(meta) = std::fs::metadata(path) else {
        return true;
    };
    let current_size = meta.len();
    let Ok(modified) = meta.modified() else {
        return true;
    };
    let Ok(since_epoch) = modified.duration_since(std::time::UNIX_EPOCH) else {
        return true;
    };
    let current_mtime = since_epoch.as_secs();

    let unchanged = FILE_HASH_CACHE
        .get(path)
        .map(|entry| {
            let (_hash, seen_mtime, seen_size) = *entry.value();
            seen_mtime == current_mtime && seen_size == current_size
        })
        .unwrap_or(false);
    if unchanged {
        return false;
    }

    FILE_HASH_CACHE.insert(path.to_path_buf(), (0, current_mtime, current_size));
    true
}
/// Monotonic counter bumped once per `detect_rapid_change` call.
static RAPID_SEQUENCE: Lazy<AtomicU64> = Lazy::new(|| AtomicU64::new(0));
/// Compile-time switch consulted by `detect_rapid_change` and `detect_quality_operations`.
const RAPID_MODE_ENABLED: bool = true;
/// Records that `path` was touched and reports how long that bookkeeping took.
///
/// With `RAPID_MODE_ENABLED` set, takes the next value of `RAPID_SEQUENCE`,
/// stores `(0, sequence, 0)` for `path` in `FILE_HASH_CACHE`, and returns the
/// elapsed time in microseconds. With it unset, returns `Some(0)` without
/// touching any state. The function never returns `None`.
#[inline(always)]
fn detect_rapid_change(path: &Path) -> Option<u64> {
    if RAPID_MODE_ENABLED {
        let timer = Instant::now();
        let seq = RAPID_SEQUENCE.fetch_add(1, Ordering::Relaxed);
        FILE_HASH_CACHE.insert(path.to_path_buf(), (0, seq, 0));
        Some(timer.elapsed().as_micros() as u64)
    } else {
        Some(0)
    }
}
/// Runs the content-based detection for `path` and logs a timing summary.
///
/// * `rapid_time_us` — microseconds already spent in `detect_rapid_change`;
///   it is only used for the log line.
///
/// In rapid mode the inner detection is called with logging suppressed and a
/// combined "rapid + quality" line is emitted here instead; otherwise the
/// inner detection logs for itself and a "QUALITY ONLY" line is emitted.
/// Nothing is logged when no operations were found.
///
/// # Errors
/// Propagates any error returned by `detect_operations`.
fn detect_quality_operations(
    path: &Path,
    actor_id: &str,
    rapid_time_us: u64,
) -> Result<DetectionReport> {
    let timer = Instant::now();

    if RAPID_MODE_ENABLED {
        let report = detect_operations(path, actor_id, true)?;
        let quality_time = timer.elapsed().as_micros();
        let total_time = u128::from(rapid_time_us) + quality_time;
        if !report.ops.is_empty() {
            debug!(
                "⚡ [{} rapid+{}µs quality={}µs total] {} - {} ops",
                rapid_time_us,
                quality_time,
                total_time,
                path_to_string(path).bright_green(),
                report.ops.len()
            );
        }
        Ok(report)
    } else {
        let report = detect_operations(path, actor_id, false)?;
        let total_time = timer.elapsed().as_micros();
        if !report.ops.is_empty() {
            debug!(
                "⚙️ [QUALITY ONLY {}µs] {} - {} ops",
                total_time,
                path_to_string(path).bright_green(),
                report.ops.len()
            );
        }
        Ok(report)
    }
}
/// When `true`, `profile_detect` logs every detection, not only those that produced operations.
const PROFILE_DETECT: bool = false;
/// Lower latency bound (µs) used by `emit_operations`: operations faster than this are printed.
const RAPID_TARGET_US: u128 = 35;
/// Strategy used to receive filesystem events.
enum WatchMode {
    /// Coalesce events through a debouncer with the given timeout.
    Debounced(Duration),
}

/// Debounce window, in milliseconds, used by `WatchMode::from_env`.
const DEBOUNCE_MS: u64 = 1;

impl WatchMode {
    /// Selects the watch mode.
    ///
    /// Despite the name, no environment variable is consulted: the result is
    /// always `Debounced` with a `DEBOUNCE_MS` window.
    fn from_env() -> Self {
        let window = Duration::from_millis(DEBOUNCE_MS);
        Self::Debounced(window)
    }
}
/// Starts watching `path` recursively and feeds detected operations into `oplog`.
///
/// * `actor_id` — identifier stamped on every generated operation.
/// * `_repo_id` — currently unused.
/// * `sync_mgr` — when present, every appended operation is also published to it.
///
/// The future completes only when the underlying event loop ends.
///
/// # Errors
/// Propagates errors from creating the watcher or from processing events.
pub async fn start_watching(
    path: PathBuf,
    oplog: Arc<OperationLog>,
    actor_id: String,
    _repo_id: String,
    sync_mgr: Option<StdArc<SyncManager>>,
) -> Result<()> {
    // `WatchMode` has a single variant, so this pattern is irrefutable; adding
    // a variant will turn this line into a compile error that must be handled.
    let WatchMode::Debounced(debounce) = WatchMode::from_env();
    start_debounced_watcher(path, oplog, actor_id, sync_mgr, debounce).await
}
/// Creates a debounced recursive watcher on `path` and runs the event loop.
///
/// The debouncer is kept in a local for the whole duration of the loop so it
/// stays alive while events are being received.
///
/// # Errors
/// Fails if the debouncer cannot be created, if `path` cannot be watched, or
/// if the event loop returns an error.
async fn start_debounced_watcher(
    path: PathBuf,
    oplog: Arc<OperationLog>,
    actor_id: String,
    sync_mgr: Option<StdArc<SyncManager>>,
    debounce: Duration,
) -> Result<()> {
    let (event_tx, event_rx) = channel();

    let mut watcher = new_debouncer(debounce, None, event_tx)?;
    watcher.watch(&path, RecursiveMode::Recursive)?;

    process_events_loop(event_rx, actor_id, oplog, sync_mgr).await
}
/// Drains debounced filesystem events from `rx` and turns them into operations.
///
/// Per event kind:
/// * rename (`Modify(Name(..))`) — `From` remembers the source path (caching its
///   content first when it is a temp path); `To` pairs the target with the
///   remembered source (or with the first path of the event); `Both` uses the
///   first two paths directly. Pairs are handed to `handle_rename_transition`.
/// * other `Modify` — every path goes through `process_path`.
/// * `Create` — the file is warmed via `cache_warmer::warm_file` (result ignored)
///   and then processed like a modification.
/// * `Remove` — non-temp, trackable paths get their cached state cleared and a
///   `FileDelete` operation emitted.
///
/// Debouncer errors are logged and do not stop the loop. The loop ends with
/// `Ok(())` once `rx.recv()` fails.
///
/// # Errors
/// Any error from `handle_rename_transition`, `process_path` or
/// `emit_operations` is propagated with `?`, which ends the loop.
// NOTE(review): `rx` is a `std::sync::mpsc::Receiver`, so `recv()` blocks the calling
// thread inside this `async fn`, and the body contains no `.await` — confirm the
// caller runs this future somewhere that tolerates blocking.
async fn process_events_loop(
rx: Receiver<DebounceEventResult>,
actor_id: String,
oplog: Arc<OperationLog>,
sync_mgr: Option<StdArc<SyncManager>>,
) -> Result<()> {
while let Ok(result) = rx.recv() {
match result {
Ok(events) => {
for event in events {
// Start of the per-event latency measurement passed down to `emit_operations`.
let start = Instant::now();
let paths_list: Vec<String> = event
.paths
.iter()
.map(|p| path_to_string(p))
.collect();
debug!(
"Event received: kind={:?} paths={}",
event.kind,
paths_list.join(", ")
);
match &event.kind {
EventKind::Modify(ModifyKind::Name(mode)) => match *mode {
RenameMode::From => {
if let Some(old_path) = event.paths.first() {
// Read the temp file's content now, before the rename is completed.
if is_temp_path(old_path) {
cache_temp_content(old_path);
}
remember_rename_source(Some(old_path.clone()));
}
}
RenameMode::To => {
let new_path = event.paths.last().cloned();
let mut old_path = take_rename_source();
// No remembered `From` half: fall back to the first path when the
// event itself carries at least two paths.
if old_path.is_none() && event.paths.len() >= 2 {
old_path = event.paths.get(0).cloned();
}
if let (Some(old), Some(new)) = (old_path, new_path) {
handle_rename_transition(
old,
new,
&actor_id,
start,
oplog.as_ref(),
&sync_mgr,
)?;
}
}
RenameMode::Both => {
if event.paths.len() >= 2 {
let old = event.paths[0].clone();
let new = event.paths[1].clone();
handle_rename_transition(
old,
new,
&actor_id,
start,
oplog.as_ref(),
&sync_mgr,
)?;
}
}
_ => {}
},
EventKind::Modify(_) => {
for path in &event.paths {
process_path(path, &actor_id, start, oplog.as_ref(), &sync_mgr)?;
}
}
EventKind::Create(_) => {
for path in &event.paths {
// Warming is best-effort; its result is deliberately discarded.
let _ = cache_warmer::warm_file(path);
process_path(path, &actor_id, start, oplog.as_ref(), &sync_mgr)?;
}
}
EventKind::Remove(_) => {
for path in &event.paths {
// Removal of temp files never produces an operation.
if is_temp_path(path) {
continue;
}
TEMP_CONTENT_CACHE.remove(path);
if should_track(path) {
let detect_start = Instant::now();
clear_prev_state(path);
clear_last_operation_entry(path);
let op = register_operation(Operation::new(
path_to_string(path),
OperationType::FileDelete,
actor_id.clone(),
));
let detect_us = detect_start.elapsed().as_micros();
emit_operations(
vec![op],
detect_us,
start,
oplog.as_ref(),
&sync_mgr,
)?;
}
}
}
_ => {}
}
}
}
Err(errors) => {
for error in errors {
error!("{} Debouncer error: {}", "⚠️".bright_red(), error);
}
}
}
}
Ok(())
}
/// Last known text of a tracked file plus lookup tables derived from it
/// (see `build_snapshot_fast` / `extend_snapshot`).
#[derive(Clone)]
struct FileSnapshot {
// Full text of the file.
content: String,
// Length of `content` in bytes.
byte_len: u64,
// Length of `content` in `char`s.
char_len: usize,
// Byte offset of every char followed by a final `content.len()` entry;
// `build_snapshot_fast` leaves it empty when the content is pure ASCII.
char_to_byte: Vec<usize>,
// Char index at which each line begins; always starts with 0.
line_starts: Vec<usize>,
}
/// Per-phase timing breakdown of one detection run, in microseconds.
/// In the code visible here only `total_us` is ever written (by `finalize_detection`);
/// the remaining fields keep their default of 0.
#[derive(Default, Clone, Copy)]
#[allow(dead_code)]
struct DetectionTimings {
cached_us: u128,
metadata_us: u128,
read_us: u128,
tail_us: u128,
diff_us: u128,
// Wall-clock duration of the whole detection call.
total_us: u128,
}
/// Result of one detection run: the operations found (possibly none) and how long it took.
struct DetectionReport {
ops: Vec<Operation>,
timings: DetectionTimings,
}
/// Most recent snapshot per tracked path; the baseline every new detection diffs against.
static PREV_STATE: Lazy<DashMap<PathBuf, FileSnapshot>> = Lazy::new(|| DashMap::new());
/// Id of the latest operation registered per file-path string; `register_operation`
/// uses it as the parent of the next operation on the same file.
static LAST_OPERATION: Lazy<DashMap<String, Uuid>> = Lazy::new(|| DashMap::new());
/// Content read from temp files (see `cache_temp_content`) together with the instant it was cached.
static TEMP_CONTENT_CACHE: Lazy<DashMap<PathBuf, (Arc<String>, Instant)>> =
Lazy::new(|| DashMap::new());
/// Source path of a rename whose `From` half has been seen but not yet paired with a `To` half.
static LAST_RENAME_SOURCE: Lazy<StdMutex<Option<PathBuf>>> = Lazy::new(|| StdMutex::new(None));
/// Entry count `enforce_prev_state_limit` trims `PREV_STATE` down to
/// (`update_prev_state` only triggers the trim once the map exceeds this by 100).
const PREV_CONTENT_LIMIT: usize = 2_048;
// MAX_TRACKED_FILE_BYTES: content larger than this is not tracked by the detection code.
// TEMP_CACHE_LIMIT: entry count `enforce_temp_cache_limit` trims `TEMP_CONTENT_CACHE` down to.
const MAX_TRACKED_FILE_BYTES: u64 = 1_000_000; const TEMP_CACHE_LIMIT: usize = 256;
/// Evicts arbitrary entries from `PREV_STATE` until it holds at most
/// `PREV_CONTENT_LIMIT` snapshots.
///
/// Eviction order is whatever the map's iterator yields first; it is not
/// related to age or usage.
fn enforce_prev_state_limit() {
    while PREV_STATE.len() > PREV_CONTENT_LIMIT {
        // Pick the victim in its own statement: the iterator holds a read
        // guard on the shard it is visiting, and that guard must be dropped
        // before `remove` asks for the write lock on the same shard.
        // Doing the removal while the iterator temporary is still alive
        // (e.g. inside `if let Some(..) = PREV_STATE.iter().next()`) would
        // make this thread wait on its own lock.
        let victim = PREV_STATE.iter().next().map(|entry| entry.key().clone());
        let Some(key) = victim else {
            break;
        };
        PREV_STATE.remove(&key);
    }
}
/// Placeholder for throughput accounting: currently a no-op that ignores its argument.
fn record_throughput(_micros: u128) {
}
/// Appends each operation to `oplog` and, for those the log accepted,
/// publishes it to `sync_mgr` (when present) and records its latency.
///
/// * `detect_us` — detection time forwarded to `print_operation`.
/// * `start` — instant the originating event was picked up; used to compute
///   the total latency per operation.
///
/// An operation is printed only when its total latency is below
/// `RAPID_TARGET_US` or above 15 000 µs. Publish failures are ignored.
///
/// # Errors
/// Stops at and returns the first error from `oplog.append`.
fn emit_operations(
    ops: Vec<Operation>,
    detect_us: u128,
    start: Instant,
    oplog: &OperationLog,
    sync_mgr: &Option<StdArc<SyncManager>>,
) -> Result<()> {
    for op in ops {
        // `append` reports `false` when the log did not take the operation;
        // such operations are neither published nor timed.
        if !oplog.append(op.clone())? {
            continue;
        }

        if let Some(mgr) = sync_mgr.as_ref() {
            let _ = mgr.publish(StdArc::new(op.clone()));
        }

        let total_us = start.elapsed().as_micros();
        let in_expected_band = (RAPID_TARGET_US..=15_000).contains(&total_us);
        if !in_expected_band {
            print_operation(&op, total_us, detect_us, 0);
        }
        record_throughput(total_us);
    }
    Ok(())
}
fn process_path(
path: &Path,
actor_id: &str,
start: Instant,
oplog: &OperationLog,
sync_mgr: &Option<StdArc<SyncManager>>,
) -> Result<()> {
if is_temp_path(path) {
cache_temp_content(path);
return Ok(());
}
if !should_track(path) || path.is_dir() {
return Ok(());
}
let rapid_result = detect_rapid_change(path);
let Some(rapid_time_us) = rapid_result else {
return Ok(());
};
match detect_quality_operations(path, actor_id, rapid_time_us) {
Ok(report) => {
if !report.ops.is_empty() {
let detect_us = report.timings.total_us;
emit_operations(report.ops, detect_us, start, oplog, sync_mgr)?;
}
}
Err(_) => {
}
}
Ok(())
}
/// Translates a completed rename `old_path -> new_path` into operations.
///
/// The pending rename source is cleared and any cached temp content is moved
/// to the new path first. Then:
///
/// * temp -> non-temp (typical "write temp file, rename over target" save):
///   if the target is untracked the cached content is dropped; otherwise the
///   target is diffed against the cached content, or processed from disk when
///   nothing was cached.
/// * tracked -> tracked: cached state is moved and a `FileRename` is emitted.
/// * untracked -> tracked: the target is processed like a modified file.
/// * tracked -> untracked: cached state is cleared and a `FileDelete` is
///   emitted for the old path.
/// * untracked -> untracked: nothing happens.
///
/// # Errors
/// Propagates errors from detection and from `emit_operations`.
fn handle_rename_transition(
    old_path: PathBuf,
    new_path: PathBuf,
    actor_id: &str,
    start: Instant,
    oplog: &OperationLog,
    sync_mgr: &Option<StdArc<SyncManager>>,
) -> Result<()> {
    remember_rename_source(None);
    move_cached_content(&old_path, &new_path);

    if is_temp_path(&old_path) && !is_temp_path(&new_path) {
        if !should_track(&new_path) {
            TEMP_CONTENT_CACHE.remove(&new_path);
            return Ok(());
        }
        return match take_cached_content(&new_path) {
            Some(content) => {
                let report =
                    detect_operations_with_content(&new_path, actor_id, Some(content), false)?;
                if !report.ops.is_empty() {
                    emit_operations(report.ops, report.timings.total_us, start, oplog, sync_mgr)?;
                }
                Ok(())
            }
            None => process_path(&new_path, actor_id, start, oplog, sync_mgr),
        };
    }

    match (should_track(&old_path), should_track(&new_path)) {
        (true, true) => {
            move_prev_state_entry(&old_path, &new_path);
            move_last_operation_entry(&old_path, &new_path);
            let timer = Instant::now();
            let rename_op = register_operation(Operation::new(
                path_to_string(&new_path),
                OperationType::FileRename {
                    old_path: path_to_string(&old_path),
                    new_path: path_to_string(&new_path),
                },
                actor_id.to_string(),
            ));
            let detect_us = timer.elapsed().as_micros();
            emit_operations(vec![rename_op], detect_us, start, oplog, sync_mgr)?;
        }
        (false, true) => {
            process_path(&new_path, actor_id, start, oplog, sync_mgr)?;
        }
        (true, false) => {
            TEMP_CONTENT_CACHE.remove(&old_path);
            clear_prev_state(&old_path);
            clear_last_operation_entry(&old_path);
            let timer = Instant::now();
            let delete_op = register_operation(Operation::new(
                path_to_string(&old_path),
                OperationType::FileDelete,
                actor_id.to_string(),
            ));
            let detect_us = timer.elapsed().as_micros();
            emit_operations(vec![delete_op], detect_us, start, oplog, sync_mgr)?;
        }
        (false, false) => {}
    }
    Ok(())
}
/// Convenience wrapper around `detect_operations_with_content` with no
/// caller-supplied content (`override_content` is `None`).
#[inline(always)]
fn detect_operations(
path: &Path,
actor_id: &str,
suppress_logging: bool,
) -> Result<DetectionReport> {
detect_operations_with_content(path, actor_id, None, suppress_logging)
}
#[inline(always)]
fn detect_operations_with_content(
path: &Path,
actor_id: &str,
override_content: Option<String>,
suppress_logging: bool,
) -> Result<DetectionReport> {
let detect_start = Instant::now();
let timings = DetectionTimings::default();
let mut cached_content = match override_content {
Some(content) => Some(content),
None => take_cached_content(path),
};
let previous_snapshot = PREV_STATE.get(path).map(|entry| entry.value().clone());
if previous_snapshot.is_none() {
let new_content = match cached_content.take() {
Some(text) => text,
None => match read_file_fast(path) {
Ok(text) => text,
Err(_) => {
return Ok(finalize_detection(
path,
detect_start,
timings,
Vec::new(),
suppress_logging,
))
}
},
};
if new_content.len() as u64 > MAX_TRACKED_FILE_BYTES {
return Ok(finalize_detection(
path,
detect_start,
timings,
Vec::new(),
suppress_logging,
));
}
let snapshot = build_snapshot_fast(&new_content);
update_prev_state(path, Some(snapshot));
let op = register_operation(Operation::new(
path_to_string(path),
OperationType::FileCreate {
content: new_content,
},
actor_id.to_string(),
));
return Ok(finalize_detection(
path,
detect_start,
timings,
vec![op],
suppress_logging,
));
}
let mut prev = previous_snapshot.unwrap();
let new_content = match cached_content.take() {
Some(text) => text,
None => match read_file_fast(path) {
Ok(text) => text,
Err(_) => {
return Ok(finalize_detection(
path,
detect_start,
timings,
Vec::new(),
suppress_logging,
))
}
},
};
if new_content.len() == prev.content.len() {
if new_content.as_bytes() == prev.content.as_bytes() {
return Ok(finalize_detection(
path,
detect_start,
timings,
Vec::new(),
suppress_logging,
));
}
}
if new_content.len() > prev.content.len() && new_content.starts_with(&prev.content) {
let appended_slice = &new_content[prev.content.len()..];
if !appended_slice.is_empty() {
let appended = appended_slice.to_string();
let char_offset = prev.char_len;
let (line, col) = line_col_from_snapshot(&prev, char_offset);
let lamport = GLOBAL_CLOCK.tick();
let appended_len = appended.chars().count();
let op = register_operation(Operation::new(
path_to_string(path),
OperationType::Insert {
position: Position::new(line, col, char_offset, actor_id.to_string(), lamport),
content: appended.clone(),
length: appended_len,
},
actor_id.to_string(),
));
extend_snapshot(&mut prev, &appended);
update_prev_state(path, Some(prev));
return Ok(finalize_detection(
path,
detect_start,
timings,
vec![op],
suppress_logging,
));
}
}
let new_snapshot = build_snapshot_fast(&new_content);
if new_snapshot.byte_len > MAX_TRACKED_FILE_BYTES {
update_prev_state(path, None);
return Ok(finalize_detection(
path,
detect_start,
timings,
Vec::new(),
suppress_logging,
));
}
let ops = fast_diff_ops(path, actor_id, &prev, &new_snapshot);
update_prev_state(path, Some(new_snapshot));
Ok(finalize_detection(
path,
detect_start,
timings,
ops,
suppress_logging,
))
}
/// Builds a `FileSnapshot` for `content` in a single pass over the text.
///
/// * `char_to_byte` is left empty for pure-ASCII content (char index == byte
///   index); otherwise it holds the byte offset of every char followed by a
///   final `content.len()` entry.
/// * `line_starts` holds the char index at which each line begins: `0`, plus
///   the index right after every `'\n'`.
#[inline(always)]
fn build_snapshot_fast(content: &str) -> FileSnapshot {
    let bytes = content.as_bytes();
    let mut line_starts = vec![0];

    // The ASCII check is done once; it used to be repeated for every newline.
    let (char_len, char_to_byte) = if content.is_ascii() {
        // Byte offsets are char offsets, so a byte search is enough.
        let mut pos = 0;
        while let Some(idx) = memchr::memchr(b'\n', &bytes[pos..]) {
            pos += idx + 1;
            line_starts.push(pos);
        }
        (bytes.len(), Vec::new())
    } else {
        let char_len = content.chars().count();
        let mut mapping = Vec::with_capacity(char_len + 1);
        // One walk yields both the byte mapping and the line starts, instead
        // of re-counting the chars of the whole prefix for each newline.
        for (char_idx, (byte_idx, ch)) in content.char_indices().enumerate() {
            mapping.push(byte_idx);
            if ch == '\n' {
                line_starts.push(char_idx + 1);
            }
        }
        mapping.push(bytes.len());
        (char_len, mapping)
    };

    FileSnapshot {
        content: content.to_string(),
        byte_len: bytes.len() as u64,
        char_len,
        char_to_byte,
        line_starts,
    }
}
fn finalize_detection(
path: &Path,
detect_start: Instant,
mut timings: DetectionTimings,
ops: Vec<Operation>,
suppress_logging: bool,
) -> DetectionReport {
timings.total_us = detect_start.elapsed().as_micros();
if !suppress_logging {
profile_detect(path, &timings, !ops.is_empty());
}
DetectionReport { ops, timings }
}
/// Logs the total detection time for `path` at debug level.
///
/// A line is written when the detection produced operations (`has_ops`) or
/// when `PROFILE_DETECT` is enabled; otherwise the call does nothing.
fn profile_detect(path: &Path, timings: &DetectionTimings, has_ops: bool) {
    let should_log = PROFILE_DETECT || has_ops;
    if should_log {
        debug!(
            "⚙️ detect {} | total={}µs",
            path.display(),
            timings.total_us
        );
    }
}
/// Appends `appended` to `snapshot` in place, keeping `byte_len`, `char_len`,
/// `char_to_byte` and `line_starts` consistent with the new content.
///
/// `char_to_byte` stays empty only while the whole content is ASCII. As soon
/// as non-ASCII text is appended to a snapshot without a mapping, the mapping
/// for the existing content is materialised first. Leaving it empty would
/// make later code treat multi-byte content as ASCII and slice it at
/// non-char boundaries.
///
/// Appending an empty string is a no-op.
fn extend_snapshot(snapshot: &mut FileSnapshot, appended: &str) {
    if appended.is_empty() {
        return;
    }

    let base_byte = snapshot.content.len();
    let base_char = snapshot.char_len;
    let is_ascii = appended.is_ascii();
    let had_mapping = !snapshot.char_to_byte.is_empty();

    if had_mapping {
        // Drop the trailing `content.len()` sentinel; it is re-added below.
        snapshot.char_to_byte.pop();
    } else if !is_ascii {
        // First non-ASCII text on a snapshot that had no mapping: build the
        // mapping of the existing content from the content itself.
        snapshot
            .char_to_byte
            .reserve(base_char + appended.len() + 1);
        snapshot
            .char_to_byte
            .extend(snapshot.content.char_indices().map(|(offset, _)| offset));
    }
    let track_mapping = had_mapping || !is_ascii;

    let appended_char_count = if is_ascii {
        if track_mapping {
            snapshot
                .char_to_byte
                .extend((0..appended.len()).map(|i| base_byte + i));
        }
        // ASCII: byte offsets inside `appended` are char offsets.
        let appended_bytes = appended.as_bytes();
        let mut pos = 0;
        while let Some(idx) = memchr::memchr(b'\n', &appended_bytes[pos..]) {
            pos += idx + 1;
            snapshot.line_starts.push(base_char + pos);
        }
        appended.len()
    } else {
        // Single walk: byte mapping, char count and line starts together,
        // instead of re-counting the prefix for every newline.
        let mut seen_chars = 0;
        for (offset, ch) in appended.char_indices() {
            snapshot.char_to_byte.push(base_byte + offset);
            seen_chars += 1;
            if ch == '\n' {
                snapshot.line_starts.push(base_char + seen_chars);
            }
        }
        seen_chars
    };

    if track_mapping {
        snapshot.char_to_byte.push(base_byte + appended.len());
    }

    snapshot.content.push_str(appended);
    snapshot.byte_len = snapshot.content.len() as u64;
    snapshot.char_len = base_char + appended_char_count;
}
/// Converts a char index into a 1-based `(line, column)` pair using the
/// snapshot's `line_starts` table.
///
/// The line is the last one whose start is `<= char_idx`; the column is the
/// distance from that line's start plus one.
fn line_col_from_snapshot(snapshot: &FileSnapshot, char_idx: usize) -> (usize, usize) {
    let line_idx = snapshot
        .line_starts
        .partition_point(|&line_start| line_start <= char_idx)
        .saturating_sub(1);
    let offset_in_line = match snapshot.line_starts.get(line_idx) {
        Some(&line_start) => char_idx.saturating_sub(line_start),
        None => char_idx,
    };
    (line_idx + 1, offset_in_line + 1)
}
/// Describes the difference between two snapshots of the same file as at
/// most one operation.
///
/// The changed region is whatever `compute_change_range_fast` reports. The
/// operation is an `Insert` when only new text exists in the region, a
/// `Delete` when only old text exists, and a `Replace` otherwise. Its
/// position refers to the start of the region in the OLD snapshot. An empty
/// vector means no difference was found.
fn fast_diff_ops(
    path: &Path,
    actor_id: &str,
    old_snapshot: &FileSnapshot,
    new_snapshot: &FileSnapshot,
) -> Vec<Operation> {
    /// Byte offset of `char_idx`, or the end of the content when the index
    /// is past the mapping.
    fn byte_offset(snap: &FileSnapshot, char_idx: usize) -> usize {
        snap.char_to_byte
            .get(char_idx)
            .copied()
            .unwrap_or(snap.content.len())
    }

    // Same length and same bytes (or literally the same string): nothing to do.
    let identical = old_snapshot.byte_len == new_snapshot.byte_len
        && (std::ptr::eq(&old_snapshot.content, &new_snapshot.content)
            || old_snapshot.content.as_bytes() == new_snapshot.content.as_bytes());
    if identical {
        return Vec::new();
    }

    let old_snap = ensure_char_mapping(old_snapshot);
    let new_snap = ensure_char_mapping(new_snapshot);

    let Some((old_start, old_end, new_start, new_end)) = compute_change_range_fast(
        old_snap.content.as_bytes(),
        new_snap.content.as_bytes(),
        &old_snap,
        &new_snap,
    ) else {
        return Vec::new();
    };

    let old_byte_range = byte_offset(&old_snap, old_start)..byte_offset(&old_snap, old_end);
    let new_byte_range = byte_offset(&new_snap, new_start)..byte_offset(&new_snap, new_end);
    if old_byte_range.is_empty() && new_byte_range.is_empty() {
        return Vec::new();
    }

    let removed = &old_snap.content[old_byte_range];
    let inserted = &new_snap.content[new_byte_range];

    let (line, col) = line_col_from_snapshot(&old_snap, old_start);
    let lamport = GLOBAL_CLOCK.tick();
    let position = Position::new(line, col, old_start, actor_id.to_string(), lamport);

    // Both segments being empty was ruled out above, so three shapes remain.
    let op_type = if removed.is_empty() {
        OperationType::Insert {
            position,
            content: inserted.to_string(),
            length: new_end - new_start,
        }
    } else if inserted.is_empty() {
        OperationType::Delete {
            position,
            length: old_end - old_start,
        }
    } else {
        OperationType::Replace {
            position,
            old_content: removed.to_string(),
            new_content: inserted.to_string(),
        }
    };

    let op = Operation::new(path_to_string(path), op_type, actor_id.to_string());
    vec![register_operation(op)]
}
/// Returns a snapshot whose `char_to_byte` table is guaranteed to be filled.
///
/// A snapshot that already has a mapping is borrowed as-is. Otherwise a copy
/// is returned with the mapping built from the content: the identity mapping
/// `0..=len` for ASCII content, or the real char offsets (plus the trailing
/// `content.len()` entry) when the content holds multi-byte chars. The
/// second case is defensive: applying the identity mapping to non-ASCII
/// content would yield byte offsets inside a char and make slicing panic.
#[inline]
fn ensure_char_mapping(snapshot: &FileSnapshot) -> std::borrow::Cow<'_, FileSnapshot> {
    if !snapshot.char_to_byte.is_empty() {
        return std::borrow::Cow::Borrowed(snapshot);
    }

    let content = snapshot.content.as_str();
    let mut filled = snapshot.clone();
    filled.char_to_byte = if content.is_ascii() {
        (0..=content.len()).collect()
    } else {
        content
            .char_indices()
            .map(|(offset, _)| offset)
            .chain(std::iter::once(content.len()))
            .collect()
    };
    std::borrow::Cow::Owned(filled)
}
/// Locates the single region that differs between two versions of a text.
///
/// The common prefix and the common suffix are measured in bytes and then
/// converted to char indices through each snapshot's `char_to_byte` table
/// (an empty table means "ASCII, bytes == chars").
///
/// Returns `(old_start, old_end, new_start, new_end)` as char indices, with
/// `old_start == new_start`, or `None` when both texts are empty or the
/// prefix covers both texts entirely.
///
/// The byte-level prefix may stop in the middle of a multi-byte char (e.g.
/// `é` vs `è` share their first byte). It is therefore rounded DOWN to the
/// previous char boundary so the differing char stays inside the reported
/// region; the suffix start is rounded UP for the same reason.
#[inline]
fn compute_change_range_fast(
    old_bytes: &[u8],
    new_bytes: &[u8],
    old_snapshot: &FileSnapshot,
    new_snapshot: &FileSnapshot,
) -> Option<(usize, usize, usize, usize)> {
    if old_snapshot.char_len == 0 && new_snapshot.char_len == 0 {
        return None;
    }

    let common_prefix_bytes = if old_bytes.len() > 8192 && new_bytes.len() > 8192 {
        // Large inputs: compare fixed-size chunks in parallel and take the
        // first chunk that is not fully equal.
        use rayon::prelude::*;
        let chunk_size = 4096;
        let min_len = old_bytes.len().min(new_bytes.len());
        let num_chunks = (min_len + chunk_size - 1) / chunk_size;
        (0..num_chunks)
            .into_par_iter()
            .map(|chunk_idx| {
                let start = chunk_idx * chunk_size;
                let end = (start + chunk_size).min(min_len);
                let chunk_old = &old_bytes[start..end];
                let chunk_new = &new_bytes[start..end];
                chunk_old
                    .iter()
                    .zip(chunk_new.iter())
                    .take_while(|(a, b)| a == b)
                    .count()
            })
            .enumerate()
            .find_first(|(_, prefix_len)| *prefix_len < chunk_size)
            .map(|(idx, partial)| idx * chunk_size + partial)
            .unwrap_or(min_len)
    } else {
        old_bytes
            .iter()
            .zip(new_bytes.iter())
            .take_while(|(a, b)| a == b)
            .count()
    };

    // The suffix is only searched in what the prefix left over, so the two
    // can never overlap.
    let remaining_old = old_bytes.len() - common_prefix_bytes;
    let remaining_new = new_bytes.len() - common_prefix_bytes;
    let common_suffix_bytes = if remaining_old > 0 && remaining_new > 0 {
        old_bytes[common_prefix_bytes..]
            .iter()
            .rev()
            .zip(new_bytes[common_prefix_bytes..].iter().rev())
            .take_while(|(a, b)| a == b)
            .count()
            .min(remaining_old.min(remaining_new))
    } else {
        0
    };

    // Index of the last char that starts at or before `byte_pos` (round down).
    // `char_to_byte` is sorted, so a binary search is sufficient.
    let char_floor = |snapshot: &FileSnapshot, byte_pos: usize| -> usize {
        if snapshot.char_to_byte.is_empty() {
            byte_pos
        } else {
            snapshot
                .char_to_byte
                .partition_point(|&b| b <= byte_pos)
                .saturating_sub(1)
        }
    };
    // Index of the first char that starts at or after `byte_pos` (round up),
    // capped at the number of chars.
    let char_ceil = |snapshot: &FileSnapshot, byte_pos: usize| -> usize {
        if snapshot.char_to_byte.is_empty() {
            byte_pos
        } else {
            snapshot
                .char_to_byte
                .partition_point(|&b| b < byte_pos)
                .min(snapshot.char_len)
        }
    };

    // Char boundaries inside the shared prefix are identical in both texts,
    // so the old snapshot's table is valid for both start indices.
    let prefix_chars = char_floor(old_snapshot, common_prefix_bytes);
    let old_suffix_chars = char_ceil(old_snapshot, old_bytes.len() - common_suffix_bytes);
    let new_suffix_chars = char_ceil(new_snapshot, new_bytes.len() - common_suffix_bytes);

    if prefix_chars == old_snapshot.char_len && prefix_chars == new_snapshot.char_len {
        return None;
    }

    Some((
        prefix_chars,
        old_suffix_chars,
        prefix_chars,
        new_suffix_chars,
    ))
}
/// Whether changes to `path` should produce operations; currently a direct alias of `is_trackable`.
fn should_track(path: &Path) -> bool {
is_trackable(path)
}
/// Writes a one-line, human-readable summary of `op` to the debug log.
///
/// Output is throttled per file: if a line for the same path was written
/// less than `LOG_THROTTLE_MS` ago, nothing is logged. The throttle table is
/// pruned of entries older than one hour once it grows beyond 5 000 paths.
/// The three timing parameters are currently unused.
fn print_operation(op: &Operation, _total_us: u128, _detect_us: u128, _queue_us: u128) {
    /// Final path component of `raw`, or `raw` itself when there is none
    /// (or it is not valid UTF-8).
    fn base_name(raw: &str) -> &str {
        std::path::Path::new(raw)
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or(raw)
    }

    let filename = base_name(&op.file_path);
    let throttle_key = PathBuf::from(&op.file_path);
    // A clock before the Unix epoch is treated as time zero.
    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis())
        .unwrap_or(0);

    let logged_recently = RECENT_LOGS
        .get(&throttle_key)
        .map(|last| now_ms.saturating_sub(*last.value()) < LOG_THROTTLE_MS)
        .unwrap_or(false);
    if logged_recently {
        return;
    }
    RECENT_LOGS.insert(throttle_key, now_ms);

    if RECENT_LOGS.len() > 5_000 {
        let cutoff = now_ms.saturating_sub(3_600_000);
        // Collect first, remove afterwards: no map guard is held while mutating.
        let mut stale: Vec<PathBuf> = Vec::new();
        for entry in RECENT_LOGS.iter() {
            if *entry.value() < cutoff {
                stale.push(entry.key().clone());
            }
        }
        for key in &stale {
            RECENT_LOGS.remove(key);
        }
    }

    match &op.op_type {
        OperationType::Insert { position, content, .. } => {
            let preview = truncate_with_preview(content, 50);
            debug!("[+] {} L{}:{} {}", filename, position.line, position.column, preview);
        }
        OperationType::Delete { position, length } => {
            debug!("[-] {} L{}:{} ({} chars)", filename, position.line, position.column, length);
        }
        OperationType::Replace { position, new_content, .. } => {
            let preview = truncate_with_preview(new_content, 50);
            debug!("[~] {} L{}:{} {}", filename, position.line, position.column, preview);
        }
        OperationType::FileCreate { content } => {
            debug!("[NEW] {} ({} lines)", filename, content.lines().count());
        }
        OperationType::FileDelete => {
            debug!("[DEL] {}", filename);
        }
        OperationType::FileRename { old_path, new_path } => {
            let old_name = base_name(old_path);
            let new_name = base_name(new_path);
            debug!("[RENAME] {} -> {}", old_name, new_name);
        }
    }
}
/// Produces a single-line preview of `s` for log output.
///
/// Newlines and tabs are shown as the two-character escapes `\n` and `\t`.
/// Text longer than `max_len` bytes is cut and suffixed with `…`; the cut is
/// moved back to the nearest char boundary so multi-byte text never causes a
/// slicing panic (the preview may then be slightly shorter than `max_len`).
fn truncate_with_preview(s: &str, max_len: usize) -> String {
    let escape = |text: &str| text.replace('\n', "\\n").replace('\t', "\\t");

    if s.len() <= max_len {
        return escape(s);
    }

    // `max_len < s.len()` here; index 0 is always a boundary, so this ends.
    let mut cut = max_len;
    while !s.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}…", escape(&s[..cut]))
}
/// Links `op` to the previous operation on the same file and records it as
/// the new latest one.
///
/// If `LAST_OPERATION` has an entry for `op.file_path`, that id becomes the
/// operation's only parent; otherwise the operation is returned unchanged.
/// In both cases `LAST_OPERATION` is updated to `op.id`.
fn register_operation(op: Operation) -> Operation {
    let key = op.file_path.clone();
    // Copy the id out so the map guard is gone before the insert below.
    let parent = LAST_OPERATION.get(&key).map(|entry| *entry.value());
    let op = match parent {
        Some(parent_id) => op.with_parents(vec![parent_id]),
        None => op,
    };
    LAST_OPERATION.insert(key, op.id);
    op
}
/// Stores (`Some`) or forgets (`None`) the snapshot for `path`.
///
/// After the change, the map is trimmed via `enforce_prev_state_limit` once
/// it holds more than `PREV_CONTENT_LIMIT + 100` entries.
fn update_prev_state(path: &Path, snapshot: Option<FileSnapshot>) {
    match snapshot {
        Some(state) => {
            PREV_STATE.insert(path.to_path_buf(), state);
        }
        None => {
            PREV_STATE.remove(path);
        }
    }

    let trim_threshold = PREV_CONTENT_LIMIT + 100;
    if PREV_STATE.len() > trim_threshold {
        enforce_prev_state_limit();
    }
}
/// Forgets everything cached for `path`: its snapshot in `PREV_STATE` and its
/// entry in `cache_warmer::FILE_POOL`.
fn clear_prev_state(path: &Path) {
update_prev_state(path, None);
cache_warmer::FILE_POOL.write().remove(path);
}
/// Re-keys the cached state of a renamed file from `old` to `new`.
///
/// Moves the snapshot in `PREV_STATE` (trimming the map afterwards) and the
/// pooled file handle in `cache_warmer::FILE_POOL`. Either step is skipped
/// when there is nothing stored under `old`.
fn move_prev_state_entry(old: &Path, new: &Path) {
    let source_key = old.to_path_buf();
    let moved_snapshot = PREV_STATE.remove(&source_key).map(|(_, snapshot)| snapshot);
    if let Some(snapshot) = moved_snapshot {
        PREV_STATE.insert(new.to_path_buf(), snapshot);
        enforce_prev_state_limit();
    }

    let mut pool = cache_warmer::FILE_POOL.write();
    if let Some(handle) = pool.remove(old) {
        pool.insert(new.to_path_buf(), handle);
    }
}
/// Re-keys the "latest operation" record of a renamed file from `old` to
/// `new`; does nothing when `old` has no record.
fn move_last_operation_entry(old: &Path, new: &Path) {
    let previous = LAST_OPERATION.remove(&path_key(old));
    if let Some((_old_key, op_id)) = previous {
        let new_key = path_key(new);
        LAST_OPERATION.insert(new_key, op_id);
    }
}
/// Drops the "latest operation" record for `path`, so the next operation on
/// that path is registered without a parent.
fn clear_last_operation_entry(path: &Path) {
LAST_OPERATION.remove(&path_key(path));
}
/// Key used for `path` in `LAST_OPERATION`: the same string `path_to_string` produces.
fn path_key(path: &Path) -> String {
path_to_string(path)
}
/// Heuristically decides whether `path` names an editor/OS temporary file.
///
/// Only the final path component is inspected, case-insensitively (ASCII).
/// It matches when the name ends with one of `~ .tmp .temp .swp .swx .bak
/// .bk`, starts with one of `~ .# .~ .tmp .goutputstream`, or contains
/// `goutputstream` anywhere. Paths without a UTF-8 file name are never
/// considered temporary.
fn is_temp_path(path: &Path) -> bool {
    const TEMP_SUFFIXES: [&str; 7] = ["~", ".tmp", ".temp", ".swp", ".swx", ".bak", ".bk"];
    const TEMP_PREFIXES: [&str; 5] = ["~", ".#", ".~", ".tmp", ".goutputstream"];

    let Some(name) = path.file_name().and_then(|s| s.to_str()) else {
        return false;
    };
    let lower = name.to_ascii_lowercase();

    TEMP_SUFFIXES.iter().any(|&suffix| lower.ends_with(suffix))
        || TEMP_PREFIXES.iter().any(|&prefix| lower.starts_with(prefix))
        || lower.contains("goutputstream")
}
/// Reads a temp file and keeps its content in `TEMP_CONTENT_CACHE`, stamped
/// with the current instant, so it can be reused after the file is renamed.
///
/// Non-temp paths and unreadable files are ignored. The cache is trimmed
/// after every successful insert.
fn cache_temp_content(path: &Path) {
    if !is_temp_path(path) {
        return;
    }
    let Ok(content) = read_file_fast(path) else {
        return;
    };
    let entry = (Arc::new(content), Instant::now());
    TEMP_CONTENT_CACHE.insert(path.to_path_buf(), entry);
    enforce_temp_cache_limit();
}
/// Re-keys cached temp content from `old` to `new`.
///
/// Does nothing when both paths are equal or when nothing is cached for `old`.
fn move_cached_content(old: &Path, new: &Path) {
    if old != new {
        let detached = TEMP_CONTENT_CACHE.remove(old);
        if let Some((_old_key, entry)) = detached {
            TEMP_CONTENT_CACHE.insert(new.to_path_buf(), entry);
        }
    }
}
fn take_cached_content(path: &Path) -> Option<String> {
TEMP_CONTENT_CACHE
.remove(path)
.map(|(_, (arc, _))| match Arc::try_unwrap(arc) {
Ok(string) => string,
Err(shared) => shared.as_str().to_owned(),
})
}
/// Evicts arbitrary entries from `TEMP_CONTENT_CACHE` until it holds at most
/// `TEMP_CACHE_LIMIT` entries.
///
/// Eviction order is whatever the map's iterator yields first; the stored
/// timestamps are not consulted.
fn enforce_temp_cache_limit() {
    while TEMP_CONTENT_CACHE.len() > TEMP_CACHE_LIMIT {
        // Pick the victim in its own statement: the iterator holds a read
        // guard on the shard it is visiting, and that guard must be dropped
        // before `remove` asks for the write lock on the same shard.
        // Removing while the iterator temporary is still alive would make
        // this thread wait on its own lock.
        let victim = TEMP_CONTENT_CACHE
            .iter()
            .next()
            .map(|entry| entry.key().clone());
        let Some(key) = victim else {
            break;
        };
        TEMP_CONTENT_CACHE.remove(&key);
    }
}
/// Stores (`Some`) or clears (`None`) the pending rename source path.
///
/// If the mutex is poisoned the call silently does nothing.
fn remember_rename_source(path: Option<PathBuf>) {
    match LAST_RENAME_SOURCE.lock() {
        Ok(mut slot) => *slot = path,
        Err(_) => {}
    }
}
/// Returns the pending rename source path and clears it.
///
/// Yields `None` when nothing is pending or when the mutex is poisoned.
fn take_rename_source() -> Option<PathBuf> {
    LAST_RENAME_SOURCE
        .lock()
        .ok()
        .and_then(|mut slot| slot.take())
}
/// Reads the whole file at `path` as UTF-8 text through a memory map.
///
/// A handle already present in `cache_warmer::FILE_POOL` is reused (under the
/// pool's read lock); otherwise the file is opened, read, and its handle is
/// added to the pool for later calls.
///
/// # Errors
/// Fails when the file cannot be opened or mapped, or when its bytes are not
/// valid UTF-8.
fn read_file_fast(path: &Path) -> Result<String> {
    /// Maps `file` and copies its bytes out as an owned UTF-8 string.
    fn map_to_string(file: &File) -> Result<String> {
        // NOTE(review): soundness of the mapping relies on the file not being
        // truncated or rewritten while it is mapped; the bytes are copied out
        // right away, which keeps that window short — confirm this is acceptable.
        let mapped = unsafe { Mmap::map(file)? };
        Ok(std::str::from_utf8(&mapped)?.to_string())
    }

    {
        let pool = cache_warmer::FILE_POOL.read();
        if let Some(pooled) = pool.get(path) {
            return map_to_string(pooled.as_ref());
        }
    }

    let file = File::open(path)?;
    let content = map_to_string(&file)?;
    cache_warmer::FILE_POOL
        .write()
        .insert(path.to_path_buf(), Arc::new(file));
    Ok(content)
}
/// Returns `false` when any directory or file name along `path` is one of the
/// ignored names (`.git`, `.dx`, `.dx_client`, `target`, `node_modules`),
/// compared ASCII-case-insensitively; `true` otherwise.
///
/// Components that are not valid UTF-8, and non-name components such as the
/// root or `..`, never cause a path to be ignored.
fn is_trackable(path: &Path) -> bool {
    const IGNORED_COMPONENTS: [&str; 5] = [".git", ".dx", ".dx_client", "target", "node_modules"];

    let has_ignored_component = path.components().any(|component| match component {
        Component::Normal(seg) => seg
            .to_str()
            .map(|segment| IGNORED_COMPONENTS.contains(&segment.to_ascii_lowercase().as_str()))
            .unwrap_or(false),
        _ => false,
    });
    !has_ignored_component
}
/// Unit tests for `is_trackable`.
#[cfg(test)]
mod tests {
    use super::is_trackable;
    use std::path::Path;

    #[test]
    fn ignores_git_directory_unix_style() {
        assert!(!is_trackable(Path::new("/repo/.git/config")));
    }

    // `\` is a path separator only on Windows. Elsewhere the whole string is
    // one file name, `.git` is not a component of its own, and the assertion
    // would fail, so this case is restricted to Windows builds.
    #[cfg(windows)]
    #[test]
    fn ignores_git_directory_windows_style() {
        assert!(!is_trackable(Path::new("C:\\repo\\.git\\config")));
    }

    #[test]
    fn ignores_target_directory() {
        assert!(!is_trackable(Path::new("/repo/target/debug/app")));
    }

    #[test]
    fn ignores_dx_directory() {
        assert!(!is_trackable(Path::new("/repo/.dx/forge/forge.db")));
    }

    #[test]
    fn ignores_dx_client_directory() {
        assert!(!is_trackable(Path::new("/repo/.dx_client/forge/forge.db")));
    }

    #[test]
    fn tracks_regular_source_file() {
        assert!(is_trackable(Path::new("/repo/src/main.rs")));
    }

    #[test]
    fn tracks_nested_source_file() {
        assert!(is_trackable(Path::new("C:\\repo\\src\\lib.rs")));
    }
}