use serde_json::Value;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::datafold_node::DataFoldNode;
use crate::ingestion::config::IngestionConfig;
use crate::ingestion::core::IngestionRequest;
use crate::ingestion::progress::ProgressService;
use crate::ingestion::simple_service::SimpleIngestionService;
use crate::ingestion::IngestionError;
use crate::ingestion::ProgressTracker;
use crate::log_feature;
use crate::logging::features::LogFeature;
pub struct IngestionSpawnConfig {
pub json_data: Value,
pub auto_execute: bool,
pub trust_distance: u32,
pub pub_key: String,
pub source_file_name: Option<String>,
pub ingestion_config: IngestionConfig,
}
pub async fn spawn_background_ingestion(
config: IngestionSpawnConfig,
progress_tracker: &ProgressTracker,
node: Arc<RwLock<DataFoldNode>>,
) -> String {
let progress_id = uuid::Uuid::new_v4().to_string();
let progress_service = ProgressService::new(progress_tracker.clone());
progress_service.start_progress(progress_id.clone()).await;
let ingestion_request = IngestionRequest {
data: config.json_data,
auto_execute: Some(config.auto_execute),
trust_distance: Some(config.trust_distance),
pub_key: Some(config.pub_key),
source_file_name: config.source_file_name,
};
let progress_id_clone = progress_id.clone();
let ingestion_config = config.ingestion_config;
tokio::spawn(async move {
if let Err(e) = run_background_ingestion(
ingestion_request,
node,
progress_service,
progress_id_clone,
ingestion_config,
)
.await
{
log_feature!(
LogFeature::Ingestion,
error,
"Background ingestion setup failed: {}",
e
);
}
});
progress_id
}
async fn run_background_ingestion(
ingestion_request: IngestionRequest,
node: Arc<RwLock<DataFoldNode>>,
progress_service: ProgressService,
progress_id: String,
ingestion_config: IngestionConfig,
) -> Result<(), String> {
log_feature!(
LogFeature::Ingestion,
info,
"Starting background ingestion for uploaded file with progress_id: {}",
progress_id
);
let service = match create_simple_ingestion_service(ingestion_config).await {
Ok(s) => s,
Err(e) => {
let error_msg = format!("Ingestion service not available: {}", e);
log_feature!(
LogFeature::Ingestion,
error,
"Failed to initialize ingestion service: {}",
e
);
progress_service
.fail_progress(&progress_id, error_msg.clone())
.await;
return Err(error_msg);
}
};
{
let node_guard = node.read().await;
match service
.process_json_with_node_and_progress(
ingestion_request,
&node_guard,
&progress_service,
progress_id.clone(),
)
.await
{
Ok(response) => {
if response.success {
log_feature!(
LogFeature::Ingestion,
info,
"File ingestion completed successfully: {}",
progress_id
);
} else {
log_feature!(
LogFeature::Ingestion,
error,
"File ingestion failed: {:?}",
response.errors
);
}
Ok(())
}
Err(e) => {
let error_msg = format!("Processing failed: {}", e);
log_feature!(
LogFeature::Ingestion,
error,
"File ingestion processing failed: {}",
e
);
progress_service
.fail_progress(&progress_id, error_msg.clone())
.await;
Err(error_msg)
}
}
}
}
async fn create_simple_ingestion_service(
config: IngestionConfig,
) -> Result<SimpleIngestionService, IngestionError> {
SimpleIngestionService::new(config)
}