pub mod chunker;
pub mod embedding_worker;
pub mod graph_ingest;
pub mod link_extractor;
pub mod loopback;
#[cfg(test)]
mod tests;
#[cfg(test)]
mod inline_tests;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::time::{Duration, Instant};
use notify_debouncer_full::{
new_debouncer, notify::RecursiveMode, DebounceEventResult, Debouncer, RecommendedCache,
};
use sha2::{Digest, Sha256};
use tokio_util::sync::CancellationToken;
use crate::config::{ConnectorConfig, ContentSourcesConfig};
use crate::source::ContentSourceProvider;
use crate::storage::accounts::DEFAULT_ACCOUNT_ID;
use crate::storage::watchtower as store;
use crate::storage::DbPool;
/// Errors produced by the watchtower ingestion pipeline.
#[derive(Debug, thiserror::Error)]
pub enum WatchtowerError {
    /// Filesystem read/traversal failure.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// Database/storage layer failure.
    #[error("storage error: {0}")]
    Storage(#[from] crate::error::StorageError),
    /// Filesystem watcher (notify) failure.
    #[error("notify error: {0}")]
    Notify(#[from] notify::Error),
    /// Invalid or missing configuration value.
    #[error("config error: {0}")]
    Config(String),
    /// Failure while chunking ingested content.
    #[error("chunker error: {0}")]
    Chunker(#[from] chunker::ChunkerError),
}
/// Tally of a batch ingest run.
#[derive(Debug, Default)]
pub struct IngestSummary {
    /// Files that were inserted or updated.
    pub ingested: u32,
    /// Files whose content hash was unchanged.
    pub skipped: u32,
    /// Per-file error messages ("path: error") for files that failed.
    pub errors: Vec<String>,
}
/// Metadata extracted from a document's YAML front matter.
#[derive(Debug, Default)]
pub struct ParsedFrontMatter {
    /// Value of the `title` key, when it is a YAML string.
    pub title: Option<String>,
    /// Value of the `tags` key: a sequence joined with commas, or a plain
    /// string; `None` when absent or empty.
    pub tags: Option<String>,
    /// The raw front-matter YAML text, kept even when it fails to parse.
    pub raw_yaml: Option<String>,
}
/// Splits optional YAML front matter off `content` and extracts the
/// `title` and `tags` fields.
///
/// Returns the extracted metadata together with the document body. When no
/// front-matter delimiter is present, the whole input is the body and the
/// metadata is empty. When the YAML is present but malformed (or parses to
/// something other than a mapping), only `raw_yaml` is populated so the
/// original header text is not lost.
pub fn parse_front_matter(content: &str) -> (ParsedFrontMatter, &str) {
    let (yaml_str, body) = loopback::split_front_matter(content);
    let yaml_str = match yaml_str {
        Some(y) => y,
        None => return (ParsedFrontMatter::default(), content),
    };
    // Keep the raw header regardless of whether it parses.
    let raw_yaml = Some(yaml_str.to_string());
    let value: Result<serde_yaml::Value, _> = serde_yaml::from_str(yaml_str);
    if let Ok(serde_yaml::Value::Mapping(map)) = value {
        // `title` must be a YAML string; any other type is ignored.
        let title = map
            .get(serde_yaml::Value::String("title".to_string()))
            .and_then(|v| v.as_str())
            .map(str::to_string);
        // `tags` may be a sequence (comma-joined) or a plain string; an
        // empty result is normalized to `None`.
        let tags = map
            .get(serde_yaml::Value::String("tags".to_string()))
            .map(|v| match v {
                serde_yaml::Value::Sequence(seq) => seq
                    .iter()
                    .filter_map(|item| item.as_str())
                    .collect::<Vec<_>>()
                    .join(","),
                serde_yaml::Value::String(s) => s.clone(),
                _ => String::new(),
            })
            .filter(|s| !s.is_empty());
        (
            ParsedFrontMatter {
                title,
                tags,
                raw_yaml,
            },
            body,
        )
    } else {
        let fm = ParsedFrontMatter {
            raw_yaml,
            ..Default::default()
        };
        (fm, body)
    }
}
/// Reports whether the file name of `path` matches any of the glob
/// `patterns`.
///
/// Paths without a UTF-8 file name never match; patterns that fail to
/// compile as globs are silently skipped.
pub fn matches_patterns(path: &Path, patterns: &[String]) -> bool {
    let name = match path.file_name().and_then(|n| n.to_str()) {
        Some(n) => n,
        None => return false,
    };
    patterns.iter().any(|pattern| {
        glob::Pattern::new(pattern)
            .map(|p| p.matches(name))
            .unwrap_or(false)
    })
}
/// Joins the components of `path` with forward slashes so stored relative
/// paths are platform-independent (lossy UTF-8 conversion per component).
fn relative_path_string(path: &Path) -> String {
    let mut joined = String::new();
    for (idx, component) in path.iter().enumerate() {
        if idx > 0 {
            joined.push('/');
        }
        joined.push_str(&component.to_string_lossy());
    }
    joined
}
/// Parses `content`'s front matter, hashes the raw content, and upserts it
/// as a content node for `source_id` under `provider_id`.
///
/// The stored hash is the SHA-256 of the raw content, which lets the upsert
/// skip unchanged files. When `force` is true the hash is additionally
/// salted with the current time in nanoseconds, guaranteeing a mismatch
/// with any previously stored hash so the node is always rewritten.
///
/// # Errors
/// Propagates storage failures from the upsert.
pub async fn ingest_content(
    pool: &DbPool,
    source_id: i64,
    provider_id: &str,
    content: &str,
    force: bool,
) -> Result<store::UpsertResult, WatchtowerError> {
    let (fm, body) = parse_front_matter(content);
    // Both paths hash the full raw content; `force` only mixes in a
    // time-based salt. (Previously the hashing was duplicated per branch.)
    let mut hasher = Sha256::new();
    hasher.update(content.as_bytes());
    if force {
        hasher.update(
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_nanos()
                .to_le_bytes(),
        );
    }
    let hash = format!("{:x}", hasher.finalize());
    let result = store::upsert_content_node(
        pool,
        source_id,
        provider_id,
        &hash,
        fm.title.as_deref(),
        body,
        fm.raw_yaml.as_deref(),
        fm.tags.as_deref(),
    )
    .await?;
    Ok(result)
}
/// Reads `relative_path` under `base_path` and ingests its contents, using
/// the relative path itself as the provider id.
///
/// # Errors
/// Returns an IO error when the file cannot be read as UTF-8 text, or any
/// error bubbled up from [`ingest_content`].
pub async fn ingest_file(
    pool: &DbPool,
    source_id: i64,
    base_path: &Path,
    relative_path: &str,
    force: bool,
) -> Result<store::UpsertResult, WatchtowerError> {
    let contents = tokio::fs::read_to_string(base_path.join(relative_path)).await?;
    ingest_content(pool, source_id, relative_path, &contents, force).await
}
/// Ingests every relative path in `paths`, tallying inserts/updates as
/// `ingested` and unchanged files as `skipped`.
///
/// Failures never abort the batch; each one is recorded as a
/// "path: error" message in the returned summary.
pub async fn ingest_files(
    pool: &DbPool,
    source_id: i64,
    base_path: &Path,
    paths: &[String],
    force: bool,
) -> IngestSummary {
    let mut summary = IngestSummary::default();
    for rel_path in paths {
        let outcome = ingest_file(pool, source_id, base_path, rel_path, force).await;
        match outcome {
            Ok(store::UpsertResult::Skipped) => summary.skipped += 1,
            Ok(store::UpsertResult::Inserted | store::UpsertResult::Updated) => {
                summary.ingested += 1;
            }
            Err(e) => summary.errors.push(format!("{rel_path}: {e}")),
        }
    }
    summary
}
/// Remembers recently-touched paths so watcher events caused by the
/// process's own writes can be suppressed for a short TTL.
struct CooldownSet {
    entries: HashMap<PathBuf, Instant>,
    ttl: Duration,
}

impl CooldownSet {
    /// Creates an empty set whose marks expire after `ttl`.
    fn new(ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            ttl,
        }
    }

    /// Records `path` as just-written, (re)starting its cooldown window.
    #[allow(dead_code)]
    fn mark(&mut self, path: PathBuf) {
        self.entries.insert(path, Instant::now());
    }

    /// True while `path` was marked less than `ttl` ago.
    fn is_cooling(&self, path: &Path) -> bool {
        self.entries
            .get(path)
            .map_or(false, |marked_at| marked_at.elapsed() < self.ttl)
    }

    /// Drops every entry whose cooldown window has already elapsed.
    fn cleanup(&mut self) {
        let ttl = self.ttl;
        self.entries.retain(|_, marked_at| marked_at.elapsed() < ttl);
    }
}
/// A registered remote source: (db source id, provider, file patterns, poll interval).
type RemoteSource = (i64, Box<dyn ContentSourceProvider>, Vec<String>, Duration);
/// Long-running loop that watches local directories and polls remote
/// providers, ingesting changed content into storage.
pub struct WatchtowerLoop {
    // Database connection pool shared with the storage layer.
    pool: DbPool,
    // Configured content sources (local and remote).
    config: ContentSourcesConfig,
    // Connector (OAuth/service) configuration for remote providers.
    connector_config: ConnectorConfig,
    // Application data directory (holds the connector encryption key).
    data_dir: PathBuf,
    // How long the filesystem debouncer coalesces events.
    debounce_duration: Duration,
    // Interval of the periodic safety-net full scan.
    fallback_scan_interval: Duration,
    // TTL for suppressing events on paths the process wrote itself.
    cooldown_ttl: Duration,
}
impl WatchtowerLoop {
    /// Builds a loop with default timings: 2 s event debounce, 300 s
    /// fallback full-scan interval, 5 s loopback cooldown.
    pub fn new(
        pool: DbPool,
        config: ContentSourcesConfig,
        connector_config: ConnectorConfig,
        data_dir: PathBuf,
    ) -> Self {
        Self {
            pool,
            config,
            connector_config,
            data_dir,
            debounce_duration: Duration::from_secs(2),
            fallback_scan_interval: Duration::from_secs(300),
            cooldown_ttl: Duration::from_secs(5),
        }
    }

    /// Runs the watchtower until `cancel` fires.
    ///
    /// Phases: (1) register every enabled local-fs and Google Drive source
    /// in the store, (2) perform an initial full scan / remote poll and
    /// chunk pending nodes, then (3) enter the main loop multiplexing
    /// debounced filesystem events, a periodic fallback scan, and remote
    /// polling. Returns early when nothing is configured or registered.
    pub async fn run(&self, cancel: CancellationToken) {
        // Enabled sources split by type; both require their key field set.
        let local_sources: Vec<_> = self
            .config
            .sources
            .iter()
            .filter(|s| s.source_type == "local_fs" && s.is_enabled() && s.path.is_some())
            .collect();
        let remote_sources: Vec<_> = self
            .config
            .sources
            .iter()
            .filter(|s| s.source_type == "google_drive" && s.is_enabled() && s.folder_id.is_some())
            .collect();
        if local_sources.is_empty() && remote_sources.is_empty() {
            tracing::info!("Watchtower: no watch sources configured, exiting");
            return;
        }
        // Register local sources; each entry is (source_id, base path, patterns).
        let mut source_map: Vec<(i64, PathBuf, Vec<String>)> = Vec::new();
        for src in &local_sources {
            let path_str = src.path.as_deref().unwrap();
            let expanded = PathBuf::from(crate::storage::expand_tilde(path_str));
            let config_json = serde_json::json!({
                "path": path_str,
                "file_patterns": src.file_patterns,
                "loop_back_enabled": src.loop_back_enabled,
                "analytics_sync_enabled": src.analytics_sync_enabled,
            })
            .to_string();
            match store::ensure_local_fs_source(&self.pool, path_str, &config_json).await {
                Ok(source_id) => {
                    source_map.push((source_id, expanded, src.file_patterns.clone()));
                }
                Err(e) => {
                    tracing::error!(path = path_str, error = %e, "Failed to register source context");
                }
            }
        }
        // Register remote sources, choosing a provider per credential style:
        // OAuth connection takes precedence over a service-account key.
        let mut remote_map: Vec<RemoteSource> = Vec::new();
        for src in &remote_sources {
            let folder_id = src.folder_id.as_deref().unwrap();
            let config_json = serde_json::json!({
                "folder_id": folder_id,
                "file_patterns": src.file_patterns,
                "service_account_key": src.service_account_key,
                "connection_id": src.connection_id,
            })
            .to_string();
            match store::ensure_google_drive_source(&self.pool, folder_id, &config_json).await {
                Ok(source_id) => {
                    let interval = Duration::from_secs(src.poll_interval_seconds.unwrap_or(300));
                    let provider: Box<dyn ContentSourceProvider> =
                        if let Some(connection_id) = src.connection_id {
                            match self.build_connection_provider(folder_id, connection_id) {
                                Ok(p) => Box::new(p),
                                Err(reason) => {
                                    tracing::warn!(
                                        folder_id,
                                        connection_id,
                                        reason = %reason,
                                        "Skipping connection-based source"
                                    );
                                    continue;
                                }
                            }
                        } else if src.service_account_key.is_some() {
                            let key_path = src.service_account_key.clone().unwrap_or_default();
                            Box::new(crate::source::google_drive::GoogleDriveProvider::new(
                                folder_id.to_string(),
                                key_path,
                            ))
                        } else {
                            tracing::warn!(
                                folder_id,
                                "Skipping Google Drive source: no connection_id or service_account_key"
                            );
                            continue;
                        };
                    remote_map.push((source_id, provider, src.file_patterns.clone(), interval));
                }
                Err(e) => {
                    tracing::error!(
                        folder_id = folder_id,
                        error = %e,
                        "Failed to register Google Drive source"
                    );
                }
            }
        }
        if source_map.is_empty() && remote_map.is_empty() {
            tracing::warn!("Watchtower: no sources registered, exiting");
            return;
        }
        // Initial full scan of each local source; status reflects the outcome.
        for (source_id, base_path, patterns) in &source_map {
            let _ = store::update_source_status(&self.pool, *source_id, "syncing", None).await;
            match self.scan_directory(*source_id, base_path, patterns).await {
                Ok(_) => {
                    let _ =
                        store::update_source_status(&self.pool, *source_id, "active", None).await;
                }
                Err(e) => {
                    tracing::error!(
                        path = %base_path.display(),
                        error = %e,
                        "Initial scan failed"
                    );
                    let _ = store::update_source_status(
                        &self.pool,
                        *source_id,
                        "error",
                        Some(&e.to_string()),
                    )
                    .await;
                }
            }
        }
        self.chunk_pending().await;
        if !remote_map.is_empty() {
            self.poll_remote_sources(&remote_map).await;
            self.chunk_pending().await;
        }
        // Sources eligible for ongoing change detection (fallback scans)...
        let watch_source_map: Vec<_> = source_map
            .iter()
            .zip(local_sources.iter())
            .filter(|(_, src)| !src.is_scan_only())
            .map(|(entry, _)| entry.clone())
            .collect();
        // ...and the subset watched via OS file notifications.
        let notify_source_map: Vec<_> = source_map
            .iter()
            .zip(local_sources.iter())
            .filter(|(_, src)| src.effective_change_detection() == "auto")
            .map(|(entry, _)| entry.clone())
            .collect();
        if watch_source_map.is_empty() {
            if remote_map.is_empty() {
                tracing::info!(
                    "Watchtower: all local sources are scan-only and no remote sources, exiting"
                );
                return;
            }
            self.remote_only_loop(&remote_map, cancel).await;
            return;
        }
        // Bridge the debouncer's blocking callback into the async loop.
        let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::<DebounceEventResult>(256);
        let handler = move |result: DebounceEventResult| {
            let _ = async_tx.blocking_send(result);
        };
        let debouncer_result = new_debouncer(self.debounce_duration, None, handler);
        let mut debouncer: Debouncer<notify::RecommendedWatcher, RecommendedCache> =
            match debouncer_result {
                Ok(d) => d,
                Err(e) => {
                    tracing::error!(error = %e, "Failed to create filesystem watcher, falling back to polling");
                    self.polling_loop(&watch_source_map, cancel).await;
                    return;
                }
            };
        for (_, base_path, _) in &notify_source_map {
            if let Err(e) = debouncer.watch(base_path, RecursiveMode::Recursive) {
                tracing::error!(
                    path = %base_path.display(),
                    error = %e,
                    "Failed to watch directory"
                );
            }
        }
        // NOTE(review): `polling` assumes notify_source_map ⊆ watch_source_map;
        // a source that is scan-only but "auto" would underflow here — confirm
        // effective_change_detection() cannot return "auto" for scan-only.
        tracing::info!(
            local_sources = source_map.len(),
            watching = notify_source_map.len(),
            polling = watch_source_map.len() - notify_source_map.len(),
            remote_sources = remote_map.len(),
            "Watchtower watching for changes"
        );
        let cooldown = Mutex::new(CooldownSet::new(self.cooldown_ttl));
        // interval()'s first tick fires immediately; consume it so the first
        // periodic run happens one full interval from now.
        let mut fallback_timer = tokio::time::interval(self.fallback_scan_interval);
        fallback_timer.tick().await;
        // Poll all remote sources at the smallest configured interval.
        let remote_interval = remote_map
            .iter()
            .map(|(_, _, _, d)| *d)
            .min()
            .unwrap_or(self.fallback_scan_interval);
        let mut remote_timer = tokio::time::interval(remote_interval);
        remote_timer.tick().await;
        loop {
            tokio::select! {
                () = cancel.cancelled() => {
                    tracing::info!("Watchtower: cancellation received, shutting down");
                    break;
                }
                // Safety-net scan for missed events and non-notify sources.
                _ = fallback_timer.tick() => {
                    for (source_id, base_path, patterns) in &watch_source_map {
                        if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
                            tracing::warn!(
                                path = %base_path.display(),
                                error = %e,
                                "Fallback scan failed"
                            );
                        }
                    }
                    if let Ok(mut cd) = cooldown.lock() {
                        cd.cleanup();
                    }
                    self.chunk_pending().await;
                }
                _ = remote_timer.tick(), if !remote_map.is_empty() => {
                    self.poll_remote_sources(&remote_map).await;
                    self.chunk_pending().await;
                }
                // Debounced filesystem events from the notify watcher.
                result = async_rx.recv() => {
                    match result {
                        Some(Ok(events)) => {
                            for event in events {
                                for path in &event.paths {
                                    self.handle_event(path, &source_map, &cooldown).await;
                                }
                            }
                            self.chunk_pending().await;
                        }
                        Some(Err(errs)) => {
                            for e in errs {
                                tracing::warn!(error = %e, "Watcher error");
                            }
                        }
                        None => {
                            tracing::warn!("Watcher event channel closed");
                            break;
                        }
                    }
                }
            }
        }
        // Keep the debouncer alive until here; dropping it stops the watcher.
        drop(debouncer);
        tracing::info!("Watchtower shut down");
    }

    /// Routes one debounced event path to its owning source and ingests the
    /// file (non-forced).
    ///
    /// NOTE(review): nothing in this loop ever calls `CooldownSet::mark`
    /// (it is `#[allow(dead_code)]`), so the cooldown check below appears
    /// inert here — presumably loopback writes were meant to mark paths;
    /// confirm.
    /// NOTE(review): when a path is under a source's base but fails the
    /// pattern check, this returns instead of continuing, so a second
    /// overlapping source is never consulted — confirm bases never nest.
    async fn handle_event(
        &self,
        path: &Path,
        source_map: &[(i64, PathBuf, Vec<String>)],
        cooldown: &Mutex<CooldownSet>,
    ) {
        if let Ok(cd) = cooldown.lock() {
            if cd.is_cooling(path) {
                tracing::debug!(path = %path.display(), "Skipping cooldown path");
                return;
            }
        }
        for (source_id, base_path, patterns) in source_map {
            if path.starts_with(base_path) {
                if !matches_patterns(path, patterns) {
                    return;
                }
                let rel = match path.strip_prefix(base_path) {
                    Ok(r) => relative_path_string(r),
                    Err(_) => return,
                };
                match ingest_file(&self.pool, *source_id, base_path, &rel, false).await {
                    Ok(result) => {
                        tracing::debug!(
                            path = %rel,
                            result = ?result,
                            "Watchtower ingested file"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            path = %rel,
                            error = %e,
                            "Watchtower ingest failed"
                        );
                    }
                }
                return;
            }
        }
    }

    /// Walks `base_path` recursively, ingests all matching files
    /// (non-forced), and advances the source's sync cursor to "now".
    async fn scan_directory(
        &self,
        source_id: i64,
        base_path: &Path,
        patterns: &[String],
    ) -> Result<IngestSummary, WatchtowerError> {
        let mut rel_paths = Vec::new();
        Self::walk_directory(base_path, base_path, patterns, &mut rel_paths)?;
        let summary = ingest_files(&self.pool, source_id, base_path, &rel_paths, false).await;
        tracing::debug!(
            path = %base_path.display(),
            ingested = summary.ingested,
            skipped = summary.skipped,
            errors = summary.errors.len(),
            "Directory scan complete"
        );
        let cursor = chrono::Utc::now().to_rfc3339();
        if let Err(e) = store::update_sync_cursor(&self.pool, source_id, &cursor).await {
            tracing::warn!(error = %e, "Failed to update sync cursor");
        }
        Ok(summary)
    }

    /// Recursively collects relative paths (under `base`) of files matching
    /// `patterns`, skipping dot-prefixed (hidden) directories.
    /// Synchronous std::fs walk; called from async context.
    fn walk_directory(
        base: &Path,
        current: &Path,
        patterns: &[String],
        out: &mut Vec<String>,
    ) -> Result<(), WatchtowerError> {
        let entries = std::fs::read_dir(current)?;
        for entry in entries {
            let entry = entry?;
            let file_type = entry.file_type()?;
            let path = entry.path();
            if file_type.is_dir() {
                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
                    if name.starts_with('.') {
                        continue;
                    }
                }
                Self::walk_directory(base, &path, patterns, out)?;
            } else if file_type.is_file() && matches_patterns(&path, patterns) {
                if let Ok(rel) = path.strip_prefix(base) {
                    out.push(relative_path_string(rel));
                }
            }
        }
        Ok(())
    }

