use super::types::{FollowupAnalysis, Message, QueryPlan};
use crate::ingestion::{
config::{AIProvider, IngestionConfig},
ollama_service::OllamaService,
openrouter_service::OpenRouterService,
};
use crate::schema::types::{DeclarativeSchemaDefinition, Query};
use crate::schema::SchemaWithState;
use serde_json::Value;
use std::collections::HashSet;
/// Service that turns natural-language user queries into structured query
/// plans and follow-up analyses by delegating prompt construction and raw
/// completion calls to a configured LLM backend.
pub struct LlmQueryService {
    // Which backend `call_llm` routes to (OpenRouter or Ollama).
    provider: AIProvider,
    // Initialized only when `provider == AIProvider::OpenRouter` (see `new`).
    openrouter_service: Option<OpenRouterService>,
    // Initialized only when `provider == AIProvider::Ollama` (see `new`).
    ollama_service: Option<OllamaService>,
}
impl LlmQueryService {
pub fn new(config: IngestionConfig) -> Result<Self, String> {
let openrouter_service = if config.provider == AIProvider::OpenRouter {
Some(
OpenRouterService::new(
config.openrouter.clone(),
config.timeout_seconds,
config.max_retries,
)
.map_err(|e| format!("Failed to create OpenRouter service: {}", e))?,
)
} else {
None
};
let ollama_service = if config.provider == AIProvider::Ollama {
Some(
OllamaService::new(
config.ollama.clone(),
config.timeout_seconds,
config.max_retries,
)
.map_err(|e| format!("Failed to create Ollama service: {}", e))?,
)
} else {
None
};
Ok(Self {
provider: config.provider,
openrouter_service,
ollama_service,
})
}
pub async fn analyze_query(
&self,
user_query: &str,
schemas: &[SchemaWithState],
) -> Result<QueryPlan, String> {
let prompt = self.build_analysis_prompt(user_query, schemas);
let prompt_preview = if prompt.len() > 500 {
format!(
"{}... [truncated, total {} chars]",
&prompt[..500],
prompt.len()
)
} else {
prompt.clone()
};
log::debug!("AI Query Prompt Preview: {}", prompt_preview);
let response = self.call_llm(&prompt).await?;
let mut query_plan = self.parse_query_plan(&response)?;
let target_schema_lower = query_plan.query.schema_name.to_lowercase();
for schema_state in schemas {
if schema_state.schema.name.to_lowercase() == target_schema_lower {
if query_plan.query.schema_name != schema_state.schema.name {
log::info!(
"🤖 AI Autocorrect: Normalizing schema name '{}' -> '{}'",
query_plan.query.schema_name,
schema_state.schema.name
);
query_plan.query.schema_name = schema_state.schema.name.clone();
}
break;
}
}
Ok(query_plan)
}
pub async fn summarize_results(
&self,
original_query: &str,
results: &[Value],
) -> Result<String, String> {
let prompt = self.build_summarization_prompt(original_query, results);
self.call_llm(&prompt).await
}
pub async fn answer_question(
&self,
original_query: &str,
results: &[Value],
conversation_history: &[Message],
question: &str,
) -> Result<String, String> {
let prompt =
self.build_chat_prompt(original_query, results, conversation_history, question);
self.call_llm(&prompt).await
}
pub async fn analyze_followup_question(
&self,
original_query: &str,
results: &[Value],
question: &str,
schemas: &[crate::schema::SchemaWithState],
) -> Result<FollowupAnalysis, String> {
let prompt =
self.build_followup_analysis_prompt(original_query, results, question, schemas);
let response = self.call_llm(&prompt).await?;
self.parse_followup_analysis(&response)
}
pub async fn generate_native_index_query_terms(
&self,
user_query: &str,
schemas: &[crate::schema::SchemaWithState],
) -> Result<Vec<String>, String> {
let prompt = self.build_native_index_query_terms_prompt(user_query, schemas);
let response = self.call_llm(&prompt).await?;
self.parse_query_terms_response(&response)
}
pub async fn execute_ai_native_index_query(
&self,
user_query: &str,
schemas: &[crate::schema::SchemaWithState],
db_ops: &crate::db_operations::DbOperations,
) -> Result<String, String> {
let search_terms = self
.generate_native_index_search_terms(user_query, schemas)
.await?;
let mut all_results = Vec::new();
if let Some(native_index_mgr) = db_ops.native_index_manager() {
for term in &search_terms {
if native_index_mgr.is_async() {
match native_index_mgr
.search_all_classifications_async(term)
.await
{
Ok(mut results) => {
log::debug!(
"LLM Query: Term '{}' returned {} results (async)",
term,
results.len()
);
all_results.append(&mut results);
}
Err(e) => {
log::warn!("Native index search failed for term '{}': {}", term, e);
}
}
} else {
match native_index_mgr.search_all_classifications(term) {
Ok(mut results) => {
log::debug!(
"LLM Query: Term '{}' returned {} results (sync)",
term,
results.len()
);
all_results.append(&mut results);
}
Err(e) => {
log::warn!("Native index search failed for term '{}': {}", term, e);
}
}
}
}
}
log::info!(
"LLM Query: Collected {} total results for AI interpretation",
all_results.len()
);
self.interpret_native_index_results(user_query, &all_results)
.await
}
pub async fn execute_ai_native_index_query_with_results(
&self,
user_query: &str,
schemas: &[crate::schema::SchemaWithState],
db_ops: &crate::db_operations::DbOperations,
) -> Result<(String, Vec<crate::db_operations::IndexResult>), String> {
let search_terms = self
.generate_native_index_search_terms(user_query, schemas)
.await?;
let mut all_results = Vec::new();
if let Some(native_index_mgr) = db_ops.native_index_manager() {
for term in &search_terms {
if native_index_mgr.is_async() {
match native_index_mgr
.search_all_classifications_async(term)
.await
{
Ok(mut results) => {
log::debug!(
"LLM Query: Term '{}' returned {} results (async)",
term,
results.len()
);
all_results.append(&mut results);
}
Err(e) => {
log::warn!("Native index search failed for term '{}': {}", term, e);
}
}
} else {
match native_index_mgr.search_all_classifications(term) {
Ok(mut results) => {
log::debug!(
"LLM Query: Term '{}' returned {} results (sync)",
term,
results.len()
);
all_results.append(&mut results);
}
Err(e) => {
log::warn!("Native index search failed for term '{}': {}", term, e);
}
}
}
}
}
log::debug!(
"LLM Query: Total results before deduplication: {}",
all_results.len()
);
let deduplicated_results = self.deduplicate_results(all_results);
log::info!(
"LLM Query: Sending {} deduplicated results to AI for interpretation",
deduplicated_results.len()
);
let ai_interpretation = self
.interpret_native_index_results(user_query, &deduplicated_results)
.await?;
Ok((ai_interpretation, deduplicated_results))
}
    /// Build the prompt asking the LLM whether a follow-up question can be
    /// answered from existing results or needs a fresh query. Embeds the
    /// original query, a bounded preview of the results, the schema catalog
    /// with key fields, and the filter-type reference, and demands a strict
    /// JSON object in response (parsed by `parse_followup_analysis`).
    fn build_followup_analysis_prompt(
        &self,
        original_query: &str,
        results: &[Value],
        question: &str,
        schemas: &[crate::schema::SchemaWithState],
    ) -> String {
        // Cap embedded results at 100 entries to bound prompt size.
        let results_preview = if results.len() > 100 {
            &results[..100]
        } else {
            results
        };
        let results_str = serde_json::to_string_pretty(results_preview)
            .unwrap_or_else(|_| "Failed to serialize results".to_string());
        let mut prompt = String::from(
            "You are analyzing whether a follow-up question can be answered from existing query results or needs a new query.\n\n"
        );
        prompt.push_str(&format!("Original Query: {}\n", original_query));
        prompt.push_str(&format!(
            "Existing Results ({} total): {}\n\n",
            results.len(),
            results_str
        ));
        prompt.push_str(&format!("Follow-up Question: {}\n\n", question));
        // Enumerate each schema: name/type, key fields (with the filters
        // that operate on them), and the runtime field names.
        prompt.push_str("Available Schemas:\n");
        for schema in schemas {
            prompt.push_str(&format!(
                "- {} (Type: {:?})\n",
                schema.schema.name, schema.schema.schema_type
            ));
            if let Some(ref key) = schema.schema.key {
                if let Some(ref hash_field) = key.hash_field {
                    prompt.push_str(&format!(" Hash Key: {} (filters: HashKey, HashPattern, HashRangeKey, HashRangePrefix operate on this field)\n", hash_field));
                }
                if let Some(ref range_field) = key.range_field {
                    prompt.push_str(&format!(" Range Key: {} (filters: RangePrefix, RangePattern, RangeRange, HashRangeKey, HashRangePrefix operate on this field)\n", range_field));
                }
            }
            prompt.push_str(" Fields: ");
            let field_names: Vec<String> = schema.schema.runtime_fields.keys().cloned().collect();
            prompt.push_str(&field_names.join(", "));
            prompt.push('\n');
        }
        prompt.push_str("\nDetermine if:\n");
        prompt.push_str("1. The question can be FULLY answered from the existing results (needs_query: false)\n");
        prompt.push_str(
            "2. The question needs NEW data that requires a query (needs_query: true)\n\n",
        );
        prompt.push_str("If a new query is needed, provide:\n");
        prompt.push_str("- query: The Query object to execute (same format as before)\n");
        prompt.push_str("- reasoning: Why a new query is needed\n\n");
        prompt.push_str(
            "FILTER TYPES AVAILABLE:\n\n\
            Filters for HashRange schemas (have both Hash Key and Range Key):\n\
            - HashRangeKey: {\"HashRangeKey\": {\"hash\": \"value\", \"range\": \"value\"}} - exact match on BOTH hash key field AND range key field\n\
            - HashKey: {\"HashKey\": \"value\"} - filter on hash key field only\n\
            - HashRangePrefix: {\"HashRangePrefix\": {\"hash\": \"value\", \"prefix\": \"prefix\"}} - filter on hash key field + range key field prefix\n\
            - HashPattern: {\"HashPattern\": \"*pattern*\"} - glob pattern on hash key field\n\n\
            Filters for Range schemas (have Range Key only):\n\
            - RangePrefix: {\"RangePrefix\": \"prefix\"} - filter on range key field\n\
            - RangePattern: {\"RangePattern\": \"*pattern*\"} - glob pattern on range key field\n\
            - RangeRange: {\"RangeRange\": {\"start\": \"2025-01-01\", \"end\": \"2025-12-31\"}} - filter on range key field\n\n\
            Universal filters (work on any schema type):\n\
            - SampleN: {\"SampleN\": 100} - return N RANDOM records\n\
            - null - no filter (return all records)\n\n\
            IMPORTANT JSON FORMATTING:\n\
            - All filter string values must use proper JSON format\n\
            - Special characters like @ # $ are valid in JSON strings without escaping\n\
            - Example: {\"HashKey\": \"@techinfluencer\"} is correct\n\n\
            CRITICAL: Always use key-based filters (HashKey, RangePrefix, etc.).\n\
            Check each schema's Hash Key and Range Key fields to determine which filter to use.\n\
            Example: If searching for author \"Jennifer Liu\" and schema has hash_field=author, use {\"HashKey\": \"Jennifer Liu\"}.\n\n"
        );
        prompt.push_str(
            "Respond in JSON format:\n\
            {\n\
            \"needs_query\": true/false,\n\
            \"query\": null or {\"schema_name\": \"...\", \"fields\": [...], \"filter\": ...},\n\
            \"reasoning\": \"explanation\"\n\
            }\n\n\
            IMPORTANT: Return ONLY the JSON object, no additional text.",
        );
        prompt
    }
    /// Build the prompt asking the LLM for 3-8 native-index search terms
    /// derived from the user's query, given the schema catalog. The model is
    /// instructed to reply with a bare JSON string array (parsed by
    /// `parse_query_terms_response`).
    fn build_native_index_query_terms_prompt(
        &self,
        user_query: &str,
        schemas: &[crate::schema::SchemaWithState],
    ) -> String {
        let mut prompt = String::from(
            "You are generating search terms for a native word index. Based on the user's natural language query, \
            generate relevant search terms that would help find matching records.\n\n"
        );
        // List each schema with its indexed key fields and runtime fields so
        // the model can target terms at indexed content.
        prompt.push_str("Available Schemas:\n");
        for schema in schemas {
            prompt.push_str(&format!(
                "- {} (Type: {:?}, State: {:?})\n",
                schema.schema.name, schema.schema.schema_type, schema.state
            ));
            if let Some(ref key) = schema.schema.key {
                if let Some(ref hash_field) = key.hash_field {
                    prompt.push_str(&format!(
                        " Hash Key: {} (indexed for fast lookup)\n",
                        hash_field
                    ));
                }
                if let Some(ref range_field) = key.range_field {
                    prompt.push_str(&format!(
                        " Range Key: {} (indexed for fast lookup)\n",
                        range_field
                    ));
                }
            }
            prompt.push_str(" Fields: ");
            let field_names: Vec<String> = schema.schema.runtime_fields.keys().cloned().collect();
            prompt.push_str(&field_names.join(", "));
            prompt.push('\n');
        }
        prompt.push_str(&format!("\nUser Query: {}\n\n", user_query));
        prompt.push_str(
            "Generate 3-8 relevant search terms that would help find records matching this query.\n\n\
            Guidelines:\n\
            - Extract key words and phrases from the query\n\
            - Include synonyms and related terms\n\
            - Consider different ways the same concept might be expressed\n\
            - Include specific names, places, or entities mentioned\n\
            - Generate terms that would be found in indexed fields\n\
            - Avoid very common words (stopwords)\n\
            - Keep terms concise but meaningful\n\n\
            Examples:\n\
            - Query: \"Find posts about artificial intelligence\"\n\
            Terms: [\"artificial\", \"intelligence\", \"AI\", \"machine learning\", \"neural network\"]\n\
            - Query: \"Show me articles by Jennifer Liu\"\n\
            Terms: [\"Jennifer\", \"Liu\", \"Jennifer Liu\", \"author\"]\n\
            - Query: \"Products with electronics tag\"\n\
            Terms: [\"electronics\", \"electronic\", \"tech\", \"gadgets\", \"devices\"]\n\n\
            Respond with a JSON array of strings:\n\
            [\"term1\", \"term2\", \"term3\", ...]\n\n\
            IMPORTANT: Return ONLY the JSON array, no additional text."
        );
        prompt
    }
fn parse_query_terms_response(&self, response: &str) -> Result<Vec<String>, String> {
let json_str = if let Some(start) = response.find('[') {
if let Some(end) = response.rfind(']') {
&response[start..=end]
} else {
response
}
} else {
response
};
let terms: Vec<String> = serde_json::from_str(json_str)
.map_err(|e| format!("Failed to parse query terms: {}. Response: {}", e, json_str))?;
if terms.is_empty() {
return Err("No query terms generated".to_string());
}
Ok(terms)
}
async fn generate_native_index_search_terms(
&self,
user_query: &str,
schemas: &[crate::schema::SchemaWithState],
) -> Result<Vec<String>, String> {
let prompt = self.build_native_index_search_prompt(user_query, schemas);
let response = self.call_llm(&prompt).await?;
self.parse_query_terms_response(&response)
}
fn deduplicate_results(
&self,
mut results: Vec<crate::db_operations::IndexResult>,
) -> Vec<crate::db_operations::IndexResult> {
let _original_count = results.len();
let mut seen = HashSet::new();
results.retain(|result| {
let key = format!(
"{}:{}:{}",
result.schema_name,
serde_json::to_string(&result.key_value).unwrap_or_default(),
result.field
);
seen.insert(key)
});
results
}
async fn interpret_native_index_results(
&self,
original_query: &str,
results: &[crate::db_operations::IndexResult],
) -> Result<String, String> {
log::info!(
"LLM Query: Sending {} results to AI for interpretation",
results.len()
);
if results.is_empty() {
log::warn!("LLM Query: No results to send to AI");
} else {
log::debug!(
"LLM Query: Sample result - schema={}, field={}, key_value={:?}",
results[0].schema_name,
results[0].field,
results[0].key_value
);
}
let prompt = self.build_native_index_interpretation_prompt(original_query, results);
self.call_llm(&prompt).await
}
    /// Build the prompt asking the LLM for 3-6 native-index search terms for
    /// the execute_* flow. Near-duplicate of
    /// `build_native_index_query_terms_prompt` but with a tighter term count
    /// and slightly different guidance; both expect a bare JSON string array.
    fn build_native_index_search_prompt(
        &self,
        user_query: &str,
        schemas: &[crate::schema::SchemaWithState],
    ) -> String {
        let mut prompt = String::from(
            "You are generating search terms for a native word index system. Based on the user's natural language query, \
            generate 3-6 specific search terms that will be used to search the native index.\n\n"
        );
        // List each schema with its indexed key fields and runtime fields.
        prompt.push_str("Available Schemas:\n");
        for schema in schemas {
            prompt.push_str(&format!(
                "- {} (Type: {:?}, State: {:?})\n",
                schema.schema.name, schema.schema.schema_type, schema.state
            ));
            if let Some(ref key) = schema.schema.key {
                if let Some(ref hash_field) = key.hash_field {
                    prompt.push_str(&format!(
                        " Hash Key: {} (indexed for fast lookup)\n",
                        hash_field
                    ));
                }
                if let Some(ref range_field) = key.range_field {
                    prompt.push_str(&format!(
                        " Range Key: {} (indexed for fast lookup)\n",
                        range_field
                    ));
                }
            }
            prompt.push_str(" Fields: ");
            let field_names: Vec<String> = schema.schema.runtime_fields.keys().cloned().collect();
            prompt.push_str(&field_names.join(", "));
            prompt.push('\n');
        }
        prompt.push_str(&format!("\nUser Query: {}\n\n", user_query));
        prompt.push_str(
            "Generate 3-6 specific search terms that will be used to search the native word index.\n\n\
            Guidelines:\n\
            - Extract the most important keywords from the query\n\
            - Include specific names, places, or entities mentioned\n\
            - Generate terms that would be found in indexed text fields\n\
            - Avoid very common words (stopwords)\n\
            - Keep terms concise but meaningful\n\
            - Focus on terms that are likely to appear in the data\n\n\
            Examples:\n\
            - Query: \"Find posts about artificial intelligence\"\n\
            Terms: [\"artificial\", \"intelligence\", \"AI\", \"machine learning\"]\n\
            - Query: \"Show me articles by Jennifer Liu\"\n\
            Terms: [\"Jennifer\", \"Liu\", \"Jennifer Liu\"]\n\
            - Query: \"Products with electronics tag\"\n\
            Terms: [\"electronics\", \"electronic\", \"tech\"]\n\n\
            Respond with a JSON array of strings:\n\
            [\"term1\", \"term2\", \"term3\", ...]\n\n\
            IMPORTANT: Return ONLY the JSON array, no additional text."
        );
        prompt
    }
fn build_native_index_interpretation_prompt(
&self,
original_query: &str,
results: &[crate::db_operations::IndexResult],
) -> String {
let results_preview = if results.len() > 50 {
&results[..50]
} else {
results
};
let results_str = serde_json::to_string_pretty(results_preview)
.unwrap_or_else(|_| "Failed to serialize results".to_string());
format!(
"You are interpreting native index search results for a user. Analyze the search results and provide a helpful response.\n\n\
Original User Query: {}\n\
Search Results ({} total, showing first {}):\n{}\n\n\
Provide:\n\
1. A summary of what was found\n\
2. Key insights from the results\n\
3. Notable patterns or interesting findings\n\
4. If no results were found, suggest alternative search terms\n\n\
Keep the response concise, informative, and helpful to the user.",
original_query,
results.len(),
results_preview.len(),
results_str
)
}
fn parse_followup_analysis(&self, response: &str) -> Result<FollowupAnalysis, String> {
let json_str = if let Some(start) = response.find('{') {
if let Some(end) = response.rfind('}') {
&response[start..=end]
} else {
response
}
} else {
response
};
#[derive(serde::Deserialize)]
struct LlmFollowupResponse {
needs_query: bool,
query: Option<Query>,
reasoning: String,
}
let parsed: LlmFollowupResponse = serde_json::from_str(json_str).map_err(|e| {
format!(
"Failed to parse followup analysis: {}. Response: {}",
e, json_str
)
})?;
Ok(FollowupAnalysis {
needs_query: parsed.needs_query,
query: parsed.query,
reasoning: parsed.reasoning,
})
}
pub async fn suggest_alternative_query(
&self,
original_user_query: &str,
failed_query: &Query,
schemas: &[crate::schema::SchemaWithState],
previous_attempts: &[String],
) -> Result<Option<QueryPlan>, String> {
let prompt = self.build_alternative_query_prompt(
original_user_query,
failed_query,
schemas,
previous_attempts,
);
let response = self.call_llm(&prompt).await?;
self.parse_alternative_query(&response)
}
    /// Build the prompt asking the LLM to suggest one alternative query
    /// after `failed_query` returned no results. Includes the failed query,
    /// any previous failed attempts, and the schema catalog, and demands a
    /// strict JSON object (parsed by `parse_alternative_query`).
    fn build_alternative_query_prompt(
        &self,
        original_user_query: &str,
        failed_query: &Query,
        schemas: &[crate::schema::SchemaWithState],
        previous_attempts: &[String],
    ) -> String {
        let mut prompt = String::from(
            "A query returned no results. Suggest an alternative approach to find the data the user wants.\n\n"
        );
        prompt.push_str(&format!(
            "User's Original Question: {}\n\n",
            original_user_query
        ));
        prompt.push_str("Failed Query:\n");
        prompt.push_str(&format!(" Schema: {}\n", failed_query.schema_name));
        prompt.push_str(&format!(" Fields: {:?}\n", failed_query.fields));
        prompt.push_str(&format!(" Filter: {:?}\n\n", failed_query.filter));
        // Show prior failed attempts so the model does not repeat them.
        if !previous_attempts.is_empty() {
            prompt.push_str("Previous Failed Attempts:\n");
            for (i, attempt) in previous_attempts.iter().enumerate() {
                prompt.push_str(&format!("{}. {}\n", i + 1, attempt));
            }
            prompt.push('\n');
        }
        // Enumerate schemas with key fields and runtime field names.
        prompt.push_str("Available Schemas:\n");
        for schema in schemas {
            prompt.push_str(&format!(
                "- {} (Type: {:?}, State: {:?})\n",
                schema.schema.name, schema.schema.schema_type, schema.state
            ));
            if let Some(ref key) = schema.schema.key {
                if let Some(ref hash_field) = key.hash_field {
                    prompt.push_str(&format!(" Hash Key: {} (filters: HashKey, HashPattern, HashRangeKey, HashRangePrefix operate on this field)\n", hash_field));
                }
                if let Some(ref range_field) = key.range_field {
                    prompt.push_str(&format!(" Range Key: {} (filters: RangePrefix, RangePattern, RangeRange, HashRangeKey, HashRangePrefix operate on this field)\n", range_field));
                }
            }
            prompt.push_str(" Fields: ");
            let field_names: Vec<String> = schema.schema.runtime_fields.keys().cloned().collect();
            prompt.push_str(&field_names.join(", "));
            prompt.push('\n');
        }
        prompt.push_str("\nSuggest ONE alternative approach:\n");
        prompt.push_str("1. Try a different schema that might have the data\n");
        prompt.push_str(
            "2. Broaden the filter (e.g., remove date constraints, use pattern matching)\n",
        );
        prompt.push_str("3. Try a different filter type (e.g., null filter for all records)\n");
        prompt.push_str("4. Search in related/index schemas\n\n");
        prompt
            .push_str("If you believe there are NO reasonable alternatives left, respond with:\n");
        prompt.push_str(
            "{\"has_alternative\": false, \"query\": null, \"reasoning\": \"explanation\"}\n\n",
        );
        prompt.push_str("Otherwise, respond with:\n");
        prompt.push_str("{\n");
        prompt.push_str(" \"has_alternative\": true,\n");
        prompt.push_str(
            " \"query\": {\"schema_name\": \"...\", \"fields\": [...], \"filter\": ...},\n",
        );
        prompt.push_str(" \"reasoning\": \"why this approach might work\"\n");
        prompt.push_str("}\n\n");
        prompt.push_str(
            "FILTER TYPES:\n\
            For HashRange schemas (check Hash Key field):\n\
            - HashRangeKey, HashKey, HashRangePrefix, HashPattern\n\
            For Range schemas (check Range Key field):\n\
            - RangePrefix, RangePattern, RangeRange\n\
            Universal filters:\n\
            - Value (LAST RESORT ONLY), SampleN, null (all records)\n\n\
            JSON FORMATTING:\n\
            - Use proper JSON format for all filter values\n\
            - Special characters like @ # $ are valid in JSON strings\n\
            - Example: {\"Value\": \"@username\"}, {\"HashKey\": \"@mention\"}\n\n\
            CRITICAL: Prefer key-based filters over Value filter.\n\
            Check Hash Key and Range Key fields to determine correct filter.\n\
            If search matches a key field, use key filter (HashKey/RangePrefix), NOT Value filter.\n\n\
            IMPORTANT: Return ONLY the JSON object."
        );
        prompt
    }
fn parse_alternative_query(&self, response: &str) -> Result<Option<QueryPlan>, String> {
let json_str = if let Some(start) = response.find('{') {
if let Some(end) = response.rfind('}') {
&response[start..=end]
} else {
response
}
} else {
response
};
#[derive(serde::Deserialize)]
struct LlmAlternativeResponse {
has_alternative: bool,
query: Option<Query>,
reasoning: String,
}
let parsed: LlmAlternativeResponse = serde_json::from_str(json_str).map_err(|e| {
format!(
"Failed to parse alternative query: {}. Response: {}",
e, json_str
)
})?;
if parsed.has_alternative {
if let Some(query) = parsed.query {
Ok(Some(QueryPlan {
query,
index_schema: None,
reasoning: parsed.reasoning,
}))
} else {
Err("has_alternative is true but no query provided".to_string())
}
} else {
Ok(None)
}
}
    /// Build the main query-analysis prompt: schema catalog, the user's
    /// query, the full filter-type reference, and the index-schema creation
    /// guide. The model must reply with a single JSON object containing
    /// `query`, optional `index_schema`, and `reasoning` (parsed by
    /// `parse_query_plan`).
    fn build_analysis_prompt(&self, user_query: &str, schemas: &[SchemaWithState]) -> String {
        let mut prompt = String::from(
            "You are a database query optimizer. Analyze the following natural language query \
            and available schemas to create an execution plan.\n\n",
        );
        // Enumerate schemas with key fields (and which filters apply to
        // them) plus runtime field names.
        prompt.push_str("Available Schemas:\n");
        for schema in schemas {
            prompt.push_str(&format!(
                "- {} (Type: {:?}, State: {:?})\n",
                schema.schema.name, schema.schema.schema_type, schema.state
            ));
            if let Some(ref key) = schema.schema.key {
                if let Some(ref hash_field) = key.hash_field {
                    prompt.push_str(&format!(" Hash Key: {} (filters: HashKey, HashPattern, HashRangeKey, HashRangePrefix operate on this field)\n", hash_field));
                }
                if let Some(ref range_field) = key.range_field {
                    prompt.push_str(&format!(" Range Key: {} (filters: RangePrefix, RangePattern, RangeRange, HashRangeKey, HashRangePrefix operate on this field)\n", range_field));
                }
            }
            prompt.push_str(" Fields: ");
            let field_names: Vec<String> = schema.schema.runtime_fields.keys().cloned().collect();
            prompt.push_str(&field_names.join(", "));
            prompt.push('\n');
        }
        prompt.push_str(&format!("\nUser Query: {}\n\n", user_query));
        // Static instruction block: filter reference, selection rules,
        // response format, and index-schema examples.
        prompt.push_str(
            "Determine:\n\
            1. Which schema(s) to query\n\
            2. What fields to retrieve\n\
            3. What filters to apply (if any)\n\
            4. If an index is needed (consider element count > 10,000 as threshold)\n\n\
            FILTER TYPES AVAILABLE:\n\n\
            Filters for HashRange schemas (have both Hash Key and Range Key):\n\
            - HashRangeKey: {\"HashRangeKey\": {\"hash\": \"value\", \"range\": \"value\"}} - exact match on BOTH hash key field AND range key field\n\
            - HashKey: {\"HashKey\": \"value\"} - filter on hash key field only, returns all records with this hash\n\
            - HashRangePrefix: {\"HashRangePrefix\": {\"hash\": \"value\", \"prefix\": \"prefix\"}} - filter on hash key field + range key field prefix\n\
            - HashPattern: {\"HashPattern\": \"*pattern*\"} - glob pattern matching on hash key field\n\n\
            Filters for Range schemas (have Range Key only):\n\
            - RangePrefix: {\"RangePrefix\": \"prefix\"} - filter on range key field, returns records with range starting with prefix\n\
            - RangePattern: {\"RangePattern\": \"*pattern*\"} - glob pattern matching on range key field\n\
            - RangeRange: {\"RangeRange\": {\"start\": \"2025-01-01\", \"end\": \"2025-12-31\"}} - filter on range key field for values within range\n\n\
            Universal filters (work on any schema type):\n\
            - SampleN: {\"SampleN\": 100} - return N RANDOM records (NOT sorted)\n\
            - null - no filter (return all records)\n\n\
            IMPORTANT JSON FORMATTING:\n\
            - All string values in filters MUST be properly JSON-escaped\n\
            - Special characters like @ # $ etc. do NOT need escaping in JSON strings\n\
            - Example: {\"HashKey\": \"user@domain.com\"} is valid JSON\n\n\
            CRITICAL FILTER SELECTION RULES:\n\
            1. ALWAYS check the schema's Hash Key and Range Key fields to determine the correct filter\n\
            2. If the search term matches a Hash Key field value, use HashKey or HashPattern filter\n\
            3. If the search term matches a Range Key field value, use RangePrefix, RangePattern, or RangeRange filter\n\
            4. Examples of when to use each:\n\
            - Searching for author \"Jennifer Liu\" on a schema with hash_field=author → use {\"HashKey\": \"Jennifer Liu\"}\n\
            - Searching for date \"2025-09\" on a schema with range_field=publish_date → use {\"RangePrefix\": \"2025-09\"}\n\n\
            IMPORTANT NOTES:\n\
            - For HashRange schemas, HashKey filters operate on the hash_field, Range filters operate on the range_field\n\
            - For Range schemas, Range filters operate on the range_field\n\
            - SampleN returns RANDOM records, NOT sorted or ordered\n\
            - For \"most recent\" or \"latest\" queries, use null filter to get all records (backend will handle sorting)\n\
            - Range keys are stored as strings and compared lexicographically\n\n\
            EXAMPLES:\n\
            - Search for word \"ai\" in BlogPostWordIndex (hash_field=word): {\"HashKey\": \"ai\"} ✓ CORRECT\n\
            - Search for author \"Jennifer Liu\" in schema with hash_field=author: {\"HashKey\": \"Jennifer Liu\"} ✓ CORRECT\n\
            - Get blog post by ID in BlogPost (range_field=post_id): {\"RangePrefix\": \"post-123\"} ✓ CORRECT\n\
            - Get most recent posts: null (returns all, sorted by backend) ✓ CORRECT\n\
            - Get posts in date range (range_field=publish_date): {\"RangeRange\": {\"start\": \"2025-09-01\", \"end\": \"2025-09-30\"}} ✓ CORRECT\n\n\
            Respond in JSON format with:\n\
            {\n\
            \"query\": {\n\
            \"schema_name\": \"string\",\n\
            \"fields\": [\"field1\", \"field2\"],\n\
            \"filter\": null or one of the filter types above\n\
            },\n\
            \"index_schema\": null or index schema definition (see below),\n\
            \"reasoning\": \"your analysis\"\n\
            }\n\n\
            INDEX SCHEMA CREATION:\n\
            If no efficient schema exists for the query, recommend an index schema.\n\
            Index schemas enable fast lookups by creating a HashRange index on specific fields.\n\n\
            When to recommend an index:\n\
            - Word search queries (e.g., \"find posts containing 'technology'\")\n\
            - Array field searches (e.g., \"products with tag 'electronics'\")\n\
            - Author/user lookup queries (e.g., \"posts by Alice Johnson\")\n\
            - Any query that would benefit from hash-based lookup\n\n\
            Index schema format:\n\
            {\n\
            \"name\": \"SourceSchemaFieldIndex\",\n\
            \"descriptive_name\": \"Human Readable Name\",\n\
            \"key\": {\n\
            \"hash_field\": \"field_to_index_on\",\n\
            \"range_field\": \"timestamp_or_id_field\"\n\
            },\n\
            \"transform_fields\": {\n\
            \"indexed_field\": \"SourceSchema.field.transform()\",\n\
            \"other_field\": \"SourceSchema.map().other_field\"\n\
            },\n\
            \"field_topologies\": {\n\
            \"indexed_field\": {\"root\": {\"type\": \"Primitive\", \"value\": \"String\"}},\n\
            \"other_field\": {\"root\": {\"type\": \"Primitive\", \"value\": \"String\"}}\n\
            }\n\
            }\n\n\
            CRITICAL TOPOLOGY FORMAT:\n\
            - Every field in field_topologies MUST have format: {\"root\": {\"type\": \"Primitive\", \"value\": \"TYPE\"}}\n\
            - The \"value\" field is REQUIRED for Primitive types\n\
            - Valid values: \"String\", \"Number\", \"Boolean\", \"Null\"\n\
            - Arrays: {\"root\": {\"type\": \"Array\", \"value\": {\"type\": \"Primitive\", \"value\": \"String\"}}}\n\
            - Objects: {\"root\": {\"type\": \"Object\", \"value\": {\"field1\": {\"type\": \"Primitive\", \"value\": \"String\"}}}}\n\n\
            Transform functions available:\n\
            - split_by_word() - splits text into individual words\n\
            - split_array() - splits array into individual elements\n\
            - count() - counts items (returns Number)\n\
            - map() - applies transformation to each item\n\n\
            Example index schemas:\n\
            1. Word search index:\n\
            {\n\
            \"name\": \"BlogPostWordIndex\",\n\
            \"descriptive_name\": \"Blog Post Word Index\",\n\
            \"key\": {\"hash_field\": \"word\", \"range_field\": \"publish_date\"},\n\
            \"transform_fields\": {\n\
            \"word\": \"BlogPost.map().content.split_by_word().map()\",\n\
            \"title\": \"BlogPost.map().title\",\n\
            \"author\": \"BlogPost.map().author\",\n\
            \"publish_date\": \"BlogPost.map().publish_date\"\n\
            },\n\
            \"field_topologies\": {\n\
            \"word\": {\"root\": {\"type\": \"Primitive\", \"value\": \"String\"}},\n\
            \"title\": {\"root\": {\"type\": \"Primitive\", \"value\": \"String\"}},\n\
            \"author\": {\"root\": {\"type\": \"Primitive\", \"value\": \"String\"}},\n\
            \"publish_date\": {\"root\": {\"type\": \"Primitive\", \"value\": \"String\"}}\n\
            }\n\
            }\n\n\
            2. Author lookup index:\n\
            {\n\
            \"name\": \"BlogPostAuthorIndex\",\n\
            \"descriptive_name\": \"Blog Post Author Index\",\n\
            \"key\": {\"hash_field\": \"author\", \"range_field\": \"publish_date\"},\n\
            \"transform_fields\": {\n\
            \"author\": \"BlogPost.map().author\",\n\
            \"title\": \"BlogPost.map().title\",\n\
            \"content\": \"BlogPost.map().content\",\n\
            \"publish_date\": \"BlogPost.map().publish_date\"\n\
            },\n\
            \"field_topologies\": {\n\
            \"author\": {\"root\": {\"type\": \"Primitive\", \"value\": \"String\"}},\n\
            \"title\": {\"root\": {\"type\": \"Primitive\", \"value\": \"String\"}},\n\
            \"content\": {\"root\": {\"type\": \"Primitive\", \"value\": \"String\"}},\n\
            \"publish_date\": {\"root\": {\"type\": \"Primitive\", \"value\": \"String\"}}\n\
            }\n\
            }\n\n\
            3. Tag search index (array splitting):\n\
            {\n\
            \"name\": \"ProductTagIndex\",\n\
            \"descriptive_name\": \"Product Tag Index\",\n\
            \"key\": {\"hash_field\": \"tag\", \"range_field\": \"created_at\"},\n\
            \"transform_fields\": {\n\
            \"tag\": \"Product.map().tags.split_array().map()\",\n\
            \"product_id\": \"Product.map().product_id\",\n\
            \"name\": \"Product.map().name\",\n\
            \"price\": \"Product.map().price\",\n\
            \"created_at\": \"Product.map().created_at\"\n\
            },\n\
            \"field_topologies\": {\n\
            \"tag\": {\"root\": {\"type\": \"Primitive\", \"value\": \"String\"}},\n\
            \"product_id\": {\"root\": {\"type\": \"Primitive\", \"value\": \"String\"}},\n\
            \"name\": {\"root\": {\"type\": \"Primitive\", \"value\": \"String\"}},\n\
            \"price\": {\"root\": {\"type\": \"Primitive\", \"value\": \"Number\"}},\n\
            \"created_at\": {\"root\": {\"type\": \"Primitive\", \"value\": \"String\"}}\n\
            }\n\
            }\n\n\
            IMPORTANT: \n\
            - Return ONLY the JSON object, no additional text\n\
            - Use the EXACT filter format shown above\n\
            - For \"most recent\", \"latest\", or \"newest\" queries, use null filter (NOT SampleN)\n\
            - Prefer existing approved schemas; only recommend index_schema if no efficient schema exists\n\
            - Index schemas must always have schema_type \"HashRange\" (implicit)\n\
            - Always include field_topologies for all fields in transform_fields\n\
            - Choose hash_field based on what will be queried (word, author, tag, etc.)\n\
            - Choose range_field as a timestamp or ID for natural ordering"
        );
        prompt
    }
fn build_summarization_prompt(&self, original_query: &str, results: &[Value]) -> String {
let results_preview = if results.len() > 1000 {
&results[..1000]
} else {
results
};
let results_str = serde_json::to_string_pretty(results_preview)
.unwrap_or_else(|_| "Failed to serialize results".to_string());
format!(
"Summarize the following query results for the user.\n\n\
Original Query: {}\n\
Results ({} total): {}\n\n\
Provide:\n\
1. High-level summary\n\
2. Key insights\n\
3. Notable patterns or anomalies\n\n\
Keep the summary concise and informative.",
original_query,
results.len(),
results_str
)
}
/// Builds the LLM prompt for a follow-up chat question about query results.
///
/// The prompt carries the original query, up to the first 1000 serialized
/// result values, any prior conversation turns, and the user's new question,
/// followed by a fixed answering instruction.
fn build_chat_prompt(
    &self,
    original_query: &str,
    results: &[Value],
    conversation_history: &[Message],
    question: &str,
) -> String {
    // Bound the serialized payload at the first 1000 entries.
    let preview: &[Value] = results.get(..1000).unwrap_or(results);
    let serialized = serde_json::to_string_pretty(preview)
        .unwrap_or_else(|_| "Failed to serialize results".to_string());

    // Pre-render the optional conversation transcript so the final prompt
    // can be assembled in a single format! call. Empty history renders as
    // an empty string, matching the original omission of the section.
    let mut transcript = String::new();
    if !conversation_history.is_empty() {
        transcript.push_str("Conversation History:\n");
        for turn in conversation_history {
            transcript.push_str(&format!("{}: {}\n", turn.role, turn.content));
        }
        transcript.push('\n');
    }

    format!(
        "You are helping a user explore query results. Answer their question based on \
         the context provided.\n\n\
         Original Query: {}\n\
         Results ({} total): {}\n\n\
         {}User Question: {}\n\n\
         Provide a clear, concise answer based on the data.",
        original_query,
        results.len(),
        serialized,
        transcript,
        question
    )
}
async fn call_llm(&self, prompt: &str) -> Result<String, String> {
match self.provider {
AIProvider::OpenRouter => {
if let Some(ref service) = self.openrouter_service {
service
.call_openrouter_api(prompt)
.await
.map_err(|e| format!("OpenRouter API error: {}", e))
} else {
Err("OpenRouter service not initialized".to_string())
}
}
AIProvider::Ollama => {
if let Some(ref service) = self.ollama_service {
service
.call_ollama_api(prompt)
.await
.map_err(|e| format!("Ollama API error: {}", e))
} else {
Err("Ollama service not initialized".to_string())
}
}
}
}
/// Extracts and deserializes a `QueryPlan` from a raw LLM response.
///
/// Models may wrap the JSON object in extra prose, so the text is sliced
/// from the first '{' to the last '}' before parsing. If no well-formed
/// span exists, the raw response is parsed as-is, producing a descriptive
/// parse error that includes the response text for debugging.
fn parse_query_plan(&self, response: &str) -> Result<QueryPlan, String> {
    // Guard `end >= start`: a '}' occurring before the first '{' (e.g.
    // a response like "} prose {") previously produced a reversed range
    // and panicked when slicing. Both delimiters are ASCII, so the
    // indices are always valid char boundaries.
    let json_str = match (response.find('{'), response.rfind('}')) {
        (Some(start), Some(end)) if end >= start => &response[start..=end],
        _ => response,
    };
    // Local mirror of the expected LLM output shape; only used for
    // deserialization within this function.
    #[derive(serde::Deserialize)]
    struct LlmResponse {
        query: Query,
        index_schema: Option<DeclarativeSchemaDefinition>,
        reasoning: String,
    }
    let parsed: LlmResponse = serde_json::from_str(json_str).map_err(|e| {
        format!(
            "Failed to parse LLM response: {}. Response: {}",
            e, json_str
        )
    })?;
    Ok(QueryPlan {
        query: parsed.query,
        index_schema: parsed.index_schema,
        reasoning: parsed.reasoning,
    })
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::types::{
        DeclarativeSchemaDefinition, JsonTopology, KeyConfig, PrimitiveType, TopologyNode,
    };
    use crate::schema::{SchemaState, SchemaWithState};
    use std::collections::HashMap;

    /// Builds an approved HashRange index schema (hash key `author`, range
    /// key `publish_date`) used to exercise prompt generation.
    fn create_test_hash_range_schema() -> SchemaWithState {
        // Both key fields share the same primitive String topology with a
        // "word" classification; build it once via a closure.
        let word_topology = || JsonTopology {
            root: TopologyNode::Primitive {
                value: PrimitiveType::String,
                classifications: Some(vec!["word".to_string()]),
            },
        };
        let mut field_topologies = HashMap::new();
        field_topologies.insert("author".to_string(), word_topology());
        field_topologies.insert("publish_date".to_string(), word_topology());

        let mut schema = DeclarativeSchemaDefinition::new(
            "BlogPostAuthorIndex".to_string(),
            crate::schema::types::schema::DeclarativeSchemaType::HashRange,
            Some(KeyConfig {
                hash_field: Some("author".to_string()),
                range_field: Some("publish_date".to_string()),
            }),
            None,
            None,
            None,
        );
        schema.descriptive_name = Some("Blog Post Author Index".to_string());
        schema.field_topologies = field_topologies;
        SchemaWithState {
            schema,
            state: SchemaState::Approved,
        }
    }

    /// Constructs an `LlmQueryService` backed by the Ollama provider —
    /// shared setup that was previously duplicated in each test.
    fn ollama_query_service() -> LlmQueryService {
        let mut config = crate::ingestion::config::IngestionConfig::default();
        config.provider = crate::ingestion::config::AIProvider::Ollama;
        LlmQueryService::new(config).expect("Failed to create service")
    }

    /// The analysis prompt must surface the schema's key structure and the
    /// filter-selection guidance the model is expected to follow.
    #[test]
    fn test_prompt_includes_hash_and_range_keys() {
        let service = ollama_query_service();
        let schemas = vec![create_test_hash_range_schema()];
        let prompt = service.build_analysis_prompt("Find posts by Jennifer Liu", &schemas);
        assert!(
            prompt.contains("Hash Key: author"),
            "Prompt should include Hash Key field"
        );
        assert!(
            prompt.contains("Range Key: publish_date"),
            "Prompt should include Range Key field"
        );
        assert!(
            prompt.contains("HashKey"),
            "Prompt should mention HashKey filter"
        );
        assert!(
            prompt.contains("CRITICAL"),
            "Prompt should include critical filter selection guidance"
        );
        assert!(
            prompt.contains("Jennifer Liu"),
            "Prompt should include the example with Jennifer Liu"
        );
    }

    /// The analysis prompt must show worked examples marked as correct.
    #[test]
    fn test_prompt_shows_correct_vs_incorrect_examples() {
        let service = ollama_query_service();
        let schemas = vec![create_test_hash_range_schema()];
        let prompt = service.build_analysis_prompt("Test query", &schemas);
        assert!(
            prompt.contains("✓ CORRECT"),
            "Prompt should show correct examples"
        );
    }
}