Skip to main content

cognee_lib/api/
update.rs

1//! Data replacement API -- `update()`.
2//!
3//! Three-step pipeline: delete old data -> re-add new data -> re-cognify.
4//!
5//! Equivalent to Python's `cognee.api.v1.update.update()`.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use cognee_cognify::{CognifyConfig, CognifyResult, cognify};
11use cognee_database::{
12    AclDb, DatabaseConnection, PipelineRunRepository, SeaOrmPipelineRunRepository,
13};
14use cognee_delete::{DeleteMode, DeleteRequest, DeleteResult, DeleteScope, DeleteService};
15use cognee_embedding::EmbeddingEngine;
16use cognee_graph::GraphDBTrait;
17use cognee_ingestion::{AddParams, AddPipeline};
18use cognee_llm::Llm;
19use cognee_models::{Data, DataInput};
20use cognee_ontology::OntologyResolver;
21use cognee_storage::StorageTrait;
22use cognee_vector::VectorDB;
23use uuid::Uuid;
24
25use super::error::ApiError;
26
27/// Result of an `update()` operation.
28#[derive(Debug)]
29pub struct UpdateResult {
30    /// ID of the data item that was deleted.
31    pub deleted_data_id: Uuid,
32    /// Delete phase summary.
33    pub delete_result: DeleteResult,
34    /// Newly added data items.
35    pub new_data: Vec<Data>,
36    /// Cognify phase result (optional -- only present when cognify was run).
37    pub cognify_result: Option<CognifyResult>,
38}
39
40/// Replace data in a dataset: delete old -> re-add new -> re-cognify.
41///
42/// # Arguments
43/// * `data_id` - ID of the data item to replace.
44/// * `new_data` - Replacement data inputs.
45/// * `dataset_id` - Explicit dataset UUID (no re-derivation from name).
46/// * `dataset_name` - Dataset name (still required for delete scope).
47/// * `owner_id` / `tenant_id` - Ownership context.
48/// * `node_set` - Optional graph node identifiers for access-control grouping.
49/// * `preferred_loaders` - Optional MIME-type-to-loader-name overrides.
50/// * `incremental_loading` - When `true`, skip re-adding content already present.
51/// * `acl_db` - Optional ACL backend; when provided, a `write` permission check
52///   is performed before any mutation.
53/// * `delete_service` - Pre-configured [`DeleteService`].
54/// * `add_pipeline` - Ingestion pipeline.
55/// * `llm` .. `cognify_config` - Components for the cognify phase.
56///
57/// # Errors
58/// Returns `ApiError::PermissionDenied` when `acl_db` is set and the caller
59/// lacks `write` on the dataset. Propagates errors from delete, add, or
60/// cognify phases.
61#[allow(clippy::too_many_arguments)]
62pub async fn update(
63    data_id: Uuid,
64    new_data: Vec<DataInput>,
65    dataset_id: Uuid,
66    dataset_name: &str,
67    owner_id: Uuid,
68    tenant_id: Option<Uuid>,
69    node_set: Option<Vec<String>>,
70    preferred_loaders: Option<HashMap<String, String>>,
71    incremental_loading: bool,
72    acl_db: Option<&dyn AclDb>,
73    delete_service: &DeleteService,
74    add_pipeline: &AddPipeline,
75    llm: Arc<dyn Llm>,
76    storage: Arc<dyn StorageTrait>,
77    graph_db: Arc<dyn GraphDBTrait>,
78    vector_db: Arc<dyn VectorDB>,
79    embedding_engine: Arc<dyn EmbeddingEngine>,
80    db: Option<Arc<DatabaseConnection>>,
81    ontology_resolver: Arc<dyn OntologyResolver>,
82    cognify_config: &CognifyConfig,
83) -> Result<UpdateResult, ApiError> {
84    // ── Permission gate ───────────────────────────────────────────────────────
85    if let Some(acl) = acl_db {
86        let permitted = acl
87            .has_permission(owner_id, dataset_id, "write")
88            .await
89            .map_err(|e| ApiError::InvalidArgument(e.to_string()))?;
90        if !permitted {
91            return Err(ApiError::InvalidArgument(
92                "write permission denied on dataset".to_string(),
93            ));
94        }
95    }
96
97    // ── Step 1: Delete old data ───────────────────────────────────────────────
98    let delete_request = DeleteRequest {
99        scope: DeleteScope::Data {
100            owner_id,
101            data_id,
102            dataset_name: Some(dataset_name.to_string()),
103            delete_dataset_if_empty: false,
104        },
105        // Python update() → datasets.delete_data defaults mode="soft" (datasets.py:147).
106        mode: DeleteMode::Soft,
107        memory_only: false,
108    };
109    let delete_result = delete_service.execute(&delete_request).await?;
110
111    // ── Step 2: Re-add new data ───────────────────────────────────────────────
112    let params = AddParams {
113        node_set,
114        preferred_loaders,
115        dataset_id: Some(dataset_id),
116        importance_weight: None,
117        incremental_loading,
118    };
119    let data_items = add_pipeline
120        .add_with_params(new_data, dataset_name, owner_id, tenant_id, &params)
121        .await
122        .map_err(|e| ApiError::Ingestion(e.to_string()))?;
123
124    // ── Step 3: Re-cognify (if data was added) ───────────────────────────────
125    let cognify_result = if !data_items.is_empty() {
126        // OSS build has no DB-backed user lookup (the `users` table is owned
127        // by the closed cloud build), so we always fall back to `None`.
128        // `cognify()` then uses `user_id.to_string()` as the provenance
129        // stamp.
130        let user_email: Option<String> = None;
131
132        let database = db.clone().ok_or_else(|| {
133            ApiError::Cognify("cognify requires a DatabaseConnection".to_string())
134        })?;
135        let thread_pool: Arc<dyn cognee_core::CpuPool> = Arc::new(
136            cognee_core::RayonThreadPool::with_default_threads()
137                .map_err(|e| ApiError::Cognify(format!("failed to construct thread pool: {e}")))?,
138        );
139
140        // Gap 08-07: persist the four-state `pipeline_runs` trail.
141        let pipeline_run_repo: Arc<dyn PipelineRunRepository> =
142            Arc::new(SeaOrmPipelineRunRepository::new(Arc::clone(&database)));
143
144        // Apply `incremental_loading` flag on top of the caller-provided config.
145        let effective_cognify_config;
146        let cognify_config_ref = if incremental_loading != cognify_config.incremental_loading {
147            effective_cognify_config = cognify_config
148                .clone()
149                .with_incremental_loading(incremental_loading);
150            &effective_cognify_config
151        } else {
152            cognify_config
153        };
154
155        let result = cognify(
156            data_items.clone(),
157            dataset_id,
158            Some(owner_id),
159            user_email,
160            tenant_id,
161            llm,
162            storage,
163            graph_db,
164            vector_db,
165            embedding_engine,
166            database,
167            pipeline_run_repo,
168            thread_pool,
169            ontology_resolver,
170            cognify_config_ref,
171        )
172        .await
173        .map_err(|e| ApiError::Cognify(e.to_string()))?;
174        Some(result)
175    } else {
176        None
177    };
178
179    Ok(UpdateResult {
180        deleted_data_id: data_id,
181        delete_result,
182        new_data: data_items,
183        cognify_result,
184    })
185}