#![cfg(feature = "lambda")]
use async_trait::async_trait;
use datafold::ingestion::IngestionError;
use datafold::lambda::{LambdaConfig, LambdaContext, LambdaLogging, LogEntry, LogLevel, Logger};
use datafold::storage::DatabaseConfig;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
/// In-memory `Logger` used by this test file to capture emitted log entries.
///
/// The buffer lives behind an `Arc`, so every clone of the logger shares the
/// same list of entries: one clone can be handed to the config while another
/// is kept around for inspection.
#[derive(Clone)]
struct UserIdTestLogger {
// Every entry passed to `log`, in the order it was pushed.
logs: Arc<Mutex<Vec<LogEntry>>>,
}
impl UserIdTestLogger {
    /// Creates a logger whose capture buffer starts out empty.
    fn new() -> Self {
        let buffer = Mutex::new(Vec::new());
        Self {
            logs: Arc::new(buffer),
        }
    }

    /// Returns a copy of every entry captured so far.
    async fn get_logs(&self) -> Vec<LogEntry> {
        let captured = self.logs.lock().await;
        captured.to_vec()
    }

    /// Returns copies of the captured entries whose `user_id` equals `user_id`.
    ///
    /// Entries without a `user_id` never match.
    async fn get_logs_for_user(&self, user_id: &str) -> Vec<LogEntry> {
        let captured = self.logs.lock().await;
        let wanted = Some(user_id);

        let mut matching = Vec::new();
        for entry in captured.iter() {
            if entry.user_id.as_deref() == wanted {
                matching.push(entry.clone());
            }
        }
        matching
    }
}
#[async_trait]
impl Logger for UserIdTestLogger {
    /// Appends `entry` to the shared in-memory buffer.
    ///
    /// Always returns `Ok(())`; the only work done is taking the lock and
    /// pushing onto the vector.
    async fn log(&self, entry: LogEntry) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // `entry` is already owned, so move it into the buffer instead of cloning it.
        self.logs.lock().await.push(entry);
        Ok(())
    }
}
/// Checks that logs emitted while running `LambdaContext::ingest_json_sync`
/// carry the `user_id` that was passed to the call.
///
/// A `UserIdTestLogger` is installed as the custom logger, a synchronous
/// ingestion is run for a known user id, and the captured entries are then
/// inspected: at least one entry must exist, and at least one must be tagged
/// with that user id.
#[tokio::test]
async fn test_sync_ingestion_logging_context() {
    /// Removes the wrapped directory when dropped, so the scratch storage is
    /// cleaned up even if an assertion below panics.
    struct TempDirCleanup(std::path::PathBuf);

    impl Drop for TempDirCleanup {
        fn drop(&mut self) {
            // Best-effort: the directory may never have been created.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    // Unique directory per run so repeated or concurrent runs do not collide.
    let temp_dir =
        std::env::temp_dir().join(format!("lambda_sync_logging_test_{}", uuid::Uuid::new_v4()));
    let _cleanup = TempDirCleanup(temp_dir.clone());

    let test_logger = UserIdTestLogger::new();
    let storage_config = DatabaseConfig::Local { path: temp_dir };
    let config = LambdaConfig::new(
        storage_config,
        // The clone shares its buffer with `test_logger`, which is inspected below.
        LambdaLogging::Custom(Arc::new(test_logger.clone())),
    )
    .with_schema_service_url("https://schema.example.com".to_string());

    // NOTE(review): the init result is discarded, as in the original test;
    // presumably a failed or repeated initialization is tolerated here — confirm.
    let _ = LambdaContext::init(config).await;

    let test_data = json!([{"id": "1", "name": "Test Item"}]);
    let target_user_id = "target_tenant_user_123";
    println!(
        "Starting synchronous ingestion for user: {}",
        target_user_id
    );

    // The ingestion outcome is ignored on purpose: this test only asserts on
    // the log entries produced during the call, not on its result.
    let _ = LambdaContext::ingest_json_sync(
        test_data,
        false,
        0,
        "default".to_string(),
        target_user_id.to_string(),
    )
    .await;

    let logs = test_logger.get_logs().await;
    println!("Total logs: {}", logs.len());
    let user_logs = test_logger.get_logs_for_user(target_user_id).await;
    println!("Logs for {}: {}", target_user_id, user_logs.len());

    assert!(!logs.is_empty(), "Should have emitted logs");
    assert!(
        !user_logs.is_empty(),
        "Failed to capture ANY logs with user_id={}. Logs emitted but missing user context!",
        target_user_id
    );
    // `_cleanup` is dropped here (or during unwinding) and removes the temp directory.
}