use crate::error::Result;
use crate::reader::ConversationReader;
use crate::types::ConversationEntry;
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
#[derive(Debug, Clone)]
pub struct WatcherConfig {
pub poll_interval: Duration,
pub debounce: Duration,
}
impl Default for WatcherConfig {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(5),
debounce: Duration::from_millis(100),
}
}
}
pub struct AsyncConversationWatcher {
file_path: PathBuf,
byte_offset: Arc<Mutex<u64>>,
config: WatcherConfig,
}
impl AsyncConversationWatcher {
pub fn new(file_path: PathBuf, config: Option<WatcherConfig>) -> Self {
Self {
file_path,
byte_offset: Arc::new(Mutex::new(0)),
config: config.unwrap_or_default(),
}
}
pub fn with_offset(file_path: PathBuf, offset: u64, config: Option<WatcherConfig>) -> Self {
Self {
file_path,
byte_offset: Arc::new(Mutex::new(offset)),
config: config.unwrap_or_default(),
}
}
pub async fn offset(&self) -> u64 {
*self.byte_offset.lock().await
}
pub async fn poll(&self) -> Result<Vec<ConversationEntry>> {
let mut offset = self.byte_offset.lock().await;
let (entries, new_offset) = ConversationReader::read_from_offset(&self.file_path, *offset)?;
*offset = new_offset;
Ok(entries)
}
pub async fn start(self, tx: mpsc::Sender<Vec<ConversationEntry>>) -> Result<WatcherHandle> {
let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
let file_path = self.file_path.clone();
let byte_offset = self.byte_offset.clone();
let poll_interval = self.config.poll_interval;
let debounce = self.config.debounce;
let (event_tx, mut event_rx) = mpsc::channel::<()>(16);
let event_tx_clone = event_tx.clone();
let file_path_clone = file_path.clone();
let watcher_result: std::result::Result<RecommendedWatcher, notify::Error> =
notify::recommended_watcher(move |res: std::result::Result<Event, notify::Error>| {
if let Ok(event) = res {
if event.kind.is_modify() {
for path in &event.paths {
if path == &file_path_clone {
let _ = event_tx_clone.blocking_send(());
break;
}
}
}
}
});
let mut watcher = match watcher_result {
Ok(mut w) => {
if let Some(parent) = file_path.parent() {
let _ = w.watch(parent, RecursiveMode::NonRecursive);
}
Some(w)
}
Err(e) => {
eprintln!(
"Warning: Failed to create file watcher: {}. Using poll-only mode.",
e
);
None
}
};
let handle = tokio::spawn(async move {
let mut poll_timer = tokio::time::interval(poll_interval);
let mut last_event = std::time::Instant::now();
loop {
tokio::select! {
_ = stop_rx.recv() => {
break;
}
Some(()) = event_rx.recv() => {
let now = std::time::Instant::now();
if now.duration_since(last_event) >= debounce {
last_event = now;
if let Ok(entries) = read_new_entries(&file_path, &byte_offset).await
&& !entries.is_empty() && tx.send(entries).await.is_err()
{
break; }
}
}
_ = poll_timer.tick() => {
if let Ok(entries) = read_new_entries(&file_path, &byte_offset).await
&& !entries.is_empty() && tx.send(entries).await.is_err()
{
break; }
}
}
}
drop(watcher.take());
});
Ok(WatcherHandle {
stop_tx,
_task: handle,
})
}
}
async fn read_new_entries(
file_path: &PathBuf,
byte_offset: &Arc<Mutex<u64>>,
) -> Result<Vec<ConversationEntry>> {
let mut offset = byte_offset.lock().await;
let (entries, new_offset) = ConversationReader::read_from_offset(file_path, *offset)?;
*offset = new_offset;
Ok(entries)
}
pub struct WatcherHandle {
stop_tx: mpsc::Sender<()>,
_task: tokio::task::JoinHandle<()>,
}
impl WatcherHandle {
pub async fn stop(self) {
let _ = self.stop_tx.send(()).await;
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
use tokio::time::timeout;
#[tokio::test]
async fn test_poll_basic() {
let mut temp = NamedTempFile::new().unwrap();
writeln!(
temp,
r#"{{"type":"user","uuid":"123","timestamp":"2024-01-01T00:00:00Z","sessionId":"test","message":{{"role":"user","content":"Hello"}}}}"#
)
.unwrap();
temp.flush().unwrap();
let watcher = AsyncConversationWatcher::new(temp.path().to_path_buf(), None);
let entries = watcher.poll().await.unwrap();
assert_eq!(entries.len(), 1);
let entries = watcher.poll().await.unwrap();
assert!(entries.is_empty());
writeln!(
temp,
r#"{{"type":"assistant","uuid":"456","timestamp":"2024-01-01T00:00:01Z","sessionId":"test","message":{{"role":"assistant","content":"Hi"}}}}"#
)
.unwrap();
temp.flush().unwrap();
let entries = watcher.poll().await.unwrap();
assert_eq!(entries.len(), 1);
}
#[tokio::test]
async fn test_watcher_start_and_stop() {
let mut temp = NamedTempFile::new().unwrap();
writeln!(
temp,
r#"{{"type":"user","uuid":"123","timestamp":"2024-01-01T00:00:00Z","sessionId":"test","message":{{"role":"user","content":"Hello"}}}}"#
)
.unwrap();
temp.flush().unwrap();
let config = WatcherConfig {
poll_interval: Duration::from_millis(50),
debounce: Duration::from_millis(10),
};
let watcher = AsyncConversationWatcher::new(temp.path().to_path_buf(), Some(config));
let (tx, mut rx) = mpsc::channel(16);
let handle = watcher.start(tx).await.unwrap();
let entries = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("timeout")
.expect("channel closed");
assert_eq!(entries.len(), 1);
handle.stop().await;
}
#[tokio::test]
async fn test_offset_persistence() {
let mut temp = NamedTempFile::new().unwrap();
writeln!(
temp,
r#"{{"type":"user","uuid":"123","timestamp":"2024-01-01T00:00:00Z","sessionId":"test","message":{{"role":"user","content":"Hello"}}}}"#
)
.unwrap();
temp.flush().unwrap();
let watcher1 = AsyncConversationWatcher::new(temp.path().to_path_buf(), None);
let _ = watcher1.poll().await.unwrap();
let offset = watcher1.offset().await;
assert!(offset > 0);
writeln!(
temp,
r#"{{"type":"assistant","uuid":"456","timestamp":"2024-01-01T00:00:01Z","sessionId":"test","message":{{"role":"assistant","content":"Hi"}}}}"#
)
.unwrap();
temp.flush().unwrap();
let watcher2 =
AsyncConversationWatcher::with_offset(temp.path().to_path_buf(), offset, None);
let entries = watcher2.poll().await.unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].uuid, "456");
}
}