    /// Polls every remote source once: loads its sync cursor, scans the
    /// provider for changes, ingests each changed file, then advances the
    /// cursor and marks the source active. A broken connection marks the
    /// source "error" and the connection "expired"; other failures only
    /// mark the source "error".
    async fn poll_remote_sources(&self, remote_sources: &[RemoteSource]) {
        for (source_id, provider, patterns, _interval) in remote_sources {
            let _ = store::update_source_status(&self.pool, *source_id, "syncing", None).await;
            let cursor = match store::get_source_context(&self.pool, *source_id).await {
                Ok(Some(ctx)) => ctx.sync_cursor,
                Ok(None) => None,
                Err(e) => {
                    tracing::warn!(source_id, error = %e, "Failed to get source context");
                    continue;
                }
            };
            match provider.scan_for_changes(cursor.as_deref(), patterns).await {
                Ok(files) => {
                    let mut ingested = 0u32;
                    let mut skipped = 0u32;
                    for file in &files {
                        match provider.read_content(&file.provider_id).await {
                            Ok(content) => {
                                match ingest_content(
                                    &self.pool,
                                    *source_id,
                                    &file.provider_id,
                                    &content,
                                    false,
                                )
                                .await
                                {
                                    Ok(
                                        store::UpsertResult::Inserted
                                        | store::UpsertResult::Updated,
                                    ) => {
                                        ingested += 1;
                                    }
                                    Ok(store::UpsertResult::Skipped) => {
                                        skipped += 1;
                                    }
                                    Err(e) => {
                                        tracing::warn!(
                                            provider_id = %file.provider_id,
                                            error = %e,
                                            "Remote ingest failed"
                                        );
                                    }
                                }
                            }
                            Err(e) => {
                                tracing::warn!(
                                    provider_id = %file.provider_id,
                                    error = %e,
                                    "Failed to read remote content"
                                );
                            }
                        }
                    }
                    tracing::debug!(
                        source_type = provider.source_type(),
                        ingested,
                        skipped,
                        total = files.len(),
                        "Remote poll complete"
                    );
                    let new_cursor = chrono::Utc::now().to_rfc3339();
                    if let Err(e) =
                        store::update_sync_cursor(&self.pool, *source_id, &new_cursor).await
                    {
                        tracing::warn!(error = %e, "Failed to update remote sync cursor");
                    }
                    let _ =
                        store::update_source_status(&self.pool, *source_id, "active", None).await;
                }
                Err(crate::source::SourceError::ConnectionBroken {
                    connection_id,
                    ref reason,
                }) => {
                    tracing::warn!(
                        source_id,
                        connection_id,
                        reason = %reason,
                        "Connection broken -- marking source as error"
                    );
                    let _ =
                        store::update_source_status(&self.pool, *source_id, "error", Some(reason))
                            .await;
                    let _ =
                        store::update_connection_status(&self.pool, connection_id, "expired").await;
                }
                Err(e) => {
                    tracing::warn!(
                        source_type = provider.source_type(),
                        error = %e,
                        "Remote scan failed"
                    );
                    let _ = store::update_source_status(
                        &self.pool,
                        *source_id,
                        "error",
                        Some(&e.to_string()),
                    )
                    .await;
                }
            }
        }
    }

