use crate::datafold_node::llm_query::service::LlmQueryService;
use crate::datafold_node::DataFoldNode;
use crate::ingestion::{create_progress_tracker, IngestionConfig, IngestionError, ProgressTracker};
use crate::lambda::config::{AIConfig, AIProvider, LambdaConfig};
use crate::lambda::logging::{LogBridge, Logger};
use crate::lambda::node_manager::NodeManager;
use once_cell::sync::OnceCell;
use std::sync::Arc;
pub struct LambdaContext {
pub(crate) node_manager: Arc<NodeManager>,
pub(crate) progress_tracker: ProgressTracker,
pub(crate) llm_service: Option<Arc<LlmQueryService>>,
pub(crate) logger: Arc<dyn Logger>,
}
static LAMBDA_CONTEXT: OnceCell<LambdaContext> = OnceCell::new();
impl LambdaContext {
pub async fn init(config: LambdaConfig) -> Result<(), IngestionError> {
let node_manager = Arc::new(NodeManager::new(config.clone()).await?);
let progress_tracker: ProgressTracker = match &config.storage {
crate::lambda::config::LambdaStorage::Config(
crate::storage::DatabaseConfig::DynamoDb(dynamo_config),
) => {
use crate::progress::DynamoDbProgressStore;
let table_name = dynamo_config.tables.process.clone();
let region = dynamo_config.region.clone();
Arc::new(DynamoDbProgressStore::from_config(table_name, region).await)
}
_ => {
create_progress_tracker(None).await
}
};
let llm_service = if let Some(ai_config) = config.ai_config.clone() {
let ingestion_config = Self::ai_config_to_ingestion_config(ai_config)?; match LlmQueryService::new(ingestion_config) {
Ok(service) => Some(Arc::new(service)),
Err(e) => {
log::warn!("Failed to initialize AI service: {}. AI query methods will not be available.", e);
None
}
}
} else {
None
};
let logger: Arc<dyn Logger> = match config.logging {
crate::lambda::config::LambdaLogging::DynamoDb => {
use crate::logging::outputs::dynamodb::DynamoDbLogger;
let table_name = if let crate::lambda::config::LambdaStorage::Config(
crate::storage::DatabaseConfig::DynamoDb(cfg),
) = &config.storage
{
cfg.tables.logs.clone()
} else {
"datafold-logs".to_string()
};
Arc::new(DynamoDbLogger::new(table_name).await)
}
crate::lambda::config::LambdaLogging::Stdout => {
Arc::new(crate::lambda::logging::StdoutLogger::new())
}
crate::lambda::config::LambdaLogging::Custom(logger) => logger,
crate::lambda::config::LambdaLogging::NoOp => {
Arc::new(crate::lambda::logging::NoOpLogger::new())
}
};
let _log_bridge = LogBridge::new(logger.clone(), None);
if let Err(e) = log::set_boxed_logger(Box::new(_log_bridge)) {
eprintln!("Warning: Failed to set logger: {}", e);
}
log::set_max_level(log::LevelFilter::Info);
let context = LambdaContext {
node_manager,
progress_tracker,
llm_service,
logger,
};
LAMBDA_CONTEXT
.set(context)
.map_err(|_| IngestionError::configuration_error("Context already initialized"))?;
Ok(())
}
fn ai_config_to_ingestion_config(
ai_config: AIConfig,
) -> Result<IngestionConfig, IngestionError> {
use crate::ingestion::config::{
AIProvider as IngestionAIProvider, OllamaConfig as IngestionOllamaConfig,
OpenRouterConfig as IngestionOpenRouterConfig,
};
let provider = match ai_config.provider {
AIProvider::OpenRouter => IngestionAIProvider::OpenRouter,
AIProvider::Ollama => IngestionAIProvider::Ollama,
};
let openrouter = ai_config.openrouter.map(|cfg| IngestionOpenRouterConfig {
api_key: cfg.api_key,
model: cfg.model,
base_url: cfg
.base_url
.unwrap_or_else(|| "https://openrouter.ai/api/v1".to_string()),
});
let ollama = ai_config.ollama.map(|cfg| IngestionOllamaConfig {
base_url: cfg.base_url,
model: cfg.model,
});
Ok(IngestionConfig {
provider,
openrouter: openrouter.unwrap_or_default(),
ollama: ollama.unwrap_or_default(),
enabled: true,
max_retries: ai_config.max_retries,
timeout_seconds: ai_config.timeout_seconds,
auto_execute_mutations: false, default_trust_distance: 0, })
}
pub(crate) fn get() -> Result<&'static LambdaContext, IngestionError> {
LAMBDA_CONTEXT.get().ok_or_else(|| {
IngestionError::configuration_error(
"Lambda context not initialized. Call LambdaContext::init() first.",
)
})
}
pub async fn node() -> Result<Arc<tokio::sync::Mutex<DataFoldNode>>, IngestionError> {
Self::get()?.node_manager.get_single_node().ok_or_else(|| {
IngestionError::configuration_error(
"No single-tenant node available. Use get_node(user_id) for multi-tenant setups.",
)
})
}
pub async fn get_node(
user_id: &str,
) -> Result<Arc<tokio::sync::Mutex<DataFoldNode>>, IngestionError> {
Self::get()?.node_manager.get_node(user_id).await
}
pub fn progress_tracker() -> Result<ProgressTracker, IngestionError> {
Ok(Self::get()?.progress_tracker.clone())
}
pub fn get_user_logger(
user_id: &str,
) -> Result<crate::lambda::logging::UserLogger, IngestionError> {
let ctx = Self::get()?;
Ok(crate::lambda::logging::UserLogger::new(
user_id.to_string(),
ctx.logger.clone(),
))
}
}