use std::collections::HashMap;
use std::sync::Arc;
use cognee_cognify::{CognifyConfig, CognifyResult, cognify};
use cognee_database::{
AclDb, DatabaseConnection, PipelineRunRepository, SeaOrmPipelineRunRepository,
};
use cognee_delete::{DeleteMode, DeleteRequest, DeleteResult, DeleteScope, DeleteService};
use cognee_embedding::EmbeddingEngine;
use cognee_graph::GraphDBTrait;
use cognee_ingestion::{AddParams, AddPipeline};
use cognee_llm::Llm;
use cognee_models::{Data, DataInput};
use cognee_ontology::OntologyResolver;
use cognee_storage::StorageTrait;
use cognee_vector::VectorDB;
use uuid::Uuid;
use super::error::ApiError;
/// Outcome of an `update` call: what was deleted, what was ingested in its
/// place, and the cognify output when cognify was run.
#[derive(Debug)]
pub struct UpdateResult {
/// The `data_id` that was passed to `update` and submitted for deletion.
pub deleted_data_id: Uuid,
/// Value returned by the delete service for that deletion request.
pub delete_result: DeleteResult,
/// Data items returned by the add pipeline for the replacement input.
pub new_data: Vec<Data>,
/// Result of the cognify run over `new_data`; `None` when the add pipeline
/// returned no items and cognify was therefore skipped.
pub cognify_result: Option<CognifyResult>,
}
/// Replaces an existing data item with newly ingested content.
///
/// The steps run strictly in this order:
///
/// 1. When `acl_db` is provided, verify that `owner_id` holds the `"write"`
///    permission on `dataset_id`. When it is `None`, no permission check is made.
/// 2. Delete `data_id` through `delete_service`, using `DeleteMode::Soft` and
///    with `delete_dataset_if_empty` disabled.
/// 3. Ingest `new_data` into `dataset_name` / `dataset_id` through `add_pipeline`.
/// 4. If ingestion returned at least one item, run `cognify` over those items.
///    `incremental_loading` overrides the value stored in `cognify_config` when
///    the two differ.
///
/// # Errors
///
/// * `ApiError::InvalidArgument` — the permission lookup failed, or the write
///   permission was denied.
/// * The error produced by `delete_service.execute`, converted into `ApiError`
///   by `?`.
/// * `ApiError::Ingestion` — the add pipeline failed.
/// * `ApiError::Cognify` — `db` was `None` while cognify needed to run, the
///   thread pool could not be constructed, or `cognify` itself failed.
///
/// Note that this function does not attempt to undo the deletion (or the
/// ingestion) when a later step fails, and that a missing `db` is only detected
/// after the delete and ingestion steps have already been performed.
// The argument list mirrors every collaborator that `cognify` needs, so the
// lint is silenced rather than hiding them behind an ad-hoc bundle type.
#[allow(clippy::too_many_arguments)]
pub async fn update(
    data_id: Uuid,
    new_data: Vec<DataInput>,
    dataset_id: Uuid,
    dataset_name: &str,
    owner_id: Uuid,
    tenant_id: Option<Uuid>,
    node_set: Option<Vec<String>>,
    preferred_loaders: Option<HashMap<String, String>>,
    incremental_loading: bool,
    acl_db: Option<&dyn AclDb>,
    delete_service: &DeleteService,
    add_pipeline: &AddPipeline,
    llm: Arc<dyn Llm>,
    storage: Arc<dyn StorageTrait>,
    graph_db: Arc<dyn GraphDBTrait>,
    vector_db: Arc<dyn VectorDB>,
    embedding_engine: Arc<dyn EmbeddingEngine>,
    db: Option<Arc<DatabaseConnection>>,
    ontology_resolver: Arc<dyn OntologyResolver>,
    cognify_config: &CognifyConfig,
) -> Result<UpdateResult, ApiError> {
    // Step 1: optional ACL gate. Both a failed lookup and an explicit denial
    // are reported as `InvalidArgument`, which callers may already match on.
    if let Some(acl) = acl_db {
        let permitted = acl
            .has_permission(owner_id, dataset_id, "write")
            .await
            .map_err(|e| ApiError::InvalidArgument(e.to_string()))?;
        if !permitted {
            return Err(ApiError::InvalidArgument(
                "write permission denied on dataset".to_string(),
            ));
        }
    }

    // Step 2: remove the old data item but keep the dataset itself, since the
    // replacement content is about to be added to it.
    let delete_request = DeleteRequest {
        scope: DeleteScope::Data {
            owner_id,
            data_id,
            dataset_name: Some(dataset_name.to_string()),
            delete_dataset_if_empty: false,
        },
        mode: DeleteMode::Soft,
        memory_only: false,
    };
    let delete_result = delete_service.execute(&delete_request).await?;

    // Step 3: ingest the replacement content into the same dataset.
    let params = AddParams {
        node_set,
        preferred_loaders,
        dataset_id: Some(dataset_id),
        importance_weight: None,
        incremental_loading,
    };
    let data_items = add_pipeline
        .add_with_params(new_data, dataset_name, owner_id, tenant_id, &params)
        .await
        .map_err(|e| ApiError::Ingestion(e.to_string()))?;

    // Step 4: cognify only when ingestion actually produced something.
    let cognify_result = if !data_items.is_empty() {
        let user_email: Option<String> = None;

        // `db` is owned and not needed afterwards, so it is consumed directly
        // instead of being cloned first.
        let database = db.ok_or_else(|| {
            ApiError::Cognify("cognify requires a DatabaseConnection".to_string())
        })?;

        let thread_pool: Arc<dyn cognee_core::CpuPool> = Arc::new(
            cognee_core::RayonThreadPool::with_default_threads()
                .map_err(|e| ApiError::Cognify(format!("failed to construct thread pool: {e}")))?,
        );
        let pipeline_run_repo: Arc<dyn PipelineRunRepository> =
            Arc::new(SeaOrmPipelineRunRepository::new(Arc::clone(&database)));

        // Only clone the config when the caller's `incremental_loading` flag
        // disagrees with it; the deferred binding keeps the clone alive long
        // enough to be borrowed for the `cognify` call.
        let effective_cognify_config;
        let cognify_config_ref = if incremental_loading != cognify_config.incremental_loading {
            effective_cognify_config = cognify_config
                .clone()
                .with_incremental_loading(incremental_loading);
            &effective_cognify_config
        } else {
            cognify_config
        };

        // `data_items` is cloned because the same items are also returned to
        // the caller in `UpdateResult::new_data`.
        let result = cognify(
            data_items.clone(),
            dataset_id,
            Some(owner_id),
            user_email,
            tenant_id,
            llm,
            storage,
            graph_db,
            vector_db,
            embedding_engine,
            database,
            pipeline_run_repo,
            thread_pool,
            ontology_resolver,
            cognify_config_ref,
        )
        .await
        .map_err(|e| ApiError::Cognify(e.to_string()))?;
        Some(result)
    } else {
        None
    };

    Ok(UpdateResult {
        deleted_data_id: data_id,
        delete_result,
        new_data: data_items,
        cognify_result,
    })
}