    /// Event loop used when only remote sources exist: polls at the
    /// smallest configured interval until cancelled.
    async fn remote_only_loop(&self, remote_map: &[RemoteSource], cancel: CancellationToken) {
        let interval_dur = remote_map
            .iter()
            .map(|(_, _, _, d)| *d)
            .min()
            .unwrap_or(self.fallback_scan_interval);
        let mut interval = tokio::time::interval(interval_dur);
        // Consume the immediate first tick (initial poll already ran).
        interval.tick().await;
        loop {
            tokio::select! {
                () = cancel.cancelled() => {
                    tracing::info!("Watchtower remote-only loop cancelled");
                    break;
                }
                _ = interval.tick() => {
                    self.poll_remote_sources(remote_map).await;
                    self.chunk_pending().await;
                }
            }
        }
    }

    /// Builds a Google Drive provider backed by a stored OAuth connection,
    /// using the connector encryption key from the data directory.
    /// Returns a human-readable reason string on failure so callers can log
    /// and skip the source.
    fn build_connection_provider(
        &self,
        folder_id: &str,
        connection_id: i64,
    ) -> Result<crate::source::google_drive::GoogleDriveProvider, String> {
        let key = crate::source::connector::crypto::ensure_connector_key(&self.data_dir)
            .map_err(|e| format!("connector key error: {e}"))?;
        let connector = crate::source::connector::google_drive::GoogleDriveConnector::new(
            &self.connector_config.google_drive,
        )
        .map_err(|e| format!("connector config error: {e}"))?;
        Ok(
            crate::source::google_drive::GoogleDriveProvider::from_connection(
                folder_id.to_string(),
                connection_id,
                self.pool.clone(),
                key,
                connector,
            ),
        )
    }

