use crate::datafold_node::DataFoldNode;
use crate::ingestion::config::AIProvider;
use crate::ingestion::core::IngestionRequest;
use crate::ingestion::mutation_generator::MutationGenerator;
use crate::ingestion::ollama_service::OllamaService;
use crate::ingestion::openrouter_service::OpenRouterService;
use crate::ingestion::progress::{IngestionResults, IngestionStep, ProgressService};
use crate::ingestion::{
AISchemaResponse, IngestionConfig, IngestionError, IngestionResponse, IngestionResult,
IngestionStatus, SimplifiedSchema, SimplifiedSchemaMap,
};
use crate::log_feature;
use crate::logging::features::LogFeature;
use crate::schema::types::Mutation;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
/// A single cached snapshot of the node's available schemas.
///
/// Stored inside `SimpleIngestionService::schema_cache` and considered
/// fresh only while `timestamp.elapsed()` is below the TTL checked in
/// `get_stripped_available_schemas_from_node`.
#[derive(Debug, Clone)]
struct SchemaCacheEntry {
    /// The simplified schema map fetched from the node.
    schemas: SimplifiedSchemaMap,
    /// When this snapshot was taken; used for TTL expiry.
    timestamp: Instant,
}
/// JSON ingestion service: asks an AI provider which schema fits the input,
/// creates or updates the schema on a `DataFoldNode`, then generates and
/// (optionally) executes mutations to store the data.
pub struct SimpleIngestionService {
    /// Ingestion configuration (provider selection, timeouts, defaults).
    config: IngestionConfig,
    /// Present only when `config.provider == AIProvider::OpenRouter`.
    openrouter_service: Option<OpenRouterService>,
    /// Present only when `config.provider == AIProvider::Ollama`.
    ollama_service: Option<OllamaService>,
    /// Translates field/value maps into database mutations.
    mutation_generator: MutationGenerator,
    /// TTL-based cache of the node's schemas; `None` until first fetch.
    /// tokio `Mutex` because the guard may be held across `.await`s.
    schema_cache: Arc<Mutex<Option<SchemaCacheEntry>>>,
}
impl SimpleIngestionService {
pub fn new(config: IngestionConfig) -> IngestionResult<Self> {
let openrouter_service = if config.provider == AIProvider::OpenRouter {
Some(OpenRouterService::new(
config.openrouter.clone(),
config.timeout_seconds,
config.max_retries,
)?)
} else {
None
};
let ollama_service = if config.provider == AIProvider::Ollama {
Some(OllamaService::new(
config.ollama.clone(),
config.timeout_seconds,
config.max_retries,
)?)
} else {
None
};
let mutation_generator = MutationGenerator::new();
Ok(Self {
config,
openrouter_service,
ollama_service,
mutation_generator,
schema_cache: Arc::new(Mutex::new(None)),
})
}
    /// Full ingestion pipeline with step-by-step progress reporting.
    ///
    /// Mirrors [`Self::process_json_with_node`] but emits a progress update
    /// to `progress_service` under `progress_id` at every stage, and reports
    /// completion (with `IngestionResults`) or failure through that channel.
    ///
    /// Stages: validate config/input → fetch available schemas → flatten
    /// wrapper objects → AI schema recommendation → create/approve schema →
    /// generate mutations → (optionally) execute them.
    ///
    /// Configuration failure is reported as an `Ok` response carrying error
    /// strings (not an `Err`), so HTTP-level callers can surface it cleanly.
    pub async fn process_json_with_node_and_progress(
        &self,
        request: IngestionRequest,
        node: &DataFoldNode,
        progress_service: &ProgressService,
        progress_id: String,
    ) -> IngestionResult<IngestionResponse> {
        log_feature!(
            LogFeature::Ingestion,
            info,
            "Starting JSON ingestion process with DataFoldNode (progress_id: {})",
            progress_id
        );
        // Bail out early (with a failure response, not an Err) when the
        // module is disabled or misconfigured.
        if !self.config.is_ready() {
            progress_service
                .fail_progress(
                    &progress_id,
                    "Ingestion module is not properly configured or disabled".to_string(),
                )
                .await;
            return Ok(IngestionResponse::failure(vec![
                "Ingestion module is not properly configured or disabled".to_string(),
            ]));
        }
        progress_service
            .update_progress(
                &progress_id,
                IngestionStep::ValidatingConfig,
                "Validating input data...".to_string(),
            )
            .await;
        // Rejects null and non-object/non-array inputs.
        self.validate_input(&request.data)?;
        progress_service
            .update_progress(
                &progress_id,
                IngestionStep::PreparingSchemas,
                "Preparing available schemas...".to_string(),
            )
            .await;
        // Cached (30s TTL) snapshot of the node's schemas for the AI prompt.
        let available_schemas = self.get_stripped_available_schemas_from_node(node).await?;
        progress_service
            .update_progress(
                &progress_id,
                IngestionStep::FlatteningData,
                "Processing and flattening data structure...".to_string(),
            )
            .await;
        // Unwraps single-key wrapper objects (Twitter-export style).
        let flattened_data = self.flatten_twitter_data(&request.data);
        progress_service
            .update_progress(
                &progress_id,
                IngestionStep::GettingAIRecommendation,
                "Analyzing data with AI to determine schema...".to_string(),
            )
            .await;
        let ai_response = self
            .get_ai_recommendation(&flattened_data, &available_schemas)
            .await?;
        progress_service
            .update_progress(
                &progress_id,
                IngestionStep::SettingUpSchema,
                "Setting up schema and preparing for data storage...".to_string(),
            )
            .await;
        // Capture this before determine_schema_to_use so we can tell whether
        // the AI's new-schema suggestion was actually used or ignored in
        // favor of an existing schema.
        let will_use_existing = !ai_response.existing_schemas.is_empty();
        let schema_name = self
            .determine_schema_to_use(&ai_response, &request.data, node)
            .await?;
        let new_schema_created = ai_response.new_schemas.is_some() && !will_use_existing;
        progress_service
            .update_progress(
                &progress_id,
                IngestionStep::GeneratingMutations,
                "Generating database mutations...".to_string(),
            )
            .await;
        // Arrays produce one mutation set per element; a single object
        // produces one set. Non-object array items are skipped with a warn.
        let mutations = if let Some(array) = flattened_data.as_array() {
            let total_items = array.len();
            let mut all_mutations = Vec::new();
            for (idx, item) in array.iter().enumerate() {
                let fields_and_values = if let Some(obj) = item.as_object() {
                    obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
                } else {
                    log_feature!(
                        LogFeature::Ingestion,
                        warn,
                        "Array item {} is not an object, skipping",
                        idx
                    );
                    continue;
                };
                let mutations = self.mutation_generator.generate_mutations(
                    &schema_name,
                    &HashMap::new(),
                    &fields_and_values,
                    &ai_response.mutation_mappers,
                    request
                        .trust_distance
                        .unwrap_or(self.config.default_trust_distance),
                    request
                        .pub_key
                        .clone()
                        .unwrap_or_else(|| "default".to_string()),
                    request.source_file_name.clone(),
                )?;
                all_mutations.extend(mutations);
                // Progress granularity: every 10 items, plus the final one.
                if (idx + 1) % 10 == 0 || idx + 1 == total_items {
                    // This stage spans 75%..90% of the overall progress bar.
                    let percent_of_step = ((idx + 1) as f32 / total_items as f32 * 15.0) as u8;
                    let progress_percent = 75 + percent_of_step;
                    progress_service
                        .update_progress_with_percentage(
                            &progress_id,
                            IngestionStep::GeneratingMutations,
                            format!("Generating mutations... ({}/{})", idx + 1, total_items),
                            progress_percent,
                        )
                        .await;
                }
            }
            all_mutations
        } else {
            // Single object (or scalar, which yields an empty field map).
            let fields_and_values = if let Some(obj) = flattened_data.as_object() {
                obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
            } else {
                HashMap::new()
            };
            self.mutation_generator.generate_mutations(
                &schema_name,
                &HashMap::new(),
                &fields_and_values,
                &ai_response.mutation_mappers,
                request
                    .trust_distance
                    .unwrap_or(self.config.default_trust_distance),
                request
                    .pub_key
                    .clone()
                    .unwrap_or_else(|| "default".to_string()),
                request.source_file_name.clone(),
            )?
        };
        log_feature!(
            LogFeature::Ingestion,
            info,
            "Generated {} mutations",
            mutations.len()
        );
        progress_service
            .update_progress(
                &progress_id,
                IngestionStep::ExecutingMutations,
                "Executing mutations to store data...".to_string(),
            )
            .await;
        let mutations_len = mutations.len();
        // Per-request auto_execute overrides the configured default.
        let mutations_executed = if request
            .auto_execute
            .unwrap_or(self.config.auto_execute_mutations)
        {
            self.execute_mutations_with_node_and_progress(
                mutations,
                node,
                progress_service,
                &progress_id,
            )
            .await?
        } else {
            0
        };
        let results = IngestionResults {
            schema_name: schema_name.clone(),
            new_schema_created,
            mutations_generated: mutations_len,
            mutations_executed,
        };
        progress_service
            .complete_progress(&progress_id, results)
            .await;
        Ok(IngestionResponse::success_with_progress(
            progress_id,
            schema_name,
            new_schema_created,
            mutations_len,
            mutations_executed,
        ))
    }
    /// Ingestion pipeline without progress reporting.
    ///
    /// Same stages as [`Self::process_json_with_node_and_progress`]:
    /// validate → flatten → fetch schemas → AI recommendation → schema
    /// setup → generate mutations → (optionally) execute. Returns a
    /// failure response (not an `Err`) when the module is not configured.
    ///
    /// NOTE(review): the mutation-generation logic here duplicates the
    /// progress variant above; keep the two in sync when changing either.
    pub async fn process_json_with_node(
        &self,
        request: IngestionRequest,
        node: &DataFoldNode,
    ) -> IngestionResult<IngestionResponse> {
        log_feature!(
            LogFeature::Ingestion,
            info,
            "Starting JSON ingestion process with DataFoldNode"
        );
        if !self.config.is_ready() {
            return Ok(IngestionResponse::failure(vec![
                "Ingestion module is not properly configured or disabled".to_string(),
            ]));
        }
        self.validate_input(&request.data)?;
        // Unwrap single-key wrapper objects (Twitter-export style).
        let flattened_data = self.flatten_twitter_data(&request.data);
        let available_schemas = self.get_stripped_available_schemas_from_node(node).await?;
        let ai_response = self
            .get_ai_recommendation(&flattened_data, &available_schemas)
            .await?;
        // Recorded before schema resolution so we can tell whether the AI's
        // new-schema suggestion was used or an existing schema won out.
        let will_use_existing = !ai_response.existing_schemas.is_empty();
        let schema_name = self
            .determine_schema_to_use(&ai_response, &request.data, node)
            .await?;
        let new_schema_created = ai_response.new_schemas.is_some() && !will_use_existing;
        // One mutation set per array element; non-object items are skipped.
        let mutations = if let Some(array) = flattened_data.as_array() {
            let mut all_mutations = Vec::new();
            for (idx, item) in array.iter().enumerate() {
                let fields_and_values = if let Some(obj) = item.as_object() {
                    obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
                } else {
                    log_feature!(
                        LogFeature::Ingestion,
                        warn,
                        "Array item {} is not an object, skipping",
                        idx
                    );
                    continue;
                };
                let mutations = self.mutation_generator.generate_mutations(
                    &schema_name,
                    &HashMap::new(),
                    &fields_and_values,
                    &ai_response.mutation_mappers,
                    request
                        .trust_distance
                        .unwrap_or(self.config.default_trust_distance),
                    request
                        .pub_key
                        .clone()
                        .unwrap_or_else(|| "default".to_string()),
                    request.source_file_name.clone(),
                )?;
                all_mutations.extend(mutations);
            }
            all_mutations
        } else {
            // Single object (or scalar, which yields an empty field map).
            let fields_and_values = if let Some(obj) = flattened_data.as_object() {
                obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
            } else {
                HashMap::new()
            };
            self.mutation_generator.generate_mutations(
                &schema_name,
                &HashMap::new(),
                &fields_and_values,
                &ai_response.mutation_mappers,
                request
                    .trust_distance
                    .unwrap_or(self.config.default_trust_distance),
                request
                    .pub_key
                    .clone()
                    .unwrap_or_else(|| "default".to_string()),
                request.source_file_name.clone(),
            )?
        };
        log_feature!(
            LogFeature::Ingestion,
            info,
            "Generated {} mutations",
            mutations.len()
        );
        let mutations_len = mutations.len();
        // Per-request auto_execute overrides the configured default.
        let mutations_executed = if request
            .auto_execute
            .unwrap_or(self.config.auto_execute_mutations)
        {
            self.execute_mutations_with_node(mutations, node).await?
        } else {
            0
        };
        Ok(IngestionResponse::success(
            schema_name,
            new_schema_created,
            mutations_len,
            mutations_executed,
        ))
    }
async fn get_ai_recommendation(
&self,
json_data: &Value,
available_schemas: &SimplifiedSchemaMap,
) -> IngestionResult<AISchemaResponse> {
let schemas_json = available_schemas.to_json_value();
match self.config.provider {
AIProvider::OpenRouter => {
self.openrouter_service
.as_ref()
.ok_or_else(|| {
IngestionError::configuration_error("OpenRouter service not initialized")
})?
.get_schema_recommendation(json_data, &schemas_json)
.await
}
AIProvider::Ollama => {
self.ollama_service
.as_ref()
.ok_or_else(|| {
IngestionError::configuration_error("Ollama service not initialized")
})?
.get_schema_recommendation(json_data, &schemas_json)
.await
}
}
}
pub fn validate_input(&self, data: &Value) -> IngestionResult<()> {
if data.is_null() {
return Err(IngestionError::invalid_input("Input data cannot be null"));
}
if !data.is_object() && !data.is_array() {
return Err(IngestionError::invalid_input(
"Input data must be a JSON object or array",
));
}
Ok(())
}
pub fn get_status(&self) -> IngestionResult<IngestionStatus> {
let (provider_name, model) = match self.config.provider {
AIProvider::OpenRouter => (
"OpenRouter".to_string(),
self.config.openrouter.model.clone(),
),
AIProvider::Ollama => ("Ollama".to_string(), self.config.ollama.model.clone()),
};
Ok(IngestionStatus {
enabled: self.config.enabled,
configured: self.config.is_ready(),
provider: provider_name,
model,
auto_execute_mutations: self.config.auto_execute_mutations,
default_trust_distance: self.config.default_trust_distance,
})
}
    /// Fetch the node's schemas as a `SimplifiedSchemaMap`, with a 30s
    /// in-memory cache to avoid hammering the schema service on bursts
    /// of ingestion requests.
    ///
    /// Each schema is "stripped" down to its name plus a JSON rendering
    /// of each field's topology; fields whose topology fails to serialize
    /// are silently omitted.
    async fn get_stripped_available_schemas_from_node(
        &self,
        node: &DataFoldNode,
    ) -> IngestionResult<SimplifiedSchemaMap> {
        const CACHE_TTL: Duration = Duration::from_secs(30);
        // Scope the lock so it is released before the (slow) fetch below;
        // concurrent callers may therefore fetch redundantly on a miss,
        // which is acceptable for this cache.
        {
            let cache_guard = self.schema_cache.lock().await;
            if let Some(cache_entry) = cache_guard.as_ref() {
                if cache_entry.timestamp.elapsed() < CACHE_TTL {
                    log_feature!(
                        LogFeature::Ingestion,
                        info,
                        "Using cached schemas ({} schemas, {}s old)",
                        cache_entry.schemas.len(),
                        cache_entry.timestamp.elapsed().as_secs()
                    );
                    return Ok(cache_entry.schemas.clone());
                }
            }
        }
        log_feature!(
            LogFeature::Ingestion,
            info,
            "Cache miss or expired, fetching schemas from schema service"
        );
        let schemas = {
            node.fetch_available_schemas().await.map_err(|e| {
                IngestionError::SchemaSystemError(crate::schema::SchemaError::InvalidData(format!(
                    "Failed to fetch schemas from schema service: {}",
                    e
                )))
            })?
        };
        // Strip each schema to name + per-field topology JSON.
        let mut schema_map = SimplifiedSchemaMap::new();
        for schema in schemas {
            let mut fields: HashMap<String, Value> = HashMap::new();
            for (field_name, topology) in &schema.field_topologies {
                if let Ok(topology_value) = serde_json::to_value(topology) {
                    fields.insert(field_name.clone(), topology_value);
                }
            }
            let simplified = SimplifiedSchema {
                name: schema.name.clone(),
                fields,
            };
            schema_map.insert(schema.name.clone(), simplified);
        }
        // Re-acquire the lock only to store the fresh snapshot.
        {
            let mut cache_guard = self.schema_cache.lock().await;
            *cache_guard = Some(SchemaCacheEntry {
                schemas: schema_map.clone(),
                timestamp: Instant::now(),
            });
        }
        log_feature!(
            LogFeature::Ingestion,
            info,
            "Cached {} schemas for future requests",
            schema_map.len()
        );
        Ok(schema_map)
    }
    /// Resolve which schema the ingestion should write to, from the AI's
    /// recommendation.
    ///
    /// Priority: an existing schema (first entry of `existing_schemas`)
    /// wins over a proposed new schema. For an existing schema this also
    /// backfills missing field topologies and (re-)approves it; otherwise
    /// a new schema is created from the AI's definition. Errors when the
    /// AI supplied neither option.
    async fn determine_schema_to_use(
        &self,
        ai_response: &AISchemaResponse,
        sample_data: &Value,
        node: &DataFoldNode,
    ) -> IngestionResult<String> {
        if !ai_response.existing_schemas.is_empty() {
            // Only the first recommended schema is considered; any further
            // entries in existing_schemas are ignored.
            let schema_name = &ai_response.existing_schemas[0];
            log_feature!(
                LogFeature::Ingestion,
                info,
                "Using existing schema: {}",
                schema_name
            );
            self.ensure_schema_has_topologies_with_node(
                schema_name,
                sample_data,
                &ai_response.mutation_mappers,
                node,
            )
            .await?;
            // Approve even if already approved — presumably idempotent on
            // the schema manager side; TODO confirm.
            let schema_manager = {
                let db_guard = node
                    .get_fold_db()
                    .await
                    .map_err(|error| IngestionError::SchemaCreationError(error.to_string()))?;
                db_guard.schema_manager.clone()
            };
            schema_manager
                .approve(schema_name)
                .await
                .map_err(|error| IngestionError::SchemaCreationError(error.to_string()))?;
            return Ok(schema_name.clone());
        }
        if let Some(new_schema_def) = &ai_response.new_schemas {
            let schema_name = self
                .create_new_schema_with_node(new_schema_def, sample_data, node)
                .await?;
            return Ok(schema_name);
        }
        Err(IngestionError::ai_response_validation_error(
            "AI response contains neither existing schemas nor new schema definition",
        ))
    }
    /// Create a brand-new schema on the node from the AI's JSON definition
    /// and return its final name.
    ///
    /// Steps: deserialize the AI schema → infer field topologies from
    /// `sample_data` if the AI omitted them → default empty string-field
    /// classifications to `["word"]` → rename the schema to its computed
    /// topology hash → register it with the schema service, load it into
    /// the schema manager, and approve it.
    async fn create_new_schema_with_node(
        &self,
        schema_def: &Value,
        sample_data: &Value,
        node: &DataFoldNode,
    ) -> IngestionResult<String> {
        let mut schema: crate::schema::types::Schema = serde_json::from_value(schema_def.clone())
            .map_err(|error| {
                IngestionError::SchemaCreationError(format!(
                    "Failed to deserialize schema from AI response: {}",
                    error
                ))
            })?;
        log_feature!(
            LogFeature::Ingestion,
            info,
            "Deserialized schema with {} field topologies from AI",
            schema.field_topologies.len()
        );
        if schema.field_topologies.is_empty() {
            log_feature!(
                LogFeature::Ingestion,
                warn,
                "AI did not provide field_topologies, inferring from sample data"
            );
            // For array inputs, infer from the first element only; its shape
            // is assumed representative of the rest — TODO confirm.
            let sample_for_topology = if let Some(array) = sample_data.as_array() {
                array.first().unwrap_or(sample_data)
            } else {
                sample_data
            };
            if let Some(sample_obj) = sample_for_topology.as_object() {
                let sample_map: std::collections::HashMap<String, serde_json::Value> = sample_obj
                    .iter()
                    .map(|(k, v)| (k.clone(), v.clone()))
                    .collect();
                schema.infer_topologies_from_data(&sample_map);
                log_feature!(
                    LogFeature::Ingestion,
                    info,
                    "Inferred topologies for {} fields (no AI topologies)",
                    sample_map.len()
                );
            }
        }
        // Ensure every string-typed field carries at least the "word"
        // classification (None or empty list both count as missing).
        for topology in schema.field_topologies.values_mut() {
            if let crate::schema::types::topology::TopologyNode::Primitive {
                value: crate::schema::types::topology::PrimitiveValueType::String,
                classifications,
            } = &mut topology.root
            {
                if classifications.is_none()
                    || classifications
                        .as_ref()
                        .map(|c| c.is_empty())
                        .unwrap_or(false)
                {
                    *classifications = Some(vec!["word".to_string()]);
                    crate::log_feature!(
                        crate::logging::features::LogFeature::Ingestion,
                        info,
                        "Added default 'word' classification to string field"
                    );
                }
            }
        }
        // The topology hash becomes the schema's canonical name, making
        // structurally identical schemas collide on the same identifier.
        schema.compute_schema_topology_hash();
        let topology_hash = schema
            .get_topology_hash()
            .ok_or_else(|| {
                IngestionError::SchemaCreationError(
                    "Schema must have topology_hash computed".to_string(),
                )
            })?
            .clone();
        schema.name = topology_hash.clone();
        let schema_response = {
            node.add_schema_to_service(&schema).await.map_err(|error| {
                IngestionError::SchemaCreationError(format!(
                    "Failed to create schema via schema service: {}",
                    error
                ))
            })?
        };
        // NOTE(review): assumes the service's response serializes to a full
        // schema definition that load_schema_from_json accepts — verify
        // against add_schema_to_service's return type.
        let json_str = serde_json::to_string(&schema_response).map_err(|error| {
            IngestionError::schema_parsing_error(format!(
                "Failed to serialize schema definition: {}",
                error
            ))
        })?;
        // Clone the manager handle and drop the DB guard before awaiting
        // the (potentially slow) load/approve calls.
        let schema_manager = {
            let db_guard = node
                .get_fold_db()
                .await
                .map_err(|error| IngestionError::SchemaCreationError(error.to_string()))?;
            let manager = db_guard.schema_manager.clone();
            drop(db_guard);
            manager
        };
        match schema_manager.load_schema_from_json(&json_str).await {
            Ok(_) => {}
            Err(error) => return Err(IngestionError::SchemaCreationError(error.to_string())),
        };
        schema_manager
            .approve(&schema_response.name)
            .await
            .map_err(|error| IngestionError::SchemaCreationError(error.to_string()))?;
        Ok(schema_response.name)
    }
    /// Make sure an existing schema has usable topologies for every field
    /// the mutation mappers will write, inferring them from `sample_data`
    /// and re-registering the schema if anything is missing.
    ///
    /// A field "needs update" when it has no topology at all, or when it is
    /// a string primitive with no classifications. If no field needs an
    /// update this is a no-op.
    async fn ensure_schema_has_topologies_with_node(
        &self,
        schema_name: &str,
        sample_data: &Value,
        mutation_mappers: &HashMap<String, String>,
        node: &DataFoldNode,
    ) -> IngestionResult<()> {
        // Clone the schema out and drop the DB guard before any awaits.
        let mut schema = {
            let db_guard = node
                .get_fold_db()
                .await
                .map_err(|error| IngestionError::SchemaCreationError(error.to_string()))?;
            let schema = db_guard
                .schema_manager
                .get_schema(schema_name)
                .map_err(|e| {
                    IngestionError::SchemaSystemError(crate::schema::SchemaError::InvalidData(
                        format!("Failed to fetch schema '{}': {}", schema_name, e),
                    ))
                })?
                .ok_or_else(|| {
                    IngestionError::SchemaSystemError(crate::schema::SchemaError::InvalidData(
                        format!("Schema '{}' not found", schema_name),
                    ))
                })?
                .clone();
            drop(db_guard);
            schema
        };
        // Extract target field names from mapper values; assumes mappers
        // look like "Schema.field" (second dot-segment is the field) —
        // TODO confirm against MutationGenerator's mapper format.
        let fields_to_check: Vec<String> = mutation_mappers
            .values()
            .filter_map(|mapper| {
                mapper.split('.').nth(1).map(|s| s.to_string())
            })
            .collect();
        let mut needs_update = false;
        for field_name in &fields_to_check {
            if !schema.has_field_topology(field_name) {
                needs_update = true;
                log_feature!(
                    LogFeature::Ingestion,
                    info,
                    "Schema '{}' is missing topology for field '{}'",
                    schema_name,
                    field_name
                );
                break;
            } else if let Some(topology) = schema.field_topologies.get(field_name) {
                // String primitives must carry at least one classification.
                if let crate::schema::types::topology::TopologyNode::Primitive {
                    value: crate::schema::types::topology::PrimitiveValueType::String,
                    classifications,
                } = &topology.root
                {
                    if classifications.is_none()
                        || classifications
                            .as_ref()
                            .map(|c| c.is_empty())
                            .unwrap_or(false)
                    {
                        needs_update = true;
                        log_feature!(
                            LogFeature::Ingestion,
                            info,
                            "Schema '{}' field '{}' is missing 'word' classification",
                            schema_name,
                            field_name
                        );
                        break;
                    }
                }
            }
        }
        if !needs_update {
            log_feature!(
                LogFeature::Ingestion,
                info,
                "Schema '{}' already has topologies for all required fields",
                schema_name
            );
            return Ok(());
        }
        // Infer from the first array element (assumed representative) or
        // from the object itself.
        let sample_for_topology = if let Some(array) = sample_data.as_array() {
            array.first().unwrap_or(sample_data)
        } else {
            sample_data
        };
        // NOTE(review): if the sample is not an object, the update below is
        // skipped entirely and the function returns Ok even though
        // needs_update was true — confirm this silent no-op is intended.
        if let Some(sample_obj) = sample_for_topology.as_object() {
            let sample_map: std::collections::HashMap<String, serde_json::Value> = sample_obj
                .iter()
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect();
            schema.infer_topologies_from_data(&sample_map);
            log_feature!(
                LogFeature::Ingestion,
                info,
                "Inferred topologies for {} fields in schema '{}' from sample data",
                sample_map.len(),
                schema_name
            );
            // Backfill the default "word" classification on string fields,
            // mirroring create_new_schema_with_node.
            for topology in schema.field_topologies.values_mut() {
                if let crate::schema::types::topology::TopologyNode::Primitive {
                    value: crate::schema::types::topology::PrimitiveValueType::String,
                    classifications,
                } = &mut topology.root
                {
                    if classifications.is_none()
                        || classifications
                            .as_ref()
                            .map(|c| c.is_empty())
                            .unwrap_or(false)
                    {
                        *classifications = Some(vec!["word".to_string()]);
                        crate::log_feature!(
                            crate::logging::features::LogFeature::Ingestion,
                            info,
                            "Added default 'word' classification to string field in existing schema"
                        );
                    }
                }
            }
            // Push the updated schema to the service, then reload it into
            // the schema manager from the locally serialized copy.
            {
                node.add_schema_to_service(&schema).await.map_err(|e| {
                    IngestionError::SchemaSystemError(crate::schema::SchemaError::InvalidData(
                        format!(
                            "Failed to update schema '{}' with topologies: {}",
                            schema_name, e
                        ),
                    ))
                })?;
            }
            let json_str = serde_json::to_string(&schema).map_err(|error| {
                IngestionError::schema_parsing_error(format!(
                    "Failed to serialize updated schema: {}",
                    error
                ))
            })?;
            let schema_manager = {
                let db_guard = node
                    .get_fold_db()
                    .await
                    .map_err(|error| IngestionError::SchemaCreationError(error.to_string()))?;
                let manager = db_guard.schema_manager.clone();
                drop(db_guard);
                manager
            };
            match schema_manager.load_schema_from_json(&json_str).await {
                Ok(_) => {}
                Err(error) => return Err(IngestionError::SchemaCreationError(error.to_string())),
            };
            log_feature!(
                LogFeature::Ingestion,
                info,
                "Updated schema '{}' with inferred topologies",
                schema_name
            );
        }
        Ok(())
    }
    /// Execute all mutations as a single batch, emitting progress updates,
    /// and return how many mutation ids the node reported back.
    ///
    /// NOTE(review): the progress loop below runs to completion BEFORE
    /// `mutate_batch` is called — the "(n/total)" messages reflect planned
    /// work, not actual execution, because the batch API executes all
    /// mutations in one call. Confirm this cosmetic progress is intended.
    async fn execute_mutations_with_node_and_progress(
        &self,
        mutations: Vec<Mutation>,
        node: &DataFoldNode,
        progress_service: &ProgressService,
        progress_id: &str,
    ) -> IngestionResult<usize> {
        if mutations.is_empty() {
            return Ok(0);
        }
        let total_mutations = mutations.len();
        // Progress granularity: every 5 mutations, plus the final one.
        for (idx, _) in mutations.iter().enumerate() {
            if (idx + 1) % 5 == 0 || idx + 1 == total_mutations {
                // This stage spans 90%..95% of the overall progress bar.
                let percent_of_step = ((idx + 1) as f32 / total_mutations as f32 * 5.0) as u8;
                let progress_percent = 90 + percent_of_step;
                progress_service
                    .update_progress_with_percentage(
                        progress_id,
                        IngestionStep::ExecutingMutations,
                        format!("Executing mutations... ({}/{})", idx + 1, total_mutations),
                        progress_percent,
                    )
                    .await;
            }
        }
        node.mutate_batch(mutations)
            .await
            .map(|mutation_ids| mutation_ids.len())
            .map_err(|e| {
                IngestionError::SchemaSystemError(crate::schema::SchemaError::InvalidData(
                    e.to_string(),
                ))
            })
    }
async fn execute_mutations_with_node(
&self,
mutations: Vec<Mutation>,
node: &DataFoldNode,
) -> IngestionResult<usize> {
if mutations.is_empty() {
return Ok(0);
}
node.mutate_batch(mutations)
.await
.map(|mutation_ids| mutation_ids.len())
.map_err(|e| {
IngestionError::SchemaSystemError(crate::schema::SchemaError::InvalidData(
e.to_string(),
))
})
}
fn flatten_twitter_data(&self, data: &Value) -> Value {
if let Some(array) = data.as_array() {
let flattened_items: Vec<Value> = array
.iter()
.map(|item| {
if let Some(obj) = item.as_object() {
if obj.len() == 1 {
let (_wrapper_key, inner_value) = obj.iter().next().unwrap();
if let Some(inner_obj) = inner_value.as_object() {
Value::Object(inner_obj.clone())
} else {
Value::Object(obj.clone())
}
} else {
Value::Object(obj.clone())
}
} else {
item.clone()
}
})
.collect();
Value::Array(flattened_items)
} else if let Some(obj) = data.as_object() {
if obj.len() == 1 {
let (wrapper_key, inner_value) = obj.iter().next().unwrap();
if let Some(inner_obj) = inner_value.as_object() {
log_feature!(
LogFeature::Ingestion,
info,
"Flattening Twitter data: extracting inner object from wrapper '{}'",
wrapper_key
);
Value::Object(inner_obj.clone())
} else {
Value::Object(obj.clone())
}
} else {
Value::Object(obj.clone())
}
} else {
data.clone()
}
}
}