#![cfg(feature = "lambda")]
use datafold::lambda::{LambdaConfig, LambdaContext, LambdaLogging};
use datafold::storage::DatabaseConfig;
use serde_json::json;
/// Initializes `LambdaContext` with a local database in a fresh temp directory.
///
/// Two outcomes are accepted:
/// * `init` succeeds, in which case both the node and the progress tracker
///   accessors must return `Ok`;
/// * `init` fails with an "already initialized" error. The context is reached
///   through associated functions, so presumably another test in this binary
///   may have initialized it first (NOTE(review): confirm it is process-wide).
///
/// Any other error fails the test.
#[tokio::test]
async fn test_lambda_context_initialization() {
    // Set before `init`; presumably read during initialization (not visible here).
    std::env::set_var("FOLDB_USER_ID", "test_user");

    // A UUID suffix keeps this test's directory distinct from every other run.
    let db_path = std::env::temp_dir().join(format!("lambda_test_{}", uuid::Uuid::new_v4()));
    let config = LambdaConfig::new(
        DatabaseConfig::Local {
            path: db_path.clone(),
        },
        LambdaLogging::Stdout,
    )
    .with_schema_service_url("https://schema.example.com".to_string());

    let outcome = LambdaContext::init(config).await;
    match outcome {
        Err(err) => {
            let message = err.to_string();
            // Either spelling of the "already initialized" error is tolerated.
            let already_initialized = ["already initialized", "Context already initialized"]
                .iter()
                .any(|needle| message.contains(*needle));
            assert!(already_initialized, "Unexpected error: {}", message);
        }
        Ok(_) => {
            assert!(LambdaContext::node().await.is_ok());
            assert!(LambdaContext::progress_tracker().is_ok());
        }
    }

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(db_path);
}
/// Verifies that calling `LambdaContext::init` a second time is rejected.
///
/// The first `init` result is deliberately ignored: it may succeed, or it may
/// fail because the context already exists (presumably initialized by another
/// test in this binary — NOTE(review): confirm the context is process-wide).
/// Either way the progress tracker must be reachable before the second call,
/// which proves a context exists. The second `init` must then return an error
/// whose message contains "already initialized".
#[tokio::test]
async fn test_lambda_context_double_init_fails() {
    // Set before `init`; presumably read during initialization (not visible here).
    std::env::set_var("FOLDB_USER_ID", "test_user");

    // Separate UUID-suffixed directories so the two configs never share storage.
    let temp_dir1 = std::env::temp_dir().join(format!("lambda_test_{}", uuid::Uuid::new_v4()));
    let temp_dir2 = std::env::temp_dir().join(format!("lambda_test_{}", uuid::Uuid::new_v4()));

    let storage_config1 = DatabaseConfig::Local {
        path: temp_dir1.clone(),
    };
    let config1 = LambdaConfig::new(storage_config1, LambdaLogging::Stdout)
        .with_schema_service_url("https://schema1.com".to_string());

    // `config1` is not needed afterwards, so it is moved rather than cloned.
    let _ = LambdaContext::init(config1).await;

    // Guard: a context must exist at this point, otherwise the second `init`
    // below would be a first initialization and the test would be meaningless.
    LambdaContext::progress_tracker()
        .expect("context should be initialized after the first init attempt");

    let storage_config2 = DatabaseConfig::Local {
        path: temp_dir2.clone(),
    };
    let config2 = LambdaConfig::new(storage_config2, LambdaLogging::Stdout)
        .with_schema_service_url("https://schema.example.com".to_string());

    match LambdaContext::init(config2).await {
        Ok(_) => panic!("Second initialization should fail"),
        Err(e) => {
            let error_msg = e.to_string();
            // "Context already initialized" also contains this substring, so a
            // single check covers both spellings.
            assert!(
                error_msg.contains("already initialized"),
                "Unexpected error: {}",
                error_msg
            );
        }
    }

    // Best-effort cleanup; failures to remove the directories are ignored.
    let _ = std::fs::remove_dir_all(temp_dir1);
    let _ = std::fs::remove_dir_all(temp_dir2);
}
/// Initializes `LambdaContext` from a config that carries a schema service URL.
///
/// On success the node accessor must return `Ok`. An "already initialized"
/// error is also accepted (presumably another test created the context
/// first — NOTE(review): confirm). Any other error fails the test.
#[tokio::test]
async fn test_lambda_context_with_schema_service_url() {
    // Set before `init`; presumably read during initialization (not visible here).
    std::env::set_var("FOLDB_USER_ID", "test_user");

    let unique_name = format!("lambda_test_{}", uuid::Uuid::new_v4());
    let db_path = std::env::temp_dir().join(unique_name);

    let storage = DatabaseConfig::Local {
        path: db_path.clone(),
    };
    let schema_url = "https://schema.example.com".to_string();
    let config = LambdaConfig::new(storage, LambdaLogging::Stdout).with_schema_service_url(schema_url);

    let outcome = LambdaContext::init(config).await;
    match outcome {
        Err(err) => {
            let message = err.to_string();
            // Either spelling of the "already initialized" error is tolerated.
            let already_initialized = ["already initialized", "Context already initialized"]
                .iter()
                .any(|needle| message.contains(*needle));
            assert!(already_initialized, "Unexpected error: {}", message);
        }
        Ok(_) => {
            assert!(LambdaContext::node().await.is_ok());
        }
    }

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(db_path);
}
/// Looks up a progress id that was never created.
///
/// Accepted outcomes of `LambdaContext::get_progress`:
/// * `Ok(None)` — nothing is stored under the id;
/// * `Err` whose message mentions "not initialized" or "already initialized".
///
/// `Ok(Some(_))` and any other error fail the test. The result of `init` is
/// ignored on purpose; only the lookup is under test.
#[tokio::test]
async fn test_lambda_context_get_progress_nonexistent() {
    // Set before `init`; presumably read during initialization (not visible here).
    std::env::set_var("FOLDB_USER_ID", "test_user");

    let db_path = std::env::temp_dir().join(format!("lambda_test_{}", uuid::Uuid::new_v4()));
    let config = LambdaConfig::new(
        DatabaseConfig::Local {
            path: db_path.clone(),
        },
        LambdaLogging::Stdout,
    )
    .with_schema_service_url("https://schema.example.com".to_string());

    // Outcome intentionally discarded.
    let _ = LambdaContext::init(config).await;

    let lookup = LambdaContext::get_progress("nonexistent-progress-id").await;
    match lookup {
        Err(err) => {
            let message = err.to_string();
            let tolerated = ["not initialized", "already initialized"]
                .iter()
                .any(|needle| message.contains(*needle));
            assert!(tolerated, "Unexpected error: {}", message);
        }
        Ok(found) => {
            // The id was never registered, so no entry may exist for it.
            assert!(found.is_none(), "Found unexpected progress");
        }
    }

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(db_path);
}
/// Checks that the context accessors are usable once initialization has been
/// attempted.
///
/// `init` may succeed or fail with an "already initialized" error (presumably
/// because another test created the context first — NOTE(review): confirm).
/// Any other initialization error now fails the test; previously such an
/// error skipped every assertion and the test passed without checking
/// anything. After that, at least one of the node or progress tracker
/// accessors must return `Ok`, and when the node is available its lock must be
/// acquirable.
#[tokio::test]
async fn test_lambda_context_access_after_init() {
    // Set before `init`; presumably read during initialization (not visible here).
    std::env::set_var("FOLDB_USER_ID", "test_user");

    // A UUID suffix keeps this test's directory distinct from every other run.
    let temp_dir = std::env::temp_dir().join(format!("lambda_test_{}", uuid::Uuid::new_v4()));
    let storage_config = DatabaseConfig::Local {
        path: temp_dir.clone(),
    };
    let config = LambdaConfig::new(storage_config, LambdaLogging::Stdout)
        .with_schema_service_url("https://schema.example.com".to_string());

    // Only "already initialized" is a tolerated failure; anything else is
    // surfaced instead of silently skipping the checks below.
    if let Err(e) = LambdaContext::init(config).await {
        let error_msg = e.to_string();
        assert!(
            error_msg.contains("already initialized"),
            "Unexpected error: {}",
            error_msg
        );
    }

    let node_result = LambdaContext::node().await;
    let tracker_result = LambdaContext::progress_tracker();
    assert!(
        node_result.is_ok() || tracker_result.is_ok(),
        "Neither node nor progress_tracker accessible"
    );

    if let Ok(node) = node_result {
        // Awaits the lock and drops the guard at the end of this block.
        let _node_guard = node.lock().await;
    }

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(temp_dir);
}
/// Submits a small JSON array through `LambdaContext::ingest_json`.
///
/// Accepted outcomes:
/// * `Ok(progress_id)` — the id must be non-empty and, after a 100 ms pause,
///   `get_progress` for that id must return `Ok`;
/// * `Err` whose message contains "Configuration" or "config".
///
/// Any other error fails the test. The result of `init` is ignored on purpose.
#[tokio::test]
async fn test_lambda_context_ingest_json_returns_progress_id() {
    // Set before `init`; presumably read during initialization (not visible here).
    std::env::set_var("FOLDB_USER_ID", "test_user");

    let db_path = std::env::temp_dir().join(format!("lambda_test_{}", uuid::Uuid::new_v4()));
    let config = LambdaConfig::new(
        DatabaseConfig::Local {
            path: db_path.clone(),
        },
        LambdaLogging::Stdout,
    )
    .with_schema_service_url("https://schema.example.com".to_string());

    // Outcome intentionally discarded.
    let _ = LambdaContext::init(config).await;

    let payload = json!([
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"}
    ]);
    // NOTE(review): the meaning of the `false` and `0` arguments is not visible
    // in this file — check the `ingest_json` signature before changing them.
    let ingest_future = LambdaContext::ingest_json(
        payload,
        false,
        0,
        "test_key".to_string(),
        "test_user".to_string(),
    );

    match ingest_future.await {
        Err(err) => {
            let message = err.to_string();
            let config_related = ["Configuration", "config"]
                .iter()
                .any(|needle| message.contains(*needle));
            assert!(config_related, "Unexpected error: {}", message);
        }
        Ok(progress_id) => {
            assert!(!progress_id.is_empty());
            // Short pause before querying; presumably lets the ingestion
            // register its progress entry (TODO confirm).
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            let progress = LambdaContext::get_progress(&progress_id).await;
            assert!(progress.is_ok());
        }
    }

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(db_path);
}