    /// Force re-ingests every matching file under `base_path` (the
    /// time-salted hash guarantees each node is rewritten), then updates
    /// the sync cursor and source status from the outcome.
    ///
    /// # Errors
    /// Fails fast on the initial status update or the directory walk.
    pub async fn reindex_local_source(
        pool: &DbPool,
        source_id: i64,
        base_path: &Path,
        patterns: &[String],
    ) -> Result<IngestSummary, WatchtowerError> {
        store::update_source_status(pool, source_id, "syncing", None).await?;
        let mut rel_paths = Vec::new();
        Self::walk_directory(base_path, base_path, patterns, &mut rel_paths)?;
        let summary = ingest_files(pool, source_id, base_path, &rel_paths, true).await;
        let cursor = chrono::Utc::now().to_rfc3339();
        let _ = store::update_sync_cursor(pool, source_id, &cursor).await;
        if summary.errors.is_empty() {
            let _ = store::update_source_status(pool, source_id, "active", None).await;
        } else {
            let msg = format!("{} errors during reindex", summary.errors.len());
            let _ = store::update_source_status(pool, source_id, "error", Some(&msg)).await;
        }
        Ok(summary)
    }

    /// Chunks up to 100 pending content nodes for the default account and
    /// logs how many were processed.
    async fn chunk_pending(&self) {
        let chunked = chunker::chunk_pending_nodes(&self.pool, DEFAULT_ACCOUNT_ID, 100).await;
        if chunked > 0 {
            tracing::debug!(chunked, "Watchtower chunked pending nodes");
        }
    }

    /// Fallback loop when the filesystem watcher cannot be created:
    /// periodically rescans every watchable local source until cancelled.
    /// NOTE(review): remote sources are not polled on this path — confirm
    /// that is acceptable when watcher creation fails.
    async fn polling_loop(
        &self,
        source_map: &[(i64, PathBuf, Vec<String>)],
        cancel: CancellationToken,
    ) {
        let mut interval = tokio::time::interval(self.fallback_scan_interval);
        interval.tick().await;
        loop {
            tokio::select! {
                () = cancel.cancelled() => {
                    tracing::info!("Watchtower polling loop cancelled");
                    break;
                }
                _ = interval.tick() => {
                    for (source_id, base_path, patterns) in source_map {
                        if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
                            tracing::warn!(
                                path = %base_path.display(),
                                error = %e,
                                "Polling scan failed"
                            );
                        }
                    }
                }
            }
        }
    }
}