1use std::collections::HashMap;
8use std::sync::Arc;
9
10use cognee_cognify::{CognifyConfig, CognifyResult, cognify};
11use cognee_database::{
12 AclDb, DatabaseConnection, PipelineRunRepository, SeaOrmPipelineRunRepository,
13};
14use cognee_delete::{DeleteMode, DeleteRequest, DeleteResult, DeleteScope, DeleteService};
15use cognee_embedding::EmbeddingEngine;
16use cognee_graph::GraphDBTrait;
17use cognee_ingestion::{AddParams, AddPipeline};
18use cognee_llm::Llm;
19use cognee_models::{Data, DataInput};
20use cognee_ontology::OntologyResolver;
21use cognee_storage::StorageTrait;
22use cognee_vector::VectorDB;
23use uuid::Uuid;
24
25use super::error::ApiError;
26
/// Outcome of an [`update`] call: what was deleted, what was ingested in its
/// place, and the cognify result when cognify ran.
#[derive(Debug)]
pub struct UpdateResult {
    /// Id of the data item targeted for deletion (the `data_id` passed to [`update`]).
    pub deleted_data_id: Uuid,
    /// Value returned by the delete service for that item.
    pub delete_result: DeleteResult,
    /// Items returned by the add pipeline for the replacement input.
    pub new_data: Vec<Data>,
    /// Cognify outcome; `None` when the add pipeline returned no items and
    /// cognify was therefore skipped.
    pub cognify_result: Option<CognifyResult>,
}
39
40#[allow(clippy::too_many_arguments)]
62pub async fn update(
63 data_id: Uuid,
64 new_data: Vec<DataInput>,
65 dataset_id: Uuid,
66 dataset_name: &str,
67 owner_id: Uuid,
68 tenant_id: Option<Uuid>,
69 node_set: Option<Vec<String>>,
70 preferred_loaders: Option<HashMap<String, String>>,
71 incremental_loading: bool,
72 acl_db: Option<&dyn AclDb>,
73 delete_service: &DeleteService,
74 add_pipeline: &AddPipeline,
75 llm: Arc<dyn Llm>,
76 storage: Arc<dyn StorageTrait>,
77 graph_db: Arc<dyn GraphDBTrait>,
78 vector_db: Arc<dyn VectorDB>,
79 embedding_engine: Arc<dyn EmbeddingEngine>,
80 db: Option<Arc<DatabaseConnection>>,
81 ontology_resolver: Arc<dyn OntologyResolver>,
82 cognify_config: &CognifyConfig,
83) -> Result<UpdateResult, ApiError> {
84 if let Some(acl) = acl_db {
86 let permitted = acl
87 .has_permission(owner_id, dataset_id, "write")
88 .await
89 .map_err(|e| ApiError::InvalidArgument(e.to_string()))?;
90 if !permitted {
91 return Err(ApiError::InvalidArgument(
92 "write permission denied on dataset".to_string(),
93 ));
94 }
95 }
96
97 let delete_request = DeleteRequest {
99 scope: DeleteScope::Data {
100 owner_id,
101 data_id,
102 dataset_name: Some(dataset_name.to_string()),
103 delete_dataset_if_empty: false,
104 },
105 mode: DeleteMode::Soft,
107 memory_only: false,
108 };
109 let delete_result = delete_service.execute(&delete_request).await?;
110
111 let params = AddParams {
113 node_set,
114 preferred_loaders,
115 dataset_id: Some(dataset_id),
116 importance_weight: None,
117 incremental_loading,
118 };
119 let data_items = add_pipeline
120 .add_with_params(new_data, dataset_name, owner_id, tenant_id, ¶ms)
121 .await
122 .map_err(|e| ApiError::Ingestion(e.to_string()))?;
123
124 let cognify_result = if !data_items.is_empty() {
126 let user_email: Option<String> = None;
131
132 let database = db.clone().ok_or_else(|| {
133 ApiError::Cognify("cognify requires a DatabaseConnection".to_string())
134 })?;
135 let thread_pool: Arc<dyn cognee_core::CpuPool> = Arc::new(
136 cognee_core::RayonThreadPool::with_default_threads()
137 .map_err(|e| ApiError::Cognify(format!("failed to construct thread pool: {e}")))?,
138 );
139
140 let pipeline_run_repo: Arc<dyn PipelineRunRepository> =
142 Arc::new(SeaOrmPipelineRunRepository::new(Arc::clone(&database)));
143
144 let effective_cognify_config;
146 let cognify_config_ref = if incremental_loading != cognify_config.incremental_loading {
147 effective_cognify_config = cognify_config
148 .clone()
149 .with_incremental_loading(incremental_loading);
150 &effective_cognify_config
151 } else {
152 cognify_config
153 };
154
155 let result = cognify(
156 data_items.clone(),
157 dataset_id,
158 Some(owner_id),
159 user_email,
160 tenant_id,
161 llm,
162 storage,
163 graph_db,
164 vector_db,
165 embedding_engine,
166 database,
167 pipeline_run_repo,
168 thread_pool,
169 ontology_resolver,
170 cognify_config_ref,
171 )
172 .await
173 .map_err(|e| ApiError::Cognify(e.to_string()))?;
174 Some(result)
175 } else {
176 None
177 };
178
179 Ok(UpdateResult {
180 deleted_data_id: data_id,
181 delete_result,
182 new_data: data_items,
183 cognify_result,
184 })
185}