cognee-lib 0.1.3

Cognee — an AI-memory pipeline that turns raw data into queryable knowledge graphs (umbrella crate).
//! Data replacement API -- `update()`.
//!
//! Three-step pipeline: delete old data -> re-add new data -> re-cognify.
//!
//! Equivalent to Python's `cognee.api.v1.update.update()`.

use std::collections::HashMap;
use std::sync::Arc;

use cognee_cognify::{CognifyConfig, CognifyResult, cognify};
use cognee_database::{
    AclDb, DatabaseConnection, PipelineRunRepository, SeaOrmPipelineRunRepository,
};
use cognee_delete::{DeleteMode, DeleteRequest, DeleteResult, DeleteScope, DeleteService};
use cognee_embedding::EmbeddingEngine;
use cognee_graph::GraphDBTrait;
use cognee_ingestion::{AddParams, AddPipeline};
use cognee_llm::Llm;
use cognee_models::{Data, DataInput};
use cognee_ontology::OntologyResolver;
use cognee_storage::StorageTrait;
use cognee_vector::VectorDB;
use uuid::Uuid;

use super::error::ApiError;

/// Outcome of a successful [`update()`] call.
///
/// Bundles the identifier of the replaced item with the summaries produced by
/// each phase of the delete -> add -> cognify pipeline.
#[derive(Debug)]
pub struct UpdateResult {
    /// Identifier of the removed data item (the `data_id` given to `update()`).
    pub deleted_data_id: Uuid,
    /// Summary reported by the delete phase.
    pub delete_result: DeleteResult,
    /// Items returned by the add phase; may be empty.
    pub new_data: Vec<Data>,
    /// Output of the cognify phase, or `None` when the add phase produced no
    /// items and cognify was therefore skipped.
    pub cognify_result: Option<CognifyResult>,
}

