cognee_cognify/memify/pipeline.rs
1//! Memify pipeline orchestration.
2//!
3//! The `memify` function extracts triplets from an existing knowledge graph
4//! and indexes them into the vector database for semantic search.
5
6use std::sync::Arc;
7
8use cognee_core::pipeline::DataIdFn;
9use cognee_core::pipeline_run_registry::DbPipelineWatcher;
10use cognee_core::task::Value;
11use cognee_core::{
12 CpuPool, Pipeline, PipelineBuilder, PipelineContext, TaskContextBuilder, TypedTask,
13};
14use cognee_database::{DatabaseConnection, PipelineRunRepository};
15use cognee_embedding::EmbeddingEngine;
16use cognee_graph::GraphDBTrait;
17use cognee_models::Triplet;
18use cognee_vector::VectorDB;
19use tracing::info;
20use uuid::Uuid;
21
22use super::config::MemifyConfig;
23use super::error::MemifyError;
24use super::extract_triplets::extract_triplets_from_graph_db;
25use super::index_triplets::{IndexResult, index_triplets};
26use crate::qualification::{Qualification, check_pipeline_run_qualification};
27
28/// Result of the memify pipeline.
29#[derive(Debug, Clone)]
30pub struct MemifyResult {
31 /// Number of triplets extracted from the graph.
32 pub triplet_count: usize,
33
34 /// Details about vector indexing.
35 pub index_result: IndexResult,
36
37 /// `true` when this result was synthesised by the
38 /// `check_pipeline_run_qualification` short-circuit (latest
39 /// `pipeline_runs` row was `COMPLETED`). All other fields are zero.
40 pub already_completed: bool,
41
42 /// The `pipeline_run_id` of the prior completed run that triggered the
43 /// short-circuit. `None` on normal results.
44 pub prior_pipeline_run_id: Option<Uuid>,
45}
46
47impl MemifyResult {
48 /// Create an empty result with zeroed counts.
49 pub fn empty() -> Self {
50 Self {
51 triplet_count: 0,
52 index_result: IndexResult {
53 indexed_count: 0,
54 batch_count: 0,
55 },
56 already_completed: false,
57 prior_pipeline_run_id: None,
58 }
59 }
60
61 /// Create a short-circuit "already completed" result tagged with the
62 /// prior `pipeline_run_id`. Mirrors
63 /// [`crate::CognifyResult::already_completed`]. See doc 08-08 §4.4.
64 pub fn already_completed(pipeline_run_id: Uuid) -> Self {
65 Self {
66 already_completed: true,
67 prior_pipeline_run_id: Some(pipeline_run_id),
68 ..Self::empty()
69 }
70 }
71}
72
73// ---------------------------------------------------------------------------
74// Typed task: Vec<Triplet> -> IndexResult
75// ---------------------------------------------------------------------------
76
77/// Build the one-task closure that indexes a pre-extracted batch of
78/// [`Triplet`]s into the vector database.
79///
80/// Locked Decision 8 (LIB-06, 2026-05-13): memify's executor-routed
81/// pipeline is a single "index-only" task whose input is the
82/// `Vec<Triplet>` produced by the pre-flight extraction step in
83/// [`memify`].
84fn make_index_triplets_task(
85 vector_db: Arc<dyn VectorDB>,
86 embedding_engine: Arc<dyn EmbeddingEngine>,
87 dataset_id: Option<Uuid>,
88 user_id: Option<Uuid>,
89 tenant_id: Option<Uuid>,
90) -> TypedTask<Vec<Triplet>, IndexResult> {
91 TypedTask::async_fn(move |triplets: &Vec<Triplet>, _ctx| {
92 let triplets = triplets.clone();
93 let vector_db = Arc::clone(&vector_db);
94 let embedding_engine = Arc::clone(&embedding_engine);
95 Box::pin(async move {
96 index_triplets(
97 &triplets,
98 &*vector_db,
99 &*embedding_engine,
100 dataset_id,
101 user_id,
102 tenant_id,
103 )
104 .await
105 .map(Box::new)
106 .map_err(|e| format!("{e}").into())
107 })
108 })
109}
110
111/// Build the executor-routed memify pipeline.
112///
113/// One task: `Vec<Triplet>` -> [`IndexResult`]. Triplet extraction
114/// (graph-DB query or custom-data synthesis) and the empty-triplets
115/// short-circuit happen *outside* this pipeline in [`memify`] — see
116/// locked Decision 8 of [LIB-06-02][doc].
117///
118/// [doc]: ../../../../docs/telemetry/lib-06/02-memify-executor-route.md
119pub fn build_memify_index_only_pipeline(
120 vector_db: Arc<dyn VectorDB>,
121 embedding_engine: Arc<dyn EmbeddingEngine>,
122 dataset_id: Option<Uuid>,
123 user_id: Option<Uuid>,
124 tenant_id: Option<Uuid>,
125) -> Pipeline {
126 // Decision 4 (LIB-06): memify has no `Data` inputs. The watcher's
127 // run_info `data_ids` carrier stays empty (Python's `"None"` branch).
128 let data_id_fn: DataIdFn = Arc::new(|_v: Arc<dyn Value>| None);
129 PipelineBuilder::new_with_task(
130 "memify",
131 make_index_triplets_task(vector_db, embedding_engine, dataset_id, user_id, tenant_id),
132 )
133 .with_name("memify")
134 .with_data_id(data_id_fn)
135 .build()
136}
137
138/// Run the memify pipeline: extract triplets from the graph and index them.
139///
140/// # Algorithm
141/// 1. Validate configuration.
142/// 2. Pre-flight: extract triplets from the graph database (or synthesise
143/// them from `config.custom_data`).
144/// 3. If no triplets found, return early with zeros.
145/// 4. Build the one-task index-only pipeline (Decision 8) and route through
146/// [`cognee_core::pipeline::execute`] with a
147/// [`cognee_core::pipeline_run_registry::DbPipelineWatcher`] backed by
148/// the caller-supplied `pipeline_run_repo` (Decision 11, gap 08-07).
149/// 5. Downcast the executor output back to [`IndexResult`] and return a
150/// [`MemifyResult`].
151///
152/// # Arguments
153/// * `graph_db` — Graph database containing the knowledge graph.
154/// * `vector_db` — Vector database for storing triplet embeddings.
155/// * `embedding_engine` — Engine to generate text embeddings.
156/// * `thread_pool` — CPU pool for [`cognee_core::TaskContext`] (LIB-06
157/// Decision 1).
158/// * `database` — Relational [`DatabaseConnection`] for
159/// [`cognee_core::TaskContext`] (LIB-06 Decision 1).
160/// * `dataset_id` — Optional dataset ID for metadata tagging.
161/// * `user_id` — Optional user ID for metadata tagging.
162/// * `tenant_id` — Optional tenant ID for metadata tagging.
163/// * `config` — Pipeline configuration.
164///
165/// # Returns
166/// A [`MemifyResult`] with counts of extracted and indexed triplets.
167#[allow(clippy::too_many_arguments)]
168pub async fn memify(
169 graph_db: Arc<dyn GraphDBTrait>,
170 vector_db: Arc<dyn VectorDB>,
171 embedding_engine: Arc<dyn EmbeddingEngine>,
172 thread_pool: Arc<dyn CpuPool>,
173 database: Arc<DatabaseConnection>,
174 pipeline_run_repo: Arc<dyn PipelineRunRepository>,
175 dataset_id: Option<Uuid>,
176 user_id: Option<Uuid>,
177 tenant_id: Option<Uuid>,
178 config: &MemifyConfig,
179) -> Result<MemifyResult, MemifyError> {
180 // 1. Validate configuration.
181 config.validate()?;
182
183 // 1b. Qualification gate (gap 08-08, locked decision 3) ────────────────
184 // Skip when `dataset_id` is `None` — Python's gate only applies per
185 // dataset, and ad-hoc memify runs without a dataset cannot be looked up
186 // via `get_pipeline_run_by_dataset`. The pipeline name used here matches
187 // what the executor-routed pipeline persists (`build_memify_index_only_pipeline`
188 // sets `with_name("memify")`).
189 let pipeline_name = "memify";
190 if let Some(ds_id) = dataset_id {
191 match check_pipeline_run_qualification(pipeline_run_repo.as_ref(), ds_id, pipeline_name)
192 .await
193 .map_err(|e| MemifyError::Database(e.to_string()))?
194 {
195 Qualification::AlreadyCompleted(prior) => {
196 info!(
197 dataset_id = %ds_id,
198 pipeline_run_id = %prior.pipeline_run_id,
199 "memify: dataset already completed; short-circuiting (Python parity)"
200 );
201 return Ok(MemifyResult::already_completed(prior.pipeline_run_id));
202 }
203 Qualification::AlreadyRunning(_prior) => {
204 return Err(MemifyError::PipelineAlreadyRunning {
205 pipeline_name: pipeline_name.to_string(),
206 dataset_id: Some(ds_id),
207 });
208 }
209 Qualification::Proceed => {}
210 }
211 }
212
213 // 2. Pre-flight: extract triplets from the graph database (or use custom data).
214 let triplets = if let Some(ref custom_data) = config.custom_data {
215 // When custom data is provided, convert JSON values to Triplet objects.
216 // Each value should be a JSON object with "source_node", "relationship_name",
217 // and "target_node" fields. UUIDs are generated deterministically from the
218 // text values.
219 let mut custom_triplets = Vec::new();
220 for value in custom_data {
221 let source = value
222 .get("source_node")
223 .and_then(|v| v.as_str())
224 .unwrap_or("unknown")
225 .to_string();
226 let relationship = value
227 .get("relationship_name")
228 .and_then(|v| v.as_str())
229 .unwrap_or("related_to")
230 .to_string();
231 let target = value
232 .get("target_node")
233 .and_then(|v| v.as_str())
234 .unwrap_or("unknown")
235 .to_string();
236 let source_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, source.to_lowercase().as_bytes());
237 let target_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, target.to_lowercase().as_bytes());
238 let text = format!("{source}-\u{203A}{relationship}-\u{203A}{target}");
239 custom_triplets.push(
240 Triplet::new(source_id, target_id, relationship, text).with_names(source, target),
241 );
242 }
243 info!(
244 "Using {} custom triplets instead of graph extraction",
245 custom_triplets.len()
246 );
247 custom_triplets
248 } else {
249 extract_triplets_from_graph_db(&*graph_db, config).await?
250 };
251
252 let triplet_count = triplets.len();
253
254 // 3. If empty, return early with zeros — skip the executor entirely.
255 if triplets.is_empty() {
256 info!("No triplets extracted from graph; nothing to index");
257 return Ok(MemifyResult::empty());
258 }
259
260 // 4. Build the one-task index-only pipeline and run it through
261 // `pipeline::execute` (Decision 8 / 11).
262 let pipeline = build_memify_index_only_pipeline(
263 Arc::clone(&vector_db),
264 Arc::clone(&embedding_engine),
265 dataset_id,
266 user_id,
267 tenant_id,
268 );
269
270 // The executor re-derives `PipelineRunInfo.pipeline_id` from
271 // `(pipeline.name, user_id, dataset_id)`; we still carry `pipeline.id`
272 // through `PipelineContext` as the placeholder.
273 let pipeline_ctx = PipelineContext {
274 pipeline_id: pipeline.id,
275 pipeline_name: pipeline.name.clone().unwrap_or_default(),
276 user_id,
277 tenant_id,
278 dataset_id,
279 current_data: None,
280 run_id: None,
281 user_email: None,
282 provenance_visited: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())),
283 };
284
285 let (_cancel_handle, ctx) = TaskContextBuilder::new()
286 .thread_pool(thread_pool)
287 .database(database)
288 .graph_db(Arc::clone(&graph_db))
289 .vector_db(Arc::clone(&vector_db))
290 .pipeline_context(pipeline_ctx)
291 .build()
292 .map_err(|e| MemifyError::Context(e.to_string()))?;
293 let ctx = Arc::new(ctx);
294
295 let inputs: Vec<Arc<dyn Value>> = vec![Arc::new(triplets) as Arc<dyn Value>];
296
297 // Decision 11 (gap 08-07): persist the four-state `pipeline_runs` trail
298 // through the caller-supplied repository.
299 let watcher = DbPipelineWatcher::new(pipeline_run_repo);
300 let outputs = cognee_core::pipeline::execute(&pipeline, inputs, ctx, &watcher)
301 .await
302 .map_err(|e| MemifyError::Execute(e.to_string()))?;
303
304 let index_result = extract_memify_outputs(outputs)?;
305
306 // 5. Log summary and return.
307 info!(
308 "Memify complete: {} triplets extracted, {} indexed",
309 triplet_count, index_result.indexed_count
310 );
311
312 Ok(MemifyResult {
313 triplet_count,
314 index_result,
315 already_completed: false,
316 prior_pipeline_run_id: None,
317 })
318}
319
320// ---------------------------------------------------------------------------
321// Output extraction (Decision 9)
322// ---------------------------------------------------------------------------
323
324/// Downcast the executor's [`Arc<dyn Value>`] outputs back to the concrete
325/// [`IndexResult`] the memify convenience function promises.
326///
327/// Returns [`MemifyError::OutputTypeMismatch`] when the downcast fails — a
328/// programmer error indicating the pipeline's last task does not emit
329/// `IndexResult`.
330fn extract_memify_outputs(outputs: Vec<Arc<dyn Value>>) -> Result<IndexResult, MemifyError> {
331 let first = outputs
332 .into_iter()
333 .next()
334 .ok_or(MemifyError::OutputTypeMismatch {
335 expected: "IndexResult",
336 actual: "empty",
337 })?;
338 // Explicit deref through `Arc` to reach the inner `dyn Value`, then call
339 // `as_any` via vtable dispatch — mirrors the pattern in
340 // `cognee_ingestion::pipeline::extract_data_outputs` (LIB-06-01).
341 (*first)
342 .as_any()
343 .downcast_ref::<IndexResult>()
344 .cloned()
345 .ok_or(MemifyError::OutputTypeMismatch {
346 expected: "IndexResult",
347 actual: "unknown",
348 })
349}