use std::path::PathBuf;
use std::sync::mpsc as stdmpsc;
use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use clap::Parser;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
use engram::watcher::{
app_focus::AppFocusWatcher,
browser::BrowserWatcher,
config::WatcherConfig,
fs_watcher::{FileEvent, FsWatcher},
};
// Command-line interface of the watcher daemon, parsed with clap's derive API.
// The doc comments on the fields below are picked up by clap as `--help` text.
#[derive(Parser, Debug)]
#[command(name = "engram-watcher")]
#[command(about = "Engram watcher daemon: file, browser, and app-focus monitoring")]
#[command(version)]
struct Args {
    /// Path to the watcher configuration file (the default location is used when omitted)
    #[arg(short, long, value_name = "FILE")]
    config: Option<PathBuf>,

    /// Log the memories that would be created instead of sending them to Engram
    #[arg(long, default_value = "false")]
    dry_run: bool,

    /// Enable debug-level logging (only applies when RUST_LOG is not set)
    #[arg(short, long, default_value = "false")]
    verbose: bool,
}
/// Envelope of a JSON-RPC request, serialized with serde and used as the body
/// of the HTTP POST issued by `MemorySender::send`.
#[derive(serde::Serialize)]
struct JsonRpcRequest {
/// Protocol version marker; every construction site in this file passes `"2.0"`.
jsonrpc: &'static str,
/// Request identifier chosen by the caller.
id: u64,
/// Name of the remote method to invoke (this file uses `"memory_create"`).
method: &'static str,
/// Method arguments as an arbitrary JSON value.
params: serde_json::Value,
}
/// Borrowed bundle of everything `send` needs to deliver one memory to Engram.
struct MemorySender<'a> {
/// HTTP client used for the POST request.
client: &'a reqwest::Client,
/// Base URL of the Engram server; `send` appends `/v1/mcp` to it.
engram_url: &'a str,
/// Optional bearer token; attached to the request only when present.
api_key: Option<&'a str>,
/// Workspace name placed in the request parameters and in log fields.
workspace: &'a str,
/// When true, `send` only logs the memory and performs no HTTP call.
dry_run: bool,
}
impl<'a> MemorySender<'a> {
    /// Delivers a single memory to Engram through a JSON-RPC `memory_create`
    /// request POSTed to `<engram_url>/v1/mcp`.
    ///
    /// * `content` - text of the memory.
    /// * `memory_type` - value sent as the `memory_type` parameter.
    /// * `tags` - tags sent alongside the memory.
    ///
    /// Delivery is best-effort: transport errors and non-2xx responses are
    /// logged as warnings and nothing is returned to the caller. In dry-run
    /// mode the memory is logged and no request is made.
    ///
    /// NOTE(review): only the HTTP status is inspected; a JSON-RPC error
    /// carried in a 2xx response body would be reported as success — confirm
    /// how the server signals failures.
    async fn send(&self, content: String, memory_type: &str, tags: Vec<String>) {
        if self.dry_run {
            info!(
                workspace = self.workspace,
                r#type = memory_type,
                "[dry-run] Would create memory: {}",
                content
            );
            return;
        }

        let body = JsonRpcRequest {
            jsonrpc: "2.0",
            // The current wall-clock time in microseconds serves as the request id.
            id: Utc::now().timestamp_micros() as u64,
            method: "memory_create",
            params: serde_json::json!({
                "content": content,
                "memory_type": memory_type,
                "workspace": self.workspace,
                "tags": tags,
            }),
        };

        // Tolerate a trailing slash in the configured base URL.
        let endpoint = format!("{}/v1/mcp", self.engram_url.trim_end_matches('/'));
        let request = self.client.post(&endpoint).json(&body);
        let request = match self.api_key {
            Some(key) => request.bearer_auth(key),
            None => request,
        };

        let response = match request.send().await {
            Ok(response) => response,
            Err(e) => {
                warn!(error = %e, "Failed to send memory to Engram");
                return;
            }
        };

        let status = response.status();
        if status.is_success() {
            debug!(workspace = self.workspace, "Memory created successfully");
        } else {
            warn!(
                status = %status,
                "Engram API returned non-2xx status"
            );
        }
    }
}
fn start_fs_watcher(
config: &WatcherConfig,
tx: tokio::sync::mpsc::UnboundedSender<(String, &'static str, Vec<String>)>,
) -> Option<stdmpsc::SyncSender<()>> {
if !config.file_watcher.enabled || config.file_watcher.paths.is_empty() {
info!("File watcher disabled or no paths configured — skipping");
return None;
}
let fs_config = config.file_watcher.clone();
let callback = move |event: FileEvent| {
let content = event.to_memory_content();
let tags = vec![
"watcher".to_string(),
"file-system".to_string(),
event.kind.to_string(),
];
let _ = tx.send((content, "note", tags));
};
match FsWatcher::new(fs_config, callback) {
Ok((watcher, stop_tx)) => {
std::thread::Builder::new()
.name("engram-fs-watcher".to_string())
.spawn(move || {
info!("File system watcher thread started");
watcher.run();
info!("File system watcher thread exiting");
})
.expect("Failed to spawn fs-watcher thread");
Some(stop_tx)
}
Err(e) => {
error!(error = %e, "Failed to create file system watcher");
None
}
}
}
fn start_browser_watcher(
config: &WatcherConfig,
tx: tokio::sync::mpsc::UnboundedSender<(String, &'static str, Vec<String>)>,
shutdown_rx: Arc<Mutex<tokio::sync::watch::Receiver<bool>>>,
) {
if !config.browser.enabled {
info!("Browser watcher disabled — skipping");
return;
}
let browser_config = config.browser.clone();
let poll_interval = Duration::from_secs(browser_config.poll_interval_secs);
tokio::spawn(async move {
let mut watcher = BrowserWatcher::new(browser_config);
let initial_since = Utc::now();
info!(?poll_interval, "Browser watcher started");
loop {
let shutdown = {
let mut rx = shutdown_rx.lock().await;
tokio::time::timeout(poll_interval, rx.changed()).await
};
if shutdown.is_ok() {
info!("Browser watcher received shutdown signal");
break;
}
let visits = watcher.poll(initial_since);
for visit in visits {
let content = BrowserWatcher::visit_to_memory_content(&visit);
let tags = vec![
"watcher".to_string(),
"browser-history".to_string(),
visit.browser.clone(),
];
if tx.send((content, "note", tags)).is_err() {
break;
}
}
}
info!("Browser watcher task exiting");
});
}
fn start_app_focus_watcher(
config: &WatcherConfig,
tx: tokio::sync::mpsc::UnboundedSender<(String, &'static str, Vec<String>)>,
shutdown_rx: Arc<Mutex<tokio::sync::watch::Receiver<bool>>>,
) {
if !config.app_focus.enabled {
info!("App focus watcher disabled — skipping");
return;
}
let app_focus_config = config.app_focus.clone();
let poll_interval = Duration::from_secs(app_focus_config.poll_interval_secs);
tokio::spawn(async move {
let mut watcher = AppFocusWatcher::new(app_focus_config);
info!(?poll_interval, "App focus watcher started");
loop {
let shutdown = {
let mut rx = shutdown_rx.lock().await;
tokio::time::timeout(poll_interval, rx.changed()).await
};
if shutdown.is_ok() {
info!("App focus watcher received shutdown signal");
break;
}
watcher.tick();
for event in watcher.drain_completed_events() {
let content = event.to_memory_content();
let tags = vec!["watcher".to_string(), "app-focus".to_string()];
if tx.send((content, "episodic", tags)).is_err() {
break;
}
}
}
info!("App focus watcher task exiting");
});
}
#[tokio::main]
async fn main() {
let args = Args::parse();
let filter = if args.verbose {
"engram_watcher=debug,engram=debug"
} else {
"engram_watcher=info,engram=info"
};
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(filter)),
)
.init();
let config = match args.config {
Some(ref path) => match WatcherConfig::load(path) {
Ok(cfg) => {
info!(path = ?path, "Loaded watcher config");
cfg
}
Err(e) => {
error!(error = %e, "Failed to load config; using defaults");
WatcherConfig::default()
}
},
None => {
let cfg = WatcherConfig::load_or_default();
info!("Loaded watcher config (default path)");
cfg
}
};
info!(
engram_url = %config.engram_url,
workspace = %config.workspace,
dry_run = args.dry_run,
"Engram watcher daemon starting"
);
let http_client = reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()
.expect("Failed to build HTTP client");
let (event_tx, mut event_rx) =
tokio::sync::mpsc::unbounded_channel::<(String, &'static str, Vec<String>)>();
let (shutdown_tx, shutdown_rx_base) = tokio::sync::watch::channel(false);
let shutdown_rx = Arc::new(Mutex::new(shutdown_rx_base));
let fs_stop_tx = start_fs_watcher(&config, event_tx.clone());
start_browser_watcher(&config, event_tx.clone(), Arc::clone(&shutdown_rx));
start_app_focus_watcher(&config, event_tx.clone(), Arc::clone(&shutdown_rx));
drop(event_tx);
let engram_url = config.engram_url.clone();
let api_key = config.api_key.clone();
let workspace = config.workspace.clone();
let dry_run = args.dry_run;
let consumer = tokio::spawn(async move {
let sender = MemorySender {
client: &http_client,
engram_url: &engram_url,
api_key: api_key.as_deref(),
workspace: &workspace,
dry_run,
};
while let Some((content, memory_type, tags)) = event_rx.recv().await {
sender.send(content, memory_type, tags).await;
}
info!("Event consumer task exiting");
});
tokio::select! {
_ = tokio::signal::ctrl_c() => {
info!("Received Ctrl-C; initiating graceful shutdown");
}
}
let _ = shutdown_tx.send(true);
if let Some(stop_tx) = fs_stop_tx {
let _ = stop_tx.send(());
}
tokio::time::sleep(Duration::from_millis(500)).await;
consumer.abort();
info!("Engram watcher daemon stopped");
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;
    use tempfile::NamedTempFile;

    /// Writes `contents` to a fresh temporary file and returns its handle;
    /// the file lives for as long as the handle does.
    fn temp_config(contents: &str) -> NamedTempFile {
        let file = NamedTempFile::new().unwrap();
        std::fs::write(file.path(), contents).unwrap();
        file
    }

    /// A fully populated TOML document is loaded with every value intact.
    #[test]
    fn test_load_valid_toml_config() {
        let file = temp_config(
            r#"
engram_url = "http://localhost:9999"
api_key = "sk_test_abc"
workspace = "test-workspace"
poll_interval_secs = 120
[file_watcher]
enabled = true
paths = ["/tmp"]
extensions = ["rs", "md"]
debounce_ms = 200
[browser]
enabled = false
[app_focus]
enabled = false
"#,
        );
        let loaded = WatcherConfig::load(file.path()).expect("should parse");

        // Top-level settings.
        assert_eq!(loaded.engram_url, "http://localhost:9999");
        assert_eq!(loaded.api_key.as_deref(), Some("sk_test_abc"));
        assert_eq!(loaded.workspace, "test-workspace");
        assert_eq!(loaded.poll_interval_secs, 120);

        // Per-watcher sections.
        assert!(loaded.file_watcher.enabled);
        assert_eq!(loaded.file_watcher.extensions, vec!["rs", "md"]);
        assert!(!loaded.browser.enabled);
        assert!(!loaded.app_focus.enabled);
    }

    /// Loading a path that does not exist yields a descriptive error.
    #[test]
    fn test_load_missing_config_returns_error() {
        let missing = Path::new("/nonexistent/watcher.toml");
        let err_msg = match WatcherConfig::load(missing) {
            Ok(_) => panic!("missing file should return Err"),
            Err(e) => e.to_string(),
        };
        let mentions_problem = err_msg.contains("watcher.toml") || err_msg.contains("Cannot read");
        assert!(
            mentions_problem,
            "error should mention the path or problem: {err_msg}"
        );
    }

    /// The built-in defaults match the documented values.
    #[test]
    fn test_default_config_values() {
        let defaults = WatcherConfig::default();
        assert_eq!(defaults.engram_url, "http://localhost:3000");
        assert_eq!(defaults.workspace, "watcher");
        assert!(defaults.api_key.is_none());
        assert_eq!(defaults.poll_interval_secs, 300);
        assert!(defaults.file_watcher.enabled);
        assert!(!defaults.browser.enabled);
        assert!(!defaults.app_focus.enabled);
    }

    /// `--dry-run` alone sets only the dry-run flag.
    #[test]
    fn test_cli_dry_run_flag() {
        let Args {
            config,
            dry_run,
            verbose,
        } = Args::try_parse_from(["engram-watcher", "--dry-run"]).unwrap();
        assert!(dry_run);
        assert!(!verbose);
        assert!(config.is_none());
    }

    /// `--verbose` alone sets only the verbose flag.
    #[test]
    fn test_cli_verbose_flag() {
        let parsed = Args::try_parse_from(["engram-watcher", "--verbose"]).unwrap();
        assert!(parsed.verbose);
        assert!(!parsed.dry_run);
    }

    /// `--config <path>` is captured as a `PathBuf`.
    #[test]
    fn test_cli_config_path() {
        let argv = ["engram-watcher", "--config", "/tmp/my-watcher.toml"];
        let parsed = Args::try_parse_from(argv).unwrap();
        assert_eq!(parsed.config, Some(PathBuf::from("/tmp/my-watcher.toml")));
    }

    /// All three options can be given together.
    #[test]
    fn test_cli_combined_flags() {
        let argv = [
            "engram-watcher",
            "--config",
            "/tmp/cfg.toml",
            "--dry-run",
            "--verbose",
        ];
        let Args {
            config,
            dry_run,
            verbose,
        } = Args::try_parse_from(argv).unwrap();
        assert!(dry_run);
        assert!(verbose);
        assert_eq!(config, Some(PathBuf::from("/tmp/cfg.toml")));
    }

    /// The request envelope serializes to compact JSON containing each field.
    #[test]
    fn test_json_rpc_request_serialisation() {
        let params = serde_json::json!({
            "content": "hello",
            "memory_type": "note",
            "workspace": "watcher",
            "tags": ["watcher", "file-system"],
        });
        let request = JsonRpcRequest {
            jsonrpc: "2.0",
            id: 42,
            method: "memory_create",
            params,
        };
        let json = serde_json::to_string(&request).unwrap();
        for fragment in [
            "\"jsonrpc\":\"2.0\"",
            "\"method\":\"memory_create\"",
            "\"id\":42",
            "\"content\":\"hello\"",
        ] {
            assert!(json.contains(fragment));
        }
    }

    /// Keys missing from the TOML document fall back to their defaults.
    #[test]
    fn test_partial_toml_uses_defaults() {
        let file = temp_config(
            r#"
engram_url = "http://engram.local:4000"
"#,
        );
        let loaded = WatcherConfig::load(file.path()).unwrap();
        assert_eq!(loaded.engram_url, "http://engram.local:4000");
        assert_eq!(loaded.workspace, "watcher");
        assert!(loaded.api_key.is_none());
        assert_eq!(loaded.poll_interval_secs, 300);
    }
}