/// Replace data in a dataset: delete old -> re-add new -> re-cognify.
///
/// The phases run strictly in order and are **not** transactional. The old
/// item is soft-deleted before anything new is ingested, so a failure in the
/// add or cognify phase leaves the old item deleted.
///
/// # Arguments
/// * `data_id` - ID of the data item to replace.
/// * `new_data` - Replacement data inputs.
/// * `dataset_id` - Explicit dataset UUID (no re-derivation from name).
/// * `dataset_name` - Dataset name (still required for delete scope).
/// * `owner_id` / `tenant_id` - Ownership context.
/// * `node_set` - Optional graph node identifiers for access-control grouping.
/// * `preferred_loaders` - Optional MIME-type-to-loader-name overrides.
/// * `incremental_loading` - When `true`, skip re-adding content already present.
///   This flag also overrides `cognify_config.incremental_loading` for the
///   cognify phase.
/// * `acl_db` - Optional ACL backend; when provided, a `write` permission check
///   is performed before any mutation.
/// * `delete_service` - Pre-configured [`DeleteService`].
/// * `add_pipeline` - Ingestion pipeline.
/// * `llm`, `storage`, `graph_db`, `vector_db`, `embedding_engine`,
///   `ontology_resolver` - Components for the cognify phase.
/// * `db` - Database connection. It is only required when the add phase
///   returns at least one data item, i.e. when cognify actually runs.
/// * `cognify_config` - Base configuration for the cognify phase.
///
/// # Errors
/// * `ApiError::InvalidArgument` when `acl_db` is set and either the
///   permission lookup fails or the caller lacks `write` on the dataset.
/// * Delete-phase errors, converted into `ApiError` via `?`.
/// * `ApiError::Ingestion` when the add phase fails.
/// * `ApiError::Cognify` when cognify runs and `db` is `None`, the thread pool
///   cannot be constructed, or cognify itself fails.
// Every dependency is injected explicitly, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
pub async fn update(
    data_id: Uuid,
    new_data: Vec<DataInput>,
    dataset_id: Uuid,
    dataset_name: &str,
    owner_id: Uuid,
    tenant_id: Option<Uuid>,
    node_set: Option<Vec<String>>,
    preferred_loaders: Option<HashMap<String, String>>,
    incremental_loading: bool,
    acl_db: Option<&dyn AclDb>,
    delete_service: &DeleteService,
    add_pipeline: &AddPipeline,
    llm: Arc<dyn Llm>,
    storage: Arc<dyn StorageTrait>,
    graph_db: Arc<dyn GraphDBTrait>,
    vector_db: Arc<dyn VectorDB>,
    embedding_engine: Arc<dyn EmbeddingEngine>,
    db: Option<Arc<DatabaseConnection>>,
    ontology_resolver: Arc<dyn OntologyResolver>,
    cognify_config: &CognifyConfig,
) -> Result<UpdateResult, ApiError> {
    // ── Permission gate ───────────────────────────────────────────────────────
    // Runs before any mutation so that a denied caller never triggers the delete.
    // NOTE(review): both a failed lookup and a denial surface as InvalidArgument.
    // If ApiError has a dedicated authorization variant, prefer it so callers
    // can distinguish a permission failure from bad input.
    if let Some(acl) = acl_db {
        let permitted = acl
            .has_permission(owner_id, dataset_id, "write")
            .await
            .map_err(|e| ApiError::InvalidArgument(e.to_string()))?;
        if !permitted {
            return Err(ApiError::InvalidArgument(
                "write permission denied on dataset".to_string(),
            ));
        }
    }

    // ── Step 1: Delete old data ───────────────────────────────────────────────
    let delete_request = DeleteRequest {
        scope: DeleteScope::Data {
            owner_id,
            data_id,
            dataset_name: Some(dataset_name.to_string()),
            // Keep the dataset even if it becomes empty: new data is added next.
            delete_dataset_if_empty: false,
        },
        // Python update() → datasets.delete_data defaults mode="soft" (datasets.py:147).
        mode: DeleteMode::Soft,
        memory_only: false,
    };
    let delete_result = delete_service.execute(&delete_request).await?;

    // ── Step 2: Re-add new data ───────────────────────────────────────────────
    let params = AddParams {
        node_set,
        preferred_loaders,
        dataset_id: Some(dataset_id),
        importance_weight: None,
        incremental_loading,
    };
    let data_items = add_pipeline
        .add_with_params(new_data, dataset_name, owner_id, tenant_id, &params)
        .await
        .map_err(|e| ApiError::Ingestion(e.to_string()))?;

    // ── Step 3: Re-cognify (if data was added) ───────────────────────────────
    let cognify_result = if data_items.is_empty() {
        // Nothing was ingested, so there is nothing to cognify; `db` is not needed.
        None
    } else {
        // OSS build has no DB-backed user lookup (the `users` table is owned
        // by the closed cloud build), so we always fall back to `None`.
        // `cognify()` then uses `user_id.to_string()` as the provenance
        // stamp.
        let user_email: Option<String> = None;

        // `db` is not used after this point, so move it rather than cloning.
        let database = db.ok_or_else(|| {
            ApiError::Cognify("cognify requires a DatabaseConnection".to_string())
        })?;
        let thread_pool: Arc<dyn cognee_core::CpuPool> = Arc::new(
            cognee_core::RayonThreadPool::with_default_threads()
                .map_err(|e| ApiError::Cognify(format!("failed to construct thread pool: {e}")))?,
        );

        // Gap 08-07: persist the four-state `pipeline_runs` trail.
        let pipeline_run_repo: Arc<dyn PipelineRunRepository> =
            Arc::new(SeaOrmPipelineRunRepository::new(Arc::clone(&database)));

        // Apply `incremental_loading` flag on top of the caller-provided config;
        // clone the config only when the flag actually differs.
        let effective_cognify_config;
        let cognify_config_ref = if incremental_loading == cognify_config.incremental_loading {
            cognify_config
        } else {
            effective_cognify_config = cognify_config
                .clone()
                .with_incremental_loading(incremental_loading);
            &effective_cognify_config
        };

        // `data_items` is cloned because it is also returned in `UpdateResult`.
        let result = cognify(
            data_items.clone(),
            dataset_id,
            Some(owner_id),
            user_email,
            tenant_id,
            llm,
            storage,
            graph_db,
            vector_db,
            embedding_engine,
            database,
            pipeline_run_repo,
            thread_pool,
            ontology_resolver,
            cognify_config_ref,
        )
        .await
        .map_err(|e| ApiError::Cognify(e.to_string()))?;
        Some(result)
    };

    Ok(UpdateResult {
        deleted_data_id: data_id,
        delete_result,
        new_data: data_items,
        cognify_result,
    })
}