use crate::datafold_node::DataFoldNode;
use crate::ingestion::json_processor::{convert_file_to_json, flatten_root_layers};
use crate::ingestion::ingestion_spawner::{spawn_background_ingestion, IngestionSpawnConfig};
use crate::ingestion::{IngestionError, IngestionResponse, ProgressTracker, IngestionConfig};
use crate::storage::UploadStorage;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs;
use tokio::sync::Mutex;
/// Parameters describing one ingestion of an object addressed by an S3 path.
///
/// Create it with `S3IngestionRequest::new` and adjust individual settings with
/// the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct S3IngestionRequest {
/// Location of the object to ingest; the downloader expects the form `s3://bucket/key`.
pub s3_path: String,
/// Forwarded unchanged to the spawned ingestion job (`new` sets it to `true`).
pub auto_execute: bool,
/// Forwarded unchanged to the spawned ingestion job (`new` sets it to `0`).
pub trust_distance: u32,
/// Forwarded unchanged to the spawned ingestion job (`new` sets it to `"default"`).
pub pub_key: String,
/// Optional per-request configuration. The ingest functions use it only when the
/// caller passes no explicit configuration; when both are absent they fall back
/// to `IngestionConfig::from_env()`.
pub ingestion_config: Option<IngestionConfig>,
}
impl S3IngestionRequest {
pub fn new(s3_path: String) -> Self {
Self {
s3_path,
auto_execute: true,
trust_distance: 0,
pub_key: "default".to_string(),
ingestion_config: None,
}
}
pub fn with_auto_execute(mut self, auto_execute: bool) -> Self {
self.auto_execute = auto_execute;
self
}
pub fn with_trust_distance(mut self, trust_distance: u32) -> Self {
self.trust_distance = trust_distance;
self
}
pub fn with_pub_key(mut self, pub_key: String) -> Self {
self.pub_key = pub_key;
self
}
pub fn with_ingestion_config(mut self, config: IngestionConfig) -> Self {
self.ingestion_config = Some(config);
self
}
pub fn with_openrouter_api_key(mut self, api_key: String) -> Self {
let mut config = IngestionConfig::default();
config.openrouter.api_key = api_key;
config.enabled = true;
self.ingestion_config = Some(config);
self
}
pub fn with_openrouter_config(mut self, api_key: String, model: String, base_url: String) -> Self {
let mut config = IngestionConfig::default();
config.openrouter.api_key = api_key;
config.openrouter.model = model;
config.openrouter.base_url = base_url;
config.enabled = true;
self.ingestion_config = Some(config);
self
}
}
pub async fn ingest_from_s3_path_async(
request: &S3IngestionRequest,
upload_storage: &UploadStorage,
progress_tracker: &ProgressTracker,
node: Arc<Mutex<DataFoldNode>>,
ingestion_config: Option<&IngestionConfig>,
) -> Result<IngestionResponse, IngestionError> {
let config = if let Some(config) = ingestion_config {
config.clone()
} else if let Some(config) = &request.ingestion_config {
config.clone()
} else {
IngestionConfig::from_env()?
};
let (file_path, filename) = download_s3_file(&request.s3_path, upload_storage).await?;
let json_value = convert_file_to_json(&file_path)
.await
.map_err(|_| IngestionError::FileConversionFailed)?;
let flattened_json = flatten_root_layers(json_value);
let spawn_config = IngestionSpawnConfig {
json_data: flattened_json,
auto_execute: request.auto_execute,
trust_distance: request.trust_distance,
pub_key: request.pub_key.clone(),
source_file_name: Some(filename),
ingestion_config: config,
};
let progress_id = spawn_background_ingestion(spawn_config, progress_tracker, node).await;
Ok(IngestionResponse {
success: true,
progress_id: Some(progress_id),
schema_used: None,
new_schema_created: false,
mutations_generated: 0,
mutations_executed: 0,
errors: Vec::new(),
})
}
/// Runs `ingest_from_s3_path_async` and then polls the progress tracker every
/// 200 ms until the job is marked complete, returning the final results.
///
/// `success` in the returned response is `true` only when the completed progress
/// record carries no error message; any error message is returned in `errors`.
/// The result fields fall back to their empty values when the progress record
/// has no results attached.
///
/// There is no upper bound on the wait: the function returns only once the
/// tracker reports completion, loses the record, or fails to load it.
///
/// # Errors
///
/// Propagates every error of `ingest_from_s3_path_async`, and returns
/// `IngestionError::InvalidInput` when no progress id was produced, when loading
/// the progress record fails, or when the record disappears while waiting.
pub async fn ingest_from_s3_path_sync(
    request: &S3IngestionRequest,
    upload_storage: &UploadStorage,
    progress_tracker: &ProgressTracker,
    node: Arc<Mutex<DataFoldNode>>,
    ingestion_config: Option<&IngestionConfig>,
) -> Result<IngestionResponse, IngestionError> {
    const POLL_INTERVAL: tokio::time::Duration = tokio::time::Duration::from_millis(200);

    let async_response = ingest_from_s3_path_async(
        request,
        upload_storage,
        progress_tracker,
        node,
        ingestion_config,
    )
    .await?;
    let progress_id = async_response
        .progress_id
        .ok_or_else(|| IngestionError::InvalidInput("No progress_id returned".to_string()))?;

    loop {
        tokio::time::sleep(POLL_INTERVAL).await;

        let progress = progress_tracker
            .load(&progress_id)
            .await
            .map_err(|e| IngestionError::InvalidInput(format!("Failed to load progress: {}", e)))?;

        let Some(progress) = progress else {
            return Err(IngestionError::InvalidInput(format!(
                "Progress tracking lost for {}",
                progress_id
            )));
        };
        if !progress.is_complete {
            continue;
        }

        // A completed job that recorded an error message did not succeed; report
        // that through `success` instead of always claiming success.
        let errors: Vec<_> = progress.error_message.into_iter().collect();
        let results = progress.results.as_ref();
        return Ok(IngestionResponse {
            success: errors.is_empty(),
            progress_id: Some(progress_id),
            schema_used: results.map(|r| r.schema_name.clone()),
            new_schema_created: results.is_some_and(|r| r.new_schema_created),
            mutations_generated: results.map_or(0, |r| r.mutations_generated),
            mutations_executed: results.map_or(0, |r| r.mutations_executed),
            errors,
        });
    }
}
/// Downloads the object at `s3_path` into the system temp directory.
///
/// Returns the path of the local copy together with the object's file name (the
/// part of the key after the last `/`). The local file name is the object's file
/// name prefixed with a per-process unique tag, so same-named objects from
/// different buckets or concurrent requests never overwrite each other; the
/// original name stays the suffix, which keeps the file extension intact.
///
/// # Errors
///
/// Returns `IngestionError::InvalidInput` when `s3_path` is not of the form
/// `s3://bucket/key` or the key does not end in a usable file name, and
/// `IngestionError::StorageError` when the download or the local write fails.
async fn download_s3_file(
    s3_path: &str,
    upload_storage: &UploadStorage,
) -> Result<(PathBuf, String), IngestionError> {
    // Distinguishes temp files created by this process from one another.
    static NEXT_TEMP_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let (bucket, key) = split_s3_path(s3_path)?;
    let filename = key.rsplit('/').next().unwrap_or(key).to_string();

    // Reject keys such as `dir/`, `dir/.` or `dir/..` before downloading anything:
    // they would resolve to a directory instead of a file inside the temp dir.
    let local_name = std::path::Path::new(&filename)
        .file_name()
        .ok_or_else(|| {
            IngestionError::InvalidInput(format!(
                "Invalid S3 path. Key does not end in a file name: {}",
                s3_path
            ))
        })?
        .to_os_string();

    let file_data = upload_storage
        .download_from_s3_path(bucket, key)
        .await
        .map_err(|e| IngestionError::StorageError(e.to_string()))?;

    let unique = NEXT_TEMP_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let mut temp_name =
        std::ffi::OsString::from(format!("s3-ingest-{}-{}-", std::process::id(), unique));
    temp_name.push(&local_name);
    let temp_path = std::env::temp_dir().join(temp_name);

    fs::write(&temp_path, &file_data)
        .await
        .map_err(|e| IngestionError::StorageError(format!("Failed to write temp file: {}", e)))?;

    Ok((temp_path, filename))
}

