Skip to main content

cognee_cognify/memify/
pipeline.rs

1//! Memify pipeline orchestration.
2//!
3//! The `memify` function extracts triplets from an existing knowledge graph
4//! and indexes them into the vector database for semantic search.
5
6use std::sync::Arc;
7
8use cognee_core::pipeline::DataIdFn;
9use cognee_core::pipeline_run_registry::DbPipelineWatcher;
10use cognee_core::task::Value;
11use cognee_core::{
12    CpuPool, Pipeline, PipelineBuilder, PipelineContext, TaskContextBuilder, TypedTask,
13};
14use cognee_database::{DatabaseConnection, PipelineRunRepository};
15use cognee_embedding::EmbeddingEngine;
16use cognee_graph::GraphDBTrait;
17use cognee_models::Triplet;
18use cognee_vector::VectorDB;
19use tracing::info;
20use uuid::Uuid;
21
22use super::config::MemifyConfig;
23use super::error::MemifyError;
24use super::extract_triplets::extract_triplets_from_graph_db;
25use super::index_triplets::{IndexResult, index_triplets};
26use crate::qualification::{Qualification, check_pipeline_run_qualification};
27
28/// Result of the memify pipeline.
29#[derive(Debug, Clone)]
30pub struct MemifyResult {
31    /// Number of triplets extracted from the graph.
32    pub triplet_count: usize,
33
34    /// Details about vector indexing.
35    pub index_result: IndexResult,
36
37    /// `true` when this result was synthesised by the
38    /// `check_pipeline_run_qualification` short-circuit (latest
39    /// `pipeline_runs` row was `COMPLETED`). All other fields are zero.
40    pub already_completed: bool,
41
42    /// The `pipeline_run_id` of the prior completed run that triggered the
43    /// short-circuit. `None` on normal results.
44    pub prior_pipeline_run_id: Option<Uuid>,
45}
46
47impl MemifyResult {
48    /// Create an empty result with zeroed counts.
49    pub fn empty() -> Self {
50        Self {
51            triplet_count: 0,
52            index_result: IndexResult {
53                indexed_count: 0,
54                batch_count: 0,
55            },
56            already_completed: false,
57            prior_pipeline_run_id: None,
58        }
59    }
60
61    /// Create a short-circuit "already completed" result tagged with the
62    /// prior `pipeline_run_id`. Mirrors
63    /// [`crate::CognifyResult::already_completed`]. See doc 08-08 §4.4.
64    pub fn already_completed(pipeline_run_id: Uuid) -> Self {
65        Self {
66            already_completed: true,
67            prior_pipeline_run_id: Some(pipeline_run_id),
68            ..Self::empty()
69        }
70    }
71}
72
73// ---------------------------------------------------------------------------
74// Typed task: Vec<Triplet> -> IndexResult
75// ---------------------------------------------------------------------------
76
77/// Build the one-task closure that indexes a pre-extracted batch of
78/// [`Triplet`]s into the vector database.
79///
80/// Locked Decision 8 (LIB-06, 2026-05-13): memify's executor-routed
81/// pipeline is a single "index-only" task whose input is the
82/// `Vec<Triplet>` produced by the pre-flight extraction step in
83/// [`memify`].
84fn make_index_triplets_task(
85    vector_db: Arc<dyn VectorDB>,
86    embedding_engine: Arc<dyn EmbeddingEngine>,
87    dataset_id: Option<Uuid>,
88    user_id: Option<Uuid>,
89    tenant_id: Option<Uuid>,
90) -> TypedTask<Vec<Triplet>, IndexResult> {
91    TypedTask::async_fn(move |triplets: &Vec<Triplet>, _ctx| {
92        let triplets = triplets.clone();
93        let vector_db = Arc::clone(&vector_db);
94        let embedding_engine = Arc::clone(&embedding_engine);
95        Box::pin(async move {
96            index_triplets(
97                &triplets,
98                &*vector_db,
99                &*embedding_engine,
100                dataset_id,
101                user_id,
102                tenant_id,
103            )
104            .await
105            .map(Box::new)
106            .map_err(|e| format!("{e}").into())
107        })
108    })
109}
110
111/// Build the executor-routed memify pipeline.
112///
113/// One task: `Vec<Triplet>` -> [`IndexResult`]. Triplet extraction
114/// (graph-DB query or custom-data synthesis) and the empty-triplets
115/// short-circuit happen *outside* this pipeline in [`memify`] — see
116/// locked Decision 8 of [LIB-06-02][doc].
117///
118/// [doc]: ../../../../docs/telemetry/lib-06/02-memify-executor-route.md
119pub fn build_memify_index_only_pipeline(
120    vector_db: Arc<dyn VectorDB>,
121    embedding_engine: Arc<dyn EmbeddingEngine>,
122    dataset_id: Option<Uuid>,
123    user_id: Option<Uuid>,
124    tenant_id: Option<Uuid>,
125) -> Pipeline {
126    // Decision 4 (LIB-06): memify has no `Data` inputs. The watcher's
127    // run_info `data_ids` carrier stays empty (Python's `"None"` branch).
128    let data_id_fn: DataIdFn = Arc::new(|_v: Arc<dyn Value>| None);
129    PipelineBuilder::new_with_task(
130        "memify",
131        make_index_triplets_task(vector_db, embedding_engine, dataset_id, user_id, tenant_id),
132    )
133    .with_name("memify")
134    .with_data_id(data_id_fn)
135    .build()
136}
137
138/// Run the memify pipeline: extract triplets from the graph and index them.
139///
140/// # Algorithm
141/// 1. Validate configuration.
142/// 2. Pre-flight: extract triplets from the graph database (or synthesise
143///    them from `config.custom_data`).
144/// 3. If no triplets found, return early with zeros.
145/// 4. Build the one-task index-only pipeline (Decision 8) and route through
146///    [`cognee_core::pipeline::execute`] with a
147///    [`cognee_core::pipeline_run_registry::DbPipelineWatcher`] backed by
148///    the caller-supplied `pipeline_run_repo` (Decision 11, gap 08-07).
149/// 5. Downcast the executor output back to [`IndexResult`] and return a
150///    [`MemifyResult`].
151///
152/// # Arguments
153/// * `graph_db` — Graph database containing the knowledge graph.
154/// * `vector_db` — Vector database for storing triplet embeddings.
155/// * `embedding_engine` — Engine to generate text embeddings.
156/// * `thread_pool` — CPU pool for [`cognee_core::TaskContext`] (LIB-06
157///   Decision 1).
158/// * `database` — Relational [`DatabaseConnection`] for
159///   [`cognee_core::TaskContext`] (LIB-06 Decision 1).
160/// * `dataset_id` — Optional dataset ID for metadata tagging.
161/// * `user_id` — Optional user ID for metadata tagging.
162/// * `tenant_id` — Optional tenant ID for metadata tagging.
163/// * `config` — Pipeline configuration.
164///
165/// # Returns
166/// A [`MemifyResult`] with counts of extracted and indexed triplets.
167#[allow(clippy::too_many_arguments)]
168pub async fn memify(
169    graph_db: Arc<dyn GraphDBTrait>,
170    vector_db: Arc<dyn VectorDB>,
171    embedding_engine: Arc<dyn EmbeddingEngine>,
172    thread_pool: Arc<dyn CpuPool>,
173    database: Arc<DatabaseConnection>,
174    pipeline_run_repo: Arc<dyn PipelineRunRepository>,
175    dataset_id: Option<Uuid>,
176    user_id: Option<Uuid>,
177    tenant_id: Option<Uuid>,
178    config: &MemifyConfig,
179) -> Result<MemifyResult, MemifyError> {
180    // 1. Validate configuration.
181    config.validate()?;
182
183    // 1b. Qualification gate (gap 08-08, locked decision 3) ────────────────
184    // Skip when `dataset_id` is `None` — Python's gate only applies per
185    // dataset, and ad-hoc memify runs without a dataset cannot be looked up
186    // via `get_pipeline_run_by_dataset`. The pipeline name used here matches
187    // what the executor-routed pipeline persists (`build_memify_index_only_pipeline`
188    // sets `with_name("memify")`).
189    let pipeline_name = "memify";
190    if let Some(ds_id) = dataset_id {
191        match check_pipeline_run_qualification(pipeline_run_repo.as_ref(), ds_id, pipeline_name)
192            .await
193            .map_err(|e| MemifyError::Database(e.to_string()))?
194        {
195            Qualification::AlreadyCompleted(prior) => {
196                info!(
197                    dataset_id = %ds_id,
198                    pipeline_run_id = %prior.pipeline_run_id,
199                    "memify: dataset already completed; short-circuiting (Python parity)"
200                );
201                return Ok(MemifyResult::already_completed(prior.pipeline_run_id));
202            }
203            Qualification::AlreadyRunning(_prior) => {
204                return Err(MemifyError::PipelineAlreadyRunning {
205                    pipeline_name: pipeline_name.to_string(),
206                    dataset_id: Some(ds_id),
207                });
208            }
209            Qualification::Proceed => {}
210        }
211    }
212
213    // 2. Pre-flight: extract triplets from the graph database (or use custom data).
214    let triplets = if let Some(ref custom_data) = config.custom_data {
215        // When custom data is provided, convert JSON values to Triplet objects.
216        // Each value should be a JSON object with "source_node", "relationship_name",
217        // and "target_node" fields. UUIDs are generated deterministically from the
218        // text values.
219        let mut custom_triplets = Vec::new();
220        for value in custom_data {
221            let source = value
222                .get("source_node")
223                .and_then(|v| v.as_str())
224                .unwrap_or("unknown")
225                .to_string();
226            let relationship = value
227                .get("relationship_name")
228                .and_then(|v| v.as_str())
229                .unwrap_or("related_to")
230                .to_string();
231            let target = value
232                .get("target_node")
233                .and_then(|v| v.as_str())
234                .unwrap_or("unknown")
235                .to_string();
236            let source_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, source.to_lowercase().as_bytes());
237            let target_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, target.to_lowercase().as_bytes());
238            let text = format!("{source}-\u{203A}{relationship}-\u{203A}{target}");
239            custom_triplets.push(
240                Triplet::new(source_id, target_id, relationship, text).with_names(source, target),
241            );
242        }
243        info!(
244            "Using {} custom triplets instead of graph extraction",
245            custom_triplets.len()
246        );
247        custom_triplets
248    } else {
249        extract_triplets_from_graph_db(&*graph_db, config).await?
250    };
251
252    let triplet_count = triplets.len();
253
254    // 3. If empty, return early with zeros — skip the executor entirely.
255    if triplets.is_empty() {
256        info!("No triplets extracted from graph; nothing to index");
257        return Ok(MemifyResult::empty());
258    }
259
260    // 4. Build the one-task index-only pipeline and run it through
261    //    `pipeline::execute` (Decision 8 / 11).
262    let pipeline = build_memify_index_only_pipeline(
263        Arc::clone(&vector_db),
264        Arc::clone(&embedding_engine),
265        dataset_id,
266        user_id,
267        tenant_id,
268    );
269
270    // The executor re-derives `PipelineRunInfo.pipeline_id` from
271    // `(pipeline.name, user_id, dataset_id)`; we still carry `pipeline.id`
272    // through `PipelineContext` as the placeholder.
273    let pipeline_ctx = PipelineContext {
274        pipeline_id: pipeline.id,
275        pipeline_name: pipeline.name.clone().unwrap_or_default(),
276        user_id,
277        tenant_id,
278        dataset_id,
279        current_data: None,
280        run_id: None,
281        user_email: None,
282        provenance_visited: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())),
283    };
284
285    let (_cancel_handle, ctx) = TaskContextBuilder::new()
286        .thread_pool(thread_pool)
287        .database(database)
288        .graph_db(Arc::clone(&graph_db))
289        .vector_db(Arc::clone(&vector_db))
290        .pipeline_context(pipeline_ctx)
291        .build()
292        .map_err(|e| MemifyError::Context(e.to_string()))?;
293    let ctx = Arc::new(ctx);
294
295    let inputs: Vec<Arc<dyn Value>> = vec![Arc::new(triplets) as Arc<dyn Value>];
296
297    // Decision 11 (gap 08-07): persist the four-state `pipeline_runs` trail
298    // through the caller-supplied repository.
299    let watcher = DbPipelineWatcher::new(pipeline_run_repo);
300    let outputs = cognee_core::pipeline::execute(&pipeline, inputs, ctx, &watcher)
301        .await
302        .map_err(|e| MemifyError::Execute(e.to_string()))?;
303
304    let index_result = extract_memify_outputs(outputs)?;
305
306    // 5. Log summary and return.
307    info!(
308        "Memify complete: {} triplets extracted, {} indexed",
309        triplet_count, index_result.indexed_count
310    );
311
312    Ok(MemifyResult {
313        triplet_count,
314        index_result,
315        already_completed: false,
316        prior_pipeline_run_id: None,
317    })
318}
319
320// ---------------------------------------------------------------------------
321// Output extraction (Decision 9)
322// ---------------------------------------------------------------------------
323
324/// Downcast the executor's [`Arc<dyn Value>`] outputs back to the concrete
325/// [`IndexResult`] the memify convenience function promises.
326///
327/// Returns [`MemifyError::OutputTypeMismatch`] when the downcast fails — a
328/// programmer error indicating the pipeline's last task does not emit
329/// `IndexResult`.
330fn extract_memify_outputs(outputs: Vec<Arc<dyn Value>>) -> Result<IndexResult, MemifyError> {
331    let first = outputs
332        .into_iter()
333        .next()
334        .ok_or(MemifyError::OutputTypeMismatch {
335            expected: "IndexResult",
336            actual: "empty",
337        })?;
338    // Explicit deref through `Arc` to reach the inner `dyn Value`, then call
339    // `as_any` via vtable dispatch — mirrors the pattern in
340    // `cognee_ingestion::pipeline::extract_data_outputs` (LIB-06-01).
341    (*first)
342        .as_any()
343        .downcast_ref::<IndexResult>()
344        .cloned()
345        .ok_or(MemifyError::OutputTypeMismatch {
346            expected: "IndexResult",
347            actual: "unknown",
348        })
349}