use crate::fold_db_core::orchestration::IndexingStatus;
use crate::fold_db_core::query::records_from_field_map;
use crate::ingestion::IngestionError;
use crate::lambda::types::{
AIQueryResponse, CompleteQueryResponse, ConversationMessage, FollowupRequest, FollowupResponse,
QueryContext, QueryPlanInfo,
};
use crate::schema::types::operations::Mutation;
use serde_json::Value;
use std::time::{SystemTime, UNIX_EPOCH};
use super::context::LambdaContext;
use crate::datafold_node::OperationProcessor;
use crate::error::FoldDbError;
impl LambdaContext {
/// Run a free-form natural-language query against the native index on behalf
/// of `user_id`.
///
/// Resolves the shared LLM service from the global `LambdaContext`, gathers
/// the user's schema list, and delegates to
/// `execute_ai_native_index_query_with_results`. Raw results are serialized
/// to JSON and returned together with a two-message conversation history
/// (user query + assistant interpretation) so the response can seed a
/// follow-up exchange via [`Self::ask_followup`].
///
/// # Errors
/// * `IngestionError::configuration_error` — no LLM service configured.
/// * `IngestionError::InvalidInput` — schema listing, database access, or the
///   AI query itself failed.
pub async fn ai_query(query: &str, user_id: String) -> Result<AIQueryResponse, IngestionError> {
    let ctx = Self::get()?;
    // The LLM service is optional on the context; AI entry points require it.
    let service = ctx.llm_service.as_ref()
        .ok_or_else(|| IngestionError::configuration_error(
            "AI query not configured. Initialize LambdaContext with ai_config using .with_openrouter() or .with_ollama()."
        ))?;
    // Scope the node lock tightly: held only while listing schemas.
    let schemas = {
        let node_mutex = Self::get_node(&user_id).await?;
        let node_guard = node_mutex.lock().await;
        let processor = OperationProcessor::new(node_guard.clone());
        processor.list_schemas().await.map_err(|e| {
            IngestionError::InvalidInput(format!("Failed to get schemas: {}", e))
        })?
    };
    use crate::lambda::logging::run_with_user;
    // Attribute all logging inside the async block to this user.
    let (ai_interpretation, raw_results) = run_with_user(&user_id, async {
        let node_mutex = Self::get_node(&user_id).await?;
        let node = node_mutex.lock().await;
        let db_ops = node
            .get_fold_db()
            .await
            .map_err(|e| {
                IngestionError::InvalidInput(format!("Failed to access database: {}", e))
            })?
            .get_db_ops();
        // Release the node lock before the (potentially slow) LLM round-trip.
        drop(node);
        service
            .execute_ai_native_index_query_with_results(query, &schemas, &db_ops)
            .await
            .map_err(|e| IngestionError::InvalidInput(format!("AI query failed: {}", e)))
    })
    .await?;
    // Convert results to plain JSON; serialization failures degrade to `{}`.
    let results_as_json: Vec<Value> = raw_results
        .into_iter()
        .map(|result| serde_json::to_value(result).unwrap_or(serde_json::json!({})))
        .collect();
    // Seed a conversation context so the caller can ask follow-up questions.
    let context = QueryContext {
        original_query: query.to_string(),
        query_results: results_as_json.clone(),
        conversation_history: vec![
            ConversationMessage {
                role: "user".to_string(),
                content: query.to_string(),
                // Unix seconds; a clock before the epoch degrades to 0.
                timestamp: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs(),
            },
            ConversationMessage {
                role: "assistant".to_string(),
                content: ai_interpretation.clone(),
                timestamp: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs(),
            },
        ],
        query_plan: None,
    };
    Ok(AIQueryResponse {
        ai_interpretation,
        raw_results: results_as_json,
        context,
    })
}
/// End-to-end AI query: analyze → plan → execute → summarize.
///
/// Unlike [`Self::ai_query`], this asks the LLM to produce an explicit
/// structured query plan first, executes that plan against the node, and then
/// asks the LLM for an optional natural-language summary of the results. The
/// plan, raw results, summary, and a conversation context are all returned.
///
/// # Errors
/// * `IngestionError::configuration_error` — no LLM service configured.
/// * `IngestionError::InvalidInput` — schema listing, query analysis, or
///   query execution failed. (Summarization failure is NOT an error; see below.)
pub async fn run_ai_query(
    query: &str,
    user_id: String,
) -> Result<CompleteQueryResponse, IngestionError> {
    let ctx = Self::get()?;
    // The LLM service is optional on the context; AI entry points require it.
    let service = ctx.llm_service.as_ref()
        .ok_or_else(|| IngestionError::configuration_error(
            "AI query not configured. Initialize LambdaContext with ai_config using .with_openrouter() or .with_ollama()."
        ))?;
    // Node lock scoped to schema listing only.
    let schemas = {
        let node_mutex = Self::get_node(&user_id).await?;
        let node_guard = node_mutex.lock().await;
        let processor = OperationProcessor::new(node_guard.clone());
        processor.list_schemas().await.map_err(|e| {
            IngestionError::InvalidInput(format!("Failed to get schemas: {}", e))
        })?
    };
    use crate::lambda::logging::run_with_user;
    run_with_user(&user_id, async {
        // Ask the LLM to translate the natural-language query into a plan.
        let query_plan = service.analyze_query(query, &schemas).await.map_err(|e| {
            IngestionError::InvalidInput(format!("Failed to analyze query: {}", e))
        })?;
        let processor = {
            let node_mutex = Self::get_node(&user_id).await?;
            let node_guard = node_mutex.lock().await;
            OperationProcessor::new(node_guard.clone())
        };
        let results = match processor.execute_query_json(query_plan.query.clone()).await {
            Ok(results) => results,
            Err(e) => {
                return Err(IngestionError::InvalidInput(format!(
                    "Failed to execute query: {}",
                    e
                )));
            }
        };
        // Summarization is best-effort: failures are discarded via `.ok()`.
        let summary = service.summarize_results(query, &results).await.ok();
        // The filter is surfaced to the caller as its Debug rendering.
        let filter_type = query_plan.query.filter.as_ref().map(|f| format!("{:?}", f));
        let query_plan_info = QueryPlanInfo {
            schema_name: query_plan.query.schema_name.clone(),
            fields: query_plan.query.fields.clone(),
            filter_type,
            reasoning: query_plan.reasoning.clone(),
        };
        // History always records the user turn; the assistant turn is only
        // appended when a summary was actually produced.
        let mut conversation_history = vec![ConversationMessage {
            role: "user".to_string(),
            content: query.to_string(),
            // Unix seconds; a clock before the epoch degrades to 0.
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs(),
        }];
        if let Some(ref s) = summary {
            conversation_history.push(ConversationMessage {
                role: "assistant".to_string(),
                content: s.clone(),
                timestamp: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs(),
            });
        }
        let context = QueryContext {
            original_query: query.to_string(),
            query_results: results.clone(),
            conversation_history,
            query_plan: Some(query_plan_info.clone()),
        };
        Ok(CompleteQueryResponse {
            query_plan: query_plan_info,
            results,
            summary,
            context,
        })
    })
    .await
}
/// Answer a follow-up question against an existing [`QueryContext`].
///
/// The LLM first decides whether the question can be answered from the
/// context's existing results or needs a fresh query. When a new query is
/// produced, it is executed best-effort: on failure the previous results are
/// kept and a warning is logged. The final answer plus the conversation
/// history extended with this exchange are returned in an updated context.
///
/// # Errors
/// * `IngestionError::configuration_error` — no LLM service configured.
/// * `IngestionError::InvalidInput` — schema listing, follow-up analysis, or
///   answer generation failed.
pub async fn ask_followup(
    request: FollowupRequest,
    user_id: String,
) -> Result<FollowupResponse, IngestionError> {
    let ctx = Self::get()?;
    // The LLM service is optional on the context; AI entry points require it.
    let service = ctx.llm_service.as_ref()
        .ok_or_else(|| IngestionError::configuration_error(
            "AI query not configured. Initialize LambdaContext with ai_config using .with_openrouter() or .with_ollama()."
        ))?;
    let context = request.context;
    let question = request.question;
    // Node lock scoped to schema listing only.
    let schemas = {
        let node_mutex = Self::get_node(&user_id).await?;
        let node_guard = node_mutex.lock().await;
        let processor = OperationProcessor::new(node_guard.clone());
        processor.list_schemas().await.map_err(|e| {
            IngestionError::InvalidInput(format!("Failed to get schemas: {}", e))
        })?
    };
    use crate::lambda::logging::run_with_user;
    run_with_user(&user_id, async {
        // Convert the lambda-layer history into the llm_query message type
        // (u64 unix seconds mapped back into SystemTime).
        let conversation_history: Vec<crate::datafold_node::llm_query::types::Message> =
            context
                .conversation_history
                .iter()
                .map(|msg| crate::datafold_node::llm_query::types::Message {
                    role: msg.role.clone(),
                    content: msg.content.clone(),
                    timestamp: SystemTime::UNIX_EPOCH
                        + std::time::Duration::from_secs(msg.timestamp),
                })
                .collect();
        // Let the LLM decide whether the follow-up needs fresh data.
        let analysis = service
            .analyze_followup_question(
                &context.original_query,
                &context.query_results,
                &question,
                &schemas,
            )
            .await
            .map_err(|e| {
                IngestionError::InvalidInput(format!("Failed to analyze followup: {}", e))
            })?;
        let mut combined_results = context.query_results.clone();
        let mut executed_new_query = false;
        if analysis.needs_query {
            if let Some(new_query) = analysis.query {
                // NOTE(review): this flag is set before the query runs, so it
                // stays `true` even when execution fails below — confirm that
                // "attempted" rather than "succeeded" is the intended meaning.
                executed_new_query = true;
                let processor = {
                    let node_mutex = Self::get_node(&user_id).await?;
                    let node_guard = node_mutex.lock().await;
                    OperationProcessor::new(node_guard.clone())
                };
                match processor.execute_query_json(new_query).await {
                    Ok(results) => {
                        // Fresh results replace the previous working set.
                        combined_results = results;
                    }
                    Err(e) => {
                        // Best-effort: keep the old results and just log.
                        log::warn!("Failed to execute followup query: {}", e);
                    }
                }
            }
        }
        let answer = service
            .answer_question(
                &context.original_query,
                &combined_results,
                &conversation_history,
                &question,
            )
            .await
            .map_err(|e| {
                IngestionError::InvalidInput(format!("Failed to get answer: {}", e))
            })?;
        // Extend the stored history with this user/assistant exchange.
        let mut updated_conversation = context.conversation_history.clone();
        updated_conversation.push(ConversationMessage {
            role: "user".to_string(),
            content: question.clone(),
            // Unix seconds; a clock before the epoch degrades to 0.
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs(),
        });
        updated_conversation.push(ConversationMessage {
            role: "assistant".to_string(),
            content: answer.clone(),
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs(),
        });
        let updated_context = QueryContext {
            original_query: context.original_query,
            query_results: combined_results,
            conversation_history: updated_conversation,
            query_plan: context.query_plan,
        };
        Ok(FollowupResponse {
            answer,
            executed_new_query,
            context: updated_context,
        })
    })
    .await
}
/// Execute a structured query as `user_id`, returning the raw JSON rows.
///
/// # Errors
/// `IngestionError::InvalidInput` when node access or query execution fails.
pub async fn query(
    query: crate::schema::types::Query,
    user_id: String,
) -> Result<Vec<Value>, IngestionError> {
    use crate::lambda::logging::run_with_user;
    run_with_user(&user_id, async {
        // Hold the node lock only long enough to construct a processor.
        let processor = {
            let node_mutex = Self::get_node(&user_id).await?;
            let guard = node_mutex.lock().await;
            OperationProcessor::new(guard.clone())
        };
        processor
            .execute_query_json(query)
            .await
            .map_err(|e| IngestionError::InvalidInput(format!("Query failed: {}", e)))
    })
    .await
}
/// Search the native index for `term` on behalf of `user_id`.
///
/// Each hit is serialized to JSON; values that fail to serialize degrade to `{}`.
///
/// # Errors
/// `IngestionError::InvalidInput` when node access or the search fails.
pub async fn native_index_search(
    term: &str,
    user_id: String,
) -> Result<Vec<Value>, IngestionError> {
    use crate::lambda::logging::run_with_user;
    run_with_user(&user_id, async {
        // Hold the node lock only long enough to construct a processor.
        let processor = {
            let node_mutex = Self::get_node(&user_id).await?;
            let guard = node_mutex.lock().await;
            OperationProcessor::new(guard.clone())
        };
        let hits = processor
            .native_index_search(term)
            .await
            .map_err(|e| {
                IngestionError::InvalidInput(format!("Native index search failed: {}", e))
            })?;
        let json_hits: Vec<Value> = hits
            .into_iter()
            .map(|hit| serde_json::to_value(hit).unwrap_or(serde_json::json!({})))
            .collect();
        Ok(json_hits)
    })
    .await
}
/// Apply a single mutation as `user_id`, returning the processor's result string.
///
/// # Errors
/// `IngestionError::InvalidInput` when node access or the mutation fails.
pub async fn execute_mutation(
    mutation: Mutation,
    user_id: String,
) -> Result<String, IngestionError> {
    use crate::lambda::logging::run_with_user;
    run_with_user(&user_id, async {
        // Hold the node lock only long enough to construct a processor.
        let processor = {
            let node_mutex = Self::get_node(&user_id).await?;
            let guard = node_mutex.lock().await;
            OperationProcessor::new(guard.clone())
        };
        match processor.execute_mutation_op(mutation).await {
            Ok(outcome) => Ok(outcome),
            Err(e) => Err(IngestionError::InvalidInput(format!("Mutation failed: {}", e))),
        }
    })
    .await
}
/// Apply a batch of mutations as `user_id`, returning one result string per mutation.
///
/// # Errors
/// `IngestionError::InvalidInput` when node access or the batch fails.
pub async fn execute_mutations_batch(
    mutations: Vec<Mutation>,
    user_id: String,
) -> Result<Vec<String>, IngestionError> {
    use crate::lambda::logging::run_with_user;
    run_with_user(&user_id, async {
        // Hold the node lock only long enough to construct a processor.
        let processor = {
            let node_mutex = Self::get_node(&user_id).await?;
            let guard = node_mutex.lock().await;
            OperationProcessor::new(guard.clone())
        };
        match processor.execute_mutations_batch_ops(mutations).await {
            Ok(outcomes) => Ok(outcomes),
            Err(e) => Err(IngestionError::InvalidInput(format!(
                "Batch mutation failed: {}",
                e
            ))),
        }
    })
    .await
}
/// List all registered transforms on the context-default node, keyed by id.
///
/// # Errors
/// `IngestionError::InvalidInput` when node access or the listing fails.
pub async fn list_transforms(
) -> Result<std::collections::HashMap<String, crate::schema::types::Transform>, IngestionError>
{
    let node_mutex = Self::node().await?;
    let node_guard = node_mutex.lock().await;
    let processor = OperationProcessor::new(node_guard.clone());
    processor
        .list_transforms()
        .await
        // Bug fix: the error message was the leftover placeholder "detect: {}".
        .map_err(|e| IngestionError::InvalidInput(format!("Failed to list transforms: {}", e)))
}
/// Report the transform queue as JSON: `{"length": …, "queued_transforms": …}`.
///
/// # Errors
/// `IngestionError::InvalidInput` when node access or the queue lookup fails.
pub async fn get_transform_queue() -> Result<Value, IngestionError> {
    let node = Self::node().await?;
    let guard = node.lock().await;
    let (len, queued) = OperationProcessor::new(guard.clone())
        .get_transform_queue()
        .await
        .map_err(|e| {
            IngestionError::InvalidInput(format!("Failed to get transform queue info: {}", e))
        })?;
    Ok(serde_json::json!({
        "length": len,
        "queued_transforms": queued
    }))
}
/// Manually enqueue transform `id`, tagging it with the "manual_lambda_trigger" source.
///
/// # Errors
/// `IngestionError::InvalidInput` when node access or enqueueing fails.
pub async fn add_to_transform_queue(id: &str) -> Result<(), IngestionError> {
    let node = Self::node().await?;
    let guard = node.lock().await;
    let processor = OperationProcessor::new(guard.clone());
    match processor.add_to_transform_queue(id, "manual_lambda_trigger").await {
        Ok(done) => Ok(done),
        Err(e) => Err(IngestionError::InvalidInput(format!(
            "Failed to add transform to queue: {}",
            e
        ))),
    }
}
/// Fetch every backfill record tracked by the context-default node.
///
/// # Errors
/// `IngestionError::InvalidInput` when node access or the lookup fails.
pub async fn get_all_backfills() -> Result<
    Vec<crate::fold_db_core::infrastructure::backfill_tracker::BackfillInfo>,
    IngestionError,
> {
    let node = Self::node().await?;
    let guard = node.lock().await;
    OperationProcessor::new(guard.clone())
        .get_all_backfills()
        .await
        .map_err(|e| IngestionError::InvalidInput(format!("Failed to get backfills: {}", e)))
}
/// Fetch only the currently-active backfill records from the context-default node.
///
/// # Errors
/// `IngestionError::InvalidInput` when node access or the lookup fails.
pub async fn get_active_backfills() -> Result<
    Vec<crate::fold_db_core::infrastructure::backfill_tracker::BackfillInfo>,
    IngestionError,
> {
    let node = Self::node().await?;
    let guard = node.lock().await;
    OperationProcessor::new(guard.clone())
        .get_active_backfills()
        .await
        .map_err(|e| {
            IngestionError::InvalidInput(format!("Failed to get active backfills: {}", e))
        })
}
/// Look up a single backfill by `id`; `Ok(None)` when no such backfill exists.
///
/// # Errors
/// `IngestionError::InvalidInput` when node access or the lookup fails.
pub async fn get_backfill(
    id: &str,
) -> Result<
    Option<crate::fold_db_core::infrastructure::backfill_tracker::BackfillInfo>,
    IngestionError,
> {
    let node = Self::node().await?;
    let guard = node.lock().await;
    let processor = OperationProcessor::new(guard.clone());
    match processor.get_backfill(id).await {
        Ok(info) => Ok(info),
        Err(e) => Err(IngestionError::InvalidInput(format!(
            "Failed to get backfill: {}",
            e
        ))),
    }
}
/// Fetch aggregate backfill statistics from the context-default node.
///
/// # Errors
/// `IngestionError::InvalidInput` when node access or the lookup fails.
pub async fn get_backfill_statistics() -> Result<
    crate::fold_db_core::infrastructure::backfill_tracker::BackfillStatistics,
    IngestionError,
> {
    let node = Self::node().await?;
    let guard = node.lock().await;
    OperationProcessor::new(guard.clone())
        .get_backfill_statistics()
        .await
        .map_err(|e| {
            IngestionError::InvalidInput(format!("Failed to get backfill statistics: {}", e))
        })
}
/// Fetch transform statistics and serialize them to JSON (`{}` on serialization failure).
///
/// # Errors
/// `IngestionError::InvalidInput` when node access or the statistics call fails.
pub async fn get_transform_statistics() -> Result<Value, IngestionError> {
    let node = Self::node().await?;
    let guard = node.lock().await;
    let stats = OperationProcessor::new(guard.clone())
        .get_transform_statistics()
        .await
        .map_err(|e| {
            IngestionError::InvalidInput(format!("Failed to get transform statistics: {}", e))
        })?;
    Ok(serde_json::to_value(stats).unwrap_or(serde_json::json!({})))
}
/// Report the current indexing status of the context-default node.
///
/// # Errors
/// `IngestionError::InvalidInput` when node access or the status call fails.
pub async fn get_indexing_status() -> Result<IndexingStatus, IngestionError> {
    let node = Self::node().await?;
    let guard = node.lock().await;
    OperationProcessor::new(guard.clone())
        .get_indexing_status()
        .await
        .map_err(|e| {
            IngestionError::InvalidInput(format!("Failed to get indexing status: {}", e))
        })
}
}