/// Splits an `s3://bucket/key` path into its `(bucket, key)` parts.
///
/// Fails with `IngestionError::InvalidInput` when the `s3://` prefix is missing,
/// when there is no `/` separating bucket and key, or when either part is empty.
fn split_s3_path(s3_path: &str) -> Result<(&str, &str), IngestionError> {
    let Some(rest) = s3_path.strip_prefix("s3://") else {
        return Err(IngestionError::InvalidInput(format!(
            "Invalid S3 path format. Expected 's3://bucket/key', got: {}",
            s3_path
        )));
    };

    match rest.split_once('/') {
        Some((bucket, key)) if !bucket.is_empty() && !key.is_empty() => Ok((bucket, key)),
        _ => Err(IngestionError::InvalidInput(format!(
            "Invalid S3 path structure. Expected 's3://bucket/key', got: {}",
            s3_path
        ))),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Path shared by the builder tests.
    const SAMPLE_PATH: &str = "s3://bucket/file.json";

    /// The plain setters change only their own field and attach no config.
    #[test]
    fn test_s3_ingestion_request_builder() {
        let request = S3IngestionRequest::new(SAMPLE_PATH.to_string())
            .with_auto_execute(false)
            .with_trust_distance(5)
            .with_pub_key("custom".to_string());

        assert_eq!(request.s3_path, "s3://bucket/file.json");
        assert_eq!(request.auto_execute, false);
        assert_eq!(request.trust_distance, 5);
        assert_eq!(request.pub_key, "custom");
        assert!(request.ingestion_config.is_none());
    }

    /// Supplying only an API key yields an enabled config carrying that key.
    #[test]
    fn test_s3_ingestion_request_with_api_key() {
        let request = S3IngestionRequest::new(SAMPLE_PATH.to_string())
            .with_openrouter_api_key("test-key".to_string());

        assert_eq!(request.s3_path, "s3://bucket/file.json");
        let config = request
            .ingestion_config
            .expect("an ingestion config should be attached");
        assert_eq!(config.openrouter.api_key, "test-key");
        assert!(config.enabled);
    }

    /// A caller-built config is stored on the request as given.
    #[test]
    fn test_s3_ingestion_request_with_config() {
        let mut custom_config = IngestionConfig::default();
        custom_config.enabled = true;
        custom_config.openrouter.model = "custom-model".to_string();
        custom_config.openrouter.api_key = "custom-key".to_string();

        let request =
            S3IngestionRequest::new(SAMPLE_PATH.to_string()).with_ingestion_config(custom_config);

        let config = request
            .ingestion_config
            .expect("an ingestion config should be attached");
        assert_eq!(config.openrouter.api_key, "custom-key");
        assert_eq!(config.openrouter.model, "custom-model");
    }

    /// Splitting on the first `/` after the scheme separates bucket from key.
    #[test]
    fn test_s3_path_parsing() {
        let path = "s3://my-bucket/path/to/file.json";
        assert!(path.starts_with("s3://"));

        let mut pieces = path[5..].splitn(2, '/');
        assert_eq!(pieces.next(), Some("my-bucket"));
        assert_eq!(pieces.next(), Some("path/to/file.json"));
        assert_eq!(pieces.next(), None);
    }

    /// The file name is the last `/`-separated segment of the key.
    #[test]
    fn test_filename_extraction() {
        for key in ["path/to/file.json", "file.json"] {
            let filename = key.rsplit('/').next().unwrap_or(key);
            assert_eq!(filename, "file.json");
        }
    }
}