pub mod embedder;
pub mod phases;
pub mod state;
use anyhow::{Context, Result};
use chrono::Timelike;
use std::io::Write;
use std::path::Path;
use std::time::Instant;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use crate::azure::SearchClient;
use crate::azure::schema::{EmbeddingConfig, confluence_index_schema, jira_index_schema};
use crate::config::{Config, SourceConfig};
use crate::sources::confluence::ConfluenceConnector;
use crate::sources::jira::JiraConnector;
use crate::sources::{SourceConnector, SyncCursor};
use self::state::SyncState;
/// Commands that can be sent to the sync engine over its command channel.
///
/// The engine reads them in `poll_commands`, which it calls before each source,
/// before each subsource, and before each batch.
#[derive(Debug, Clone)]
pub enum UiCommand {
    /// Pause the engine: the next `poll_commands` call blocks until `Resume`,
    /// `Shutdown`, a `ResetCursor`, or the channel closing.
    Pause,
    /// Clear a previous `Pause`.
    Resume,
    /// Ignored by `poll_commands` (it is consumed and discarded there).
    SyncNow,
    /// Ask the engine to reset the stored sync cursor of `source`.
    ResetCursor {
        /// Name of the source whose cursor should be reset.
        source: String,
        /// Specific subsource to reset; `None` is passed through to
        /// `SyncState::reset_source` (presumably meaning "all subsources" — TODO confirm).
        subsource: Option<String>,
    },
    /// Ignored by `poll_commands` (it is consumed and discarded there).
    PurgeNow {
        /// Name of the source to purge.
        source: String,
    },
    /// Stop the engine at its next command poll.
    Shutdown,
}
/// Creates a command channel for runs with no UI attached.
///
/// The channel has capacity 1. If the caller keeps the returned sender alive
/// without sending on it (as `run_sync` does with `_tx`), the receiver simply
/// never yields a command. Dropping the sender closes the channel instead.
pub fn never_command_channel() -> (mpsc::Sender<UiCommand>, mpsc::Receiver<UiCommand>) {
    const CAPACITY: usize = 1;
    let (sender, receiver) = mpsc::channel::<UiCommand>(CAPACITY);
    (sender, receiver)
}
/// What the engine should do after a command poll or a unit of sync work.
#[derive(Debug)]
pub enum EngineOutcome {
    /// Keep going normally.
    Continue,
    /// A shutdown was requested; callers unwind and return this upward.
    Shutdown,
    /// A cursor reset was requested via `UiCommand::ResetCursor` (returned by
    /// `poll_commands`); the fields are copied verbatim from the command.
    ResetCursor {
        /// Name of the source whose cursor should be reset.
        source: String,
        /// Specific subsource to reset, or `None` for the request as a whole source.
        subsource: Option<String>,
    },
}
/// Renders an anyhow error together with all of its causes, outermost first,
/// separated by `": "` (e.g. `"failed to fetch changes: connection refused"`).
fn format_error_chain(error: &anyhow::Error) -> String {
    let mut rendered = String::new();
    for (depth, cause) in error.chain().enumerate() {
        if depth > 0 {
            rendered.push_str(": ");
        }
        rendered.push_str(&cause.to_string());
    }
    rendered
}
/// Policy for handling a missing Azure index in `setup_indexes`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexMode {
    /// Ask on stdin (`[y/N]`) before creating each missing index.
    Interactive,
    /// Create missing indexes without asking.
    AutoCreate,
    /// Fail on the first missing index, telling the user to run `quelch setup`.
    RequireExisting,
}
/// Builds the index embedding settings from the user's ailloy configuration.
///
/// Uses the default node registered for `"embedding"`. Its embedding metadata
/// supplies the `dimensions` value and the Azure AI Search vectorizer JSON,
/// which is generated under the name `quelch-vectorizer`.
///
/// # Errors
/// Each failure points the user at `quelch ai config`:
/// - the ailloy config cannot be loaded;
/// - no embedding model is configured;
/// - the model has no dimensions;
/// - no vectorizer config can be generated for the model.
pub fn load_embedding_config() -> Result<EmbeddingConfig> {
    let ailloy_config = ailloy::config::Config::load()
        .context("failed to load ailloy config — run 'quelch ai config' to set up AI")?;
    let embedding_node = ailloy_config
        .default_node_for("embedding")
        .map(|(_id, node)| node)
        .context("no embedding model configured — run 'quelch ai config' to set one up")?;

    let model_metadata = embedding_node.embedding_metadata();
    let dimensions = model_metadata.dimensions.context(
        "embedding model has no dimensions configured — reconfigure with 'quelch ai config'",
    )?;
    let vectorizer_json = model_metadata
        .to_azure_search_vectorizer("quelch-vectorizer")
        .context("failed to generate vectorizer config — ensure you're using an Azure OpenAI embedding model")?;

    Ok(EmbeddingConfig {
        vectorizer_json,
        dimensions,
    })
}
/// Builds the Azure index schema for one source.
///
/// The source's configured `index` is passed to the schema builder matching its kind.
fn schema_for_source(
    source_config: &SourceConfig,
    embedding: &EmbeddingConfig,
) -> crate::azure::schema::IndexSchema {
    match source_config {
        SourceConfig::Confluence(confluence) => {
            confluence_index_schema(&confluence.index, embedding)
        }
        SourceConfig::Jira(jira) => jira_index_schema(&jira.index, embedding),
    }
}
/// Lists each configured source's name with the subsource keys it syncs, in config order.
///
/// The keys are Jira project keys or Confluence space keys. This is the shape
/// `SyncState::load` expects.
fn subsources_by_source(config: &Config) -> Vec<(String, Vec<String>)> {
    let mut listing = Vec::new();
    for source in &config.sources {
        let (name, keys) = match source {
            SourceConfig::Confluence(c) => (&c.name, &c.spaces),
            SourceConfig::Jira(j) => (&j.name, &j.projects),
        };
        listing.push((name.clone(), keys.clone()));
    }
    listing
}
/// Deletes every Azure index referenced by the config, then resets all local sync state.
///
/// Each distinct index name is handled once, in config order, and reported on
/// stdout as `[deleted]` or `[absent]`. Afterwards the state at `state_path` is
/// loaded, cleared via `SyncState::reset_all`, and saved back.
///
/// Returns the names of the indexes that were actually deleted.
///
/// # Errors
/// Fails on the first index check or delete error, or if the state cannot be loaded or saved.
pub async fn reset_indexes(config: &Config, state_path: &Path) -> Result<Vec<String>> {
    let azure = SearchClient::new(&config.azure.endpoint, &config.azure.api_key);

    // Several sources may share one index; keep only the first occurrence of each name.
    let mut already_listed = std::collections::HashSet::new();
    let index_names: Vec<String> = config
        .sources
        .iter()
        .map(|source| source.index().to_string())
        .filter(|name| already_listed.insert(name.clone()))
        .collect();

    let mut deleted = Vec::new();
    for name in index_names {
        let present = azure
            .index_exists(&name)
            .await
            .with_context(|| format!("failed to check index '{}'", name))?;
        if !present {
            println!(" [absent] {}", name);
            continue;
        }
        azure
            .delete_index(&name)
            .await
            .with_context(|| format!("failed to delete index '{}'", name))?;
        println!(" [deleted] {}", name);
        deleted.push(name);
    }

    let mut sync_state = SyncState::load(state_path, &subsources_by_source(config))?;
    sync_state.reset_all();
    sync_state.save(state_path)?;
    println!(" [cleared] sync state");
    Ok(deleted)
}
/// Ensures an Azure AI Search index exists for every configured source.
///
/// Schemas are built per source. Sources sharing an index name are checked
/// only once (first occurrence in config order). For each missing index,
/// `mode` decides what happens:
/// - `AutoCreate`: create it;
/// - `RequireExisting`: fail immediately, telling the user to run `quelch setup`;
/// - `Interactive`: prompt on stdin and create it only if the answer is `y`/`Y`.
///
/// NOTE(review): the interactive prompt uses blocking `std::io::stdin` inside an
/// async fn, so it blocks the executor thread while waiting for input.
///
/// Returns the names of the indexes that were created.
pub async fn setup_indexes(
    config: &Config,
    embedding: &EmbeddingConfig,
    mode: IndexMode,
) -> Result<Vec<String>> {
    let azure = SearchClient::new(&config.azure.endpoint, &config.azure.api_key);

    let mut names_seen = std::collections::HashSet::new();
    let unique_schemas: Vec<_> = config
        .sources
        .iter()
        .map(|source| schema_for_source(source, embedding))
        .filter(|schema| names_seen.insert(schema.name.clone()))
        .collect();

    let mut created = Vec::new();
    for schema in unique_schemas.iter() {
        let name = &schema.name;
        let already_there = azure
            .index_exists(name)
            .await
            .with_context(|| format!("failed to check index '{}'", name))?;
        if already_there {
            debug!(index = %name, "index already exists");
            continue;
        }

        let create = match mode {
            IndexMode::AutoCreate => true,
            IndexMode::RequireExisting => anyhow::bail!(
                "Index '{}' does not exist. Run 'quelch setup' to create it.",
                name
            ),
            IndexMode::Interactive => {
                print!(" [missing] {} — Create it? [y/N] ", name);
                std::io::stdout().flush()?;
                let mut answer = String::new();
                std::io::stdin().read_line(&mut answer)?;
                answer.trim().eq_ignore_ascii_case("y")
            }
        };
        if !create {
            info!(index = %name, "skipped index creation");
            continue;
        }

        azure
            .create_index(schema)
            .await
            .with_context(|| format!("failed to create index '{}'", name))?;
        info!(index = %name, "created Azure index");
        created.push(name.clone());
    }
    Ok(created)
}
/// Runs a single sync cycle (numbered 1) with no UI command channel attached.
///
/// Thin wrapper over `run_sync_with`. The engine outcome is discarded; only
/// success or failure is reported.
pub async fn run_sync(
    config: &Config,
    state_path: &Path,
    embedding: &EmbeddingConfig,
    index_mode: IndexMode,
    embedder: Option<&dyn embedder::Embedder>,
    max_docs: Option<u64>,
) -> Result<()> {
    const FIRST_CYCLE: u64 = 1;
    // The sender must outlive the run so the receiver never observes a closed channel.
    let (_keep_alive, mut commands) = never_command_channel();
    run_sync_with(
        config,
        state_path,
        embedding,
        index_mode,
        embedder,
        max_docs,
        &mut commands,
        FIRST_CYCLE,
    )
    .await
    .map(|_outcome| ())
}
#[allow(clippy::too_many_arguments)]
pub async fn run_sync_with(
config: &Config,
state_path: &Path,
embedding: &EmbeddingConfig,
index_mode: IndexMode,
embedder: Option<&dyn embedder::Embedder>,
max_docs: Option<u64>,
cmd_rx: &mut mpsc::Receiver<UiCommand>,
cycle: u64,
) -> Result<EngineOutcome> {
setup_indexes(config, embedding, index_mode).await?;
let azure = SearchClient::new(&config.azure.endpoint, &config.azure.api_key);
let subs = subsources_by_source(config);
let mut state = SyncState::load(state_path, &subs)?;
let mut paused = false;
let mut failures = Vec::new();
let cycle_start = Instant::now();
info!(
phase = phases::CYCLE_STARTED,
cycle = cycle,
"Cycle starting"
);
for source_config in &config.sources {
let source_name = source_config.name().to_string();
let index_name = source_config.index();
if let Ok(total) = azure.count_documents(index_name, None).await {
info!(
phase = phases::INDEX_COUNT,
source = %source_name,
count = total,
);
}
let (filter_field, subkeys): (&'static str, Vec<String>) = match source_config {
SourceConfig::Jira(j) => ("project", j.projects.clone()),
SourceConfig::Confluence(c) => ("space_key", c.spaces.clone()),
};
for subkey in subkeys {
let filter = format!("{filter_field} eq '{subkey}'");
if let Ok(count) = azure.count_documents(index_name, Some(&filter)).await {
info!(
phase = phases::SUBSOURCE_COUNT,
source = %source_name,
subsource = %subkey,
count = count,
);
}
}
}
for source_config in &config.sources {
if let EngineOutcome::Shutdown = poll_commands(cmd_rx, &mut paused).await {
tracing::info!(
phase = phases::CYCLE_FINISHED,
cycle = cycle,
duration_ms = cycle_start.elapsed().as_millis() as u64,
"Cycle shutdown"
);
return Ok(EngineOutcome::Shutdown);
}
match sync_source(
&azure,
embedder,
source_config,
config,
&mut state,
state_path,
max_docs,
cmd_rx,
&mut paused,
)
.await
{
Ok(EngineOutcome::Shutdown) => {
tracing::info!(
phase = phases::CYCLE_FINISHED,
cycle = cycle,
duration_ms = cycle_start.elapsed().as_millis() as u64,
"Cycle shutdown"
);
return Ok(EngineOutcome::Shutdown);
}
Ok(EngineOutcome::Continue) => {}
Ok(EngineOutcome::ResetCursor { .. }) => {}
Err(e) => {
let error_chain = format_error_chain(&e);
error!(
phase = phases::SOURCE_FAILED,
source = source_config.name(),
error = %error_chain,
"Sync failed for source"
);
failures.push(format!("{}: {}", source_config.name(), error_chain));
}
}
}
info!(
phase = phases::CYCLE_FINISHED,
cycle = cycle,
duration_ms = cycle_start.elapsed().as_millis() as u64,
"Cycle finished"
);
if !failures.is_empty() {
anyhow::bail!(
"sync failed for {} source(s): {}",
failures.len(),
failures.join(" | ")
);
}
Ok(EngineOutcome::Continue)
}
pub async fn run_purge(config: &Config) -> Result<()> {
let azure = SearchClient::new(&config.azure.endpoint, &config.azure.api_key);
for source_config in &config.sources {
if let Err(e) = purge_source(&azure, source_config).await {
error!(source = source_config.name(), error = %e, "Purge failed for source");
}
}
Ok(())
}
/// Builds the connector matching `source_config` and purges its orphaned documents.
async fn purge_source(azure: &SearchClient, source_config: &SourceConfig) -> Result<()> {
    match source_config {
        SourceConfig::Confluence(confluence) => {
            purge_with_connector(azure, &ConfluenceConnector::new(confluence)).await
        }
        SourceConfig::Jira(jira) => purge_with_connector(azure, &JiraConnector::new(jira)).await,
    }
}
/// Deletes documents that are in the connector's Azure index but no longer exist upstream.
///
/// All document IDs are collected from every subsource of the connector and
/// compared with all IDs in the index. IDs present only in the index are
/// deleted in batches of 1000.
///
/// # Errors
/// Fails if fetching IDs from either side or deleting a batch fails. Batches
/// already deleted before a failure stay deleted.
async fn purge_with_connector<C: SourceConnector>(
    azure: &SearchClient,
    connector: &C,
) -> Result<()> {
    const DELETE_BATCH_SIZE: usize = 1000;

    let source_name = connector.source_name();
    let index_name = connector.index_name();
    info!(source = source_name, "Starting orphan detection");

    // Every ID that still exists upstream, across all subsources.
    let mut live_ids = std::collections::HashSet::new();
    for subsource in connector.subsources() {
        let fetched = connector
            .fetch_all_ids(subsource)
            .await
            .context("failed to fetch IDs from source")?;
        live_ids.extend(fetched);
    }

    let indexed_ids = azure
        .fetch_all_ids(index_name)
        .await
        .context("failed to fetch IDs from Azure index")?;
    let orphans: Vec<String> = indexed_ids
        .into_iter()
        .filter(|id| !live_ids.contains(id))
        .collect();

    let orphan_count = orphans.len();
    if orphan_count == 0 {
        info!(source = source_name, "No orphaned documents found");
        return Ok(());
    }

    info!(
        source = source_name,
        orphans = orphan_count,
        "Removing orphaned documents"
    );
    for batch in orphans.chunks(DELETE_BATCH_SIZE) {
        azure
            .delete_documents(index_name, batch)
            .await
            .context("failed to delete orphaned documents")?;
    }
    info!(
        source = source_name,
        removed = orphan_count,
        "Purge complete"
    );
    Ok(())
}
/// Builds the connector matching `source_config` and syncs all of its subsources.
///
/// This is a dispatch shim over `sync_with_connector`; see there for behavior.
#[allow(clippy::too_many_arguments)]
async fn sync_source(
    azure: &SearchClient,
    embedder: Option<&dyn embedder::Embedder>,
    source_config: &SourceConfig,
    config: &Config,
    state: &mut SyncState,
    state_path: &Path,
    max_docs: Option<u64>,
    cmd_rx: &mut mpsc::Receiver<UiCommand>,
    paused: &mut bool,
) -> Result<EngineOutcome> {
    match source_config {
        SourceConfig::Confluence(confluence) => {
            sync_with_connector(
                azure,
                embedder,
                &ConfluenceConnector::new(confluence),
                config,
                state,
                state_path,
                max_docs,
                cmd_rx,
                paused,
            )
            .await
        }
        SourceConfig::Jira(jira) => {
            sync_with_connector(
                azure,
                embedder,
                &JiraConnector::new(jira),
                config,
                state,
                state_path,
                max_docs,
                cmd_rx,
                paused,
            )
            .await
        }
    }
}
#[allow(clippy::too_many_arguments)]
async fn sync_with_connector<C: SourceConnector>(
azure: &SearchClient,
embedder: Option<&dyn embedder::Embedder>,
connector: &C,
config: &Config,
state: &mut SyncState,
state_path: &Path,
max_docs: Option<u64>,
cmd_rx: &mut mpsc::Receiver<UiCommand>,
paused: &mut bool,
) -> Result<EngineOutcome> {
let source_name = connector.source_name();
let source_start = Instant::now();
let mut source_docs_synced = 0u64;
info!(
phase = phases::SOURCE_STARTED,
source = source_name,
"Starting source"
);
for subsource_key in connector.subsources() {
let previous_docs = state
.get_source(source_name)
.subsources
.get(subsource_key)
.map(|sub| sub.documents_synced)
.unwrap_or(0);
match poll_commands(cmd_rx, paused).await {
EngineOutcome::Shutdown => return Ok(EngineOutcome::Shutdown),
EngineOutcome::ResetCursor {
source: s,
subsource,
} if s == source_name => {
state.reset_source(source_name, subsource.as_deref());
state
.save(state_path)
.context("failed to save sync state")?;
continue;
}
_ => {}
}
match sync_single_subsource(
azure,
embedder,
connector,
subsource_key,
config,
state,
state_path,
max_docs,
cmd_rx,
paused,
)
.await
{
Ok(EngineOutcome::Shutdown) => return Ok(EngineOutcome::Shutdown),
Ok(EngineOutcome::Continue) => {
let current_docs = state
.get_source(source_name)
.subsources
.get(subsource_key)
.map(|sub| sub.documents_synced)
.unwrap_or(previous_docs);
source_docs_synced += current_docs.saturating_sub(previous_docs);
}
Ok(EngineOutcome::ResetCursor { .. }) => {}
Err(e) => {
error!(
phase = phases::SUBSOURCE_FAILED,
source = source_name,
subsource = subsource_key,
error = %e,
"Subsource failed"
);
}
}
}
state.complete_source_cycle(source_name);
state.save(state_path).ok();
info!(
phase = phases::SOURCE_FINISHED,
source = source_name,
docs_synced = source_docs_synced,
duration_ms = source_start.elapsed().as_millis() as u64,
"Finished source"
);
Ok(EngineOutcome::Continue)
}
/// Processes pending UI commands, then blocks for as long as the engine is paused.
///
/// Phase 1 drains the channel without waiting:
/// - `Pause`/`Resume` toggle `paused`;
/// - `SyncNow` and `PurgeNow` are discarded;
/// - `Shutdown` and `ResetCursor` return immediately. Any commands queued behind
///   them are left for the next call.
///
/// Phase 2 runs only while `paused` is set and waits for commands:
/// - `Resume` clears the flag;
/// - `Shutdown` or `ResetCursor` is returned with the flag still set;
/// - a closed channel also clears the flag;
/// - anything else is ignored.
async fn poll_commands(cmd_rx: &mut mpsc::Receiver<UiCommand>, paused: &mut bool) -> EngineOutcome {
    while let Ok(command) = cmd_rx.try_recv() {
        match command {
            UiCommand::Pause => *paused = true,
            UiCommand::Resume => *paused = false,
            UiCommand::Shutdown => return EngineOutcome::Shutdown,
            UiCommand::ResetCursor { source, subsource } => {
                return EngineOutcome::ResetCursor { source, subsource };
            }
            UiCommand::SyncNow | UiCommand::PurgeNow { .. } => {}
        }
    }

    while *paused {
        let Some(command) = cmd_rx.recv().await else {
            // Nobody can ever resume us once every sender is gone.
            *paused = false;
            break;
        };
        match command {
            UiCommand::Resume => *paused = false,
            UiCommand::Shutdown => return EngineOutcome::Shutdown,
            UiCommand::ResetCursor { source, subsource } => {
                return EngineOutcome::ResetCursor { source, subsource };
            }
            UiCommand::Pause | UiCommand::SyncNow | UiCommand::PurgeNow { .. } => {}
        }
    }

    EngineOutcome::Continue
}
#[allow(clippy::too_many_arguments)]
async fn sync_single_subsource<C: SourceConnector>(
azure: &SearchClient,
embedder: Option<&dyn embedder::Embedder>,
connector: &C,
subsource: &str,
config: &Config,
state: &mut SyncState,
state_path: &Path,
max_docs: Option<u64>,
cmd_rx: &mut mpsc::Receiver<UiCommand>,
paused: &mut bool,
) -> Result<EngineOutcome> {
let source_name = connector.source_name();
let index_name = connector.index_name();
let src_state = state.get_source(source_name);
let mut cursor = src_state
.subsources
.get(subsource)
.and_then(|s| s.last_cursor)
.map(|ts| SyncCursor { last_updated: ts });
info!(
phase = phases::SUBSOURCE_STARTED,
source = source_name,
subsource = subsource,
"Starting subsource"
);
let mut total_synced: u64 = 0;
let mut batch_num: u64 = 0;
let mut soft_limit_reached = false;
loop {
if soft_limit_reached {
break;
}
match poll_commands(cmd_rx, paused).await {
EngineOutcome::Shutdown => {
tracing::info!(
phase = phases::SUBSOURCE_FINISHED,
source = source_name,
subsource = subsource,
"Shutdown mid-subsource"
);
return Ok(EngineOutcome::Shutdown);
}
EngineOutcome::ResetCursor {
source: s,
subsource: Some(sub),
} if s == source_name && sub == subsource => {
state.reset_source(source_name, Some(subsource));
if let Err(e) = state.save(state_path) {
tracing::warn!(
source = source_name,
subsource = subsource,
error = %e,
"failed to persist reset"
);
}
cursor = None;
}
_ => {}
}
batch_num += 1;
info!(
phase = phases::STAGE,
source = source_name,
subsource = subsource,
stage = "fetching",
batch = batch_num,
);
let result = connector
.fetch_changes(subsource, cursor.as_ref(), config.sync.batch_size)
.await
.context("failed to fetch changes from source")?;
let result_cursor = result.cursor;
let result_has_more = result.has_more;
let new_docs: Vec<_> = if let Some(ref c) = cursor {
let cursor_minute = c
.last_updated
.with_second(0)
.and_then(|t| t.with_nanosecond(0))
.unwrap_or(c.last_updated);
result
.documents
.into_iter()
.filter(|doc| doc.updated_at > cursor_minute)
.collect()
} else {
result.documents
};
let doc_count = new_docs.len() as u64;
if doc_count == 0 {
info!(
phase = phases::SUBSOURCE_EMPTY,
source = source_name,
subsource = subsource,
batches = batch_num,
total = total_synced,
"No changes to sync"
);
break;
}
info!(
phase = phases::STAGE,
source = source_name,
subsource = subsource,
stage = "embedding",
done = 0u64,
total = doc_count,
);
let embeddings: Option<Vec<Vec<f32>>> = if let Some(emb) = embedder {
let mut vecs = Vec::with_capacity(new_docs.len());
for (i, doc) in new_docs.iter().enumerate() {
let content = doc
.fields
.get("content")
.and_then(|v| v.as_str())
.unwrap_or("");
let id = doc.fields.get("id").and_then(|v| v.as_str()).unwrap_or("?");
let embedding = embed_with_retry(emb, id, content, source_name)
.await
.context("failed to generate embedding")?;
vecs.push(embedding);
let done = (i + 1) as u64;
if done == doc_count || done.is_multiple_of(10) {
debug!(
phase = phases::STAGE,
source = source_name,
subsource = subsource,
stage = "embedding",
done = done,
total = doc_count,
);
}
}
Some(vecs)
} else {
None
};
let azure_docs: Vec<serde_json::Value> = new_docs
.iter()
.enumerate()
.map(|(i, doc)| {
let mut obj: serde_json::Map<String, serde_json::Value> = doc
.fields
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
if let Some(embedding) = embeddings.as_ref().and_then(|vecs| vecs.get(i)) {
obj.insert("content_vector".to_string(), serde_json::json!(embedding));
}
serde_json::Value::Object(obj)
})
.collect();
info!(
phase = phases::STAGE,
source = source_name,
subsource = subsource,
stage = "pushing",
total = doc_count,
);
azure
.push_documents(index_name, azure_docs)
.await
.context("failed to push documents to Azure AI Search")?;
const SAMPLE_IDS_PER_BATCH: usize = 5;
let sample_ids: Vec<&str> = new_docs
.iter()
.take(SAMPLE_IDS_PER_BATCH)
.map(|d| d.fields.get("id").and_then(|v| v.as_str()).unwrap_or("?"))
.collect();
let latest_id = new_docs
.last()
.and_then(|d| d.fields.get("id"))
.and_then(|v| v.as_str())
.unwrap_or("?");
info!(
phase = phases::BATCH_PUSHED,
source = source_name,
subsource = subsource,
count = doc_count,
sample_ids = sample_ids.join(","),
latest_id = latest_id,
);
total_synced += doc_count;
let sample_id = new_docs
.last()
.and_then(|d| d.fields.get("id"))
.and_then(|v| v.as_str())
.map(|s| s.to_string());
state.update_subsource(
source_name,
subsource,
result_cursor.last_updated,
doc_count,
sample_id.clone(),
);
state
.save(state_path)
.context("failed to save sync state")?;
info!(
phase = phases::SUBSOURCE_BATCH,
source = source_name,
subsource = subsource,
batch = batch_num,
fetched = doc_count,
cursor = %result_cursor.last_updated,
sample_id = sample_id.as_deref().unwrap_or(""),
"Batch pushed"
);
cursor = Some(result_cursor);
if let Some(limit) = max_docs
&& total_synced >= limit
{
soft_limit_reached = true;
}
if !result_has_more {
break;
}
}
info!(
phase = phases::SUBSOURCE_FINISHED,
source = source_name,
subsource = subsource,
total = total_synced,
"Subsource complete"
);
Ok(EngineOutcome::Continue)
}
/// Embeds `content`, shrinking it and retrying when the model rejects it for
/// exceeding its context window.
///
/// On an error whose message contains "maximum context length", the text is
/// truncated and the call retried. The new length is the ratio reported in the
/// error (via `parse_token_ratio`, or 50% if it cannot be parsed), reduced by a
/// further 10% safety margin but never below 100 bytes. The cut is made on a
/// UTF-8 char boundary. At most `MAX_RETRIES` truncations are attempted.
///
/// # Errors
/// - Any other embedder error is returned unchanged.
/// - If the text still exceeds the limit after `MAX_RETRIES` truncations, an
///   error naming `doc_id` is returned.
/// - If a truncation would no longer shorten the text, an error naming `doc_id`
///   is returned.
async fn embed_with_retry(
    embedder: &dyn embedder::Embedder,
    doc_id: &str,
    content: &str,
    source_name: &str,
) -> Result<Vec<f32>> {
    const MAX_RETRIES: usize = 5;
    // Lower bound for the truncation target, in bytes.
    const MIN_TRUNCATED_BYTES: usize = 100;

    let mut text = content.to_string();
    let mut truncations: usize = 0;
    loop {
        let e = match embedder.embed_one(&text).await {
            Ok(embedding) => return Ok(embedding),
            Err(e) => e,
        };
        let error_msg = e.to_string();
        // Only context-length errors can be fixed by shortening the input; anything
        // else (including on the last attempt) is reported as-is.
        if !error_msg.contains("maximum context length") {
            return Err(e);
        }
        if truncations == MAX_RETRIES {
            anyhow::bail!(
                "document {} still exceeds token limit after {} truncations: {}",
                doc_id,
                MAX_RETRIES,
                e
            );
        }
        let shrink_ratio = parse_token_ratio(&error_msg).unwrap_or(0.5);
        let target_len =
            (((text.len() as f64) * shrink_ratio * 0.9) as usize).max(MIN_TRUNCATED_BYTES);
        // Keep every char that starts before `target_len`, so the cut is on a char boundary.
        let byte_end = text
            .char_indices()
            .take_while(|(i, _)| *i < target_len)
            .last()
            .map(|(i, c)| i + c.len_utf8())
            .unwrap_or(target_len.min(text.len()));
        // Retrying with unchanged text would just fail the same way.
        if byte_end >= text.len() {
            anyhow::bail!(
                "document {} exceeds token limit but cannot be truncated below {} bytes: {}",
                doc_id,
                text.len(),
                e
            );
        }
        text.truncate(byte_end);
        truncations += 1;
        warn!(
            source = source_name,
            id = doc_id,
            attempt = truncations,
            new_chars = text.chars().count(),
            shrink_ratio = format!("{:.0}%", shrink_ratio * 100.0),
            "Truncating content for embedding (token limit exceeded)"
        );
    }
}
/// Extracts `max_tokens / requested_tokens` from a context-length error message.
///
/// Expects text of the form
/// `"... maximum context length is 8192 tokens, however you requested 16384 tokens ..."`.
/// Each number is the first whitespace-separated word after its phrase, with
/// trailing non-digits (e.g. `,` or `.`) stripped.
///
/// Returns `None` in these cases:
/// - either phrase is missing;
/// - a number does not parse;
/// - the requested count is not positive.
fn parse_token_ratio(error_msg: &str) -> Option<f64> {
    // Number following `needle`; offsets come from the needle itself rather than
    // hand-counted constants.
    fn number_after(haystack: &str, needle: &str) -> Option<f64> {
        let start = haystack.find(needle)? + needle.len();
        haystack[start..]
            .split_whitespace()
            .next()?
            .trim_end_matches(|c: char| !c.is_ascii_digit())
            .parse()
            .ok()
    }

    let max_tokens = number_after(error_msg, "maximum context length is ")?;
    let req_tokens = number_after(error_msg, "you requested ")?;
    (req_tokens > 0.0).then(|| max_tokens / req_tokens)
}