#![allow(
clippy::unwrap_used,
clippy::expect_used,
reason = "test code — panics are acceptable failures"
)]
#![cfg(all(feature = "pggraph", feature = "pgvector", feature = "postgres"))]
use std::sync::Arc;
use cognee_lib::PipelineContext;
use cognee_lib::add::AddPipeline;
use cognee_lib::cognify::{CognifyConfig, cognify};
use cognee_lib::component_manager::ComponentManager;
use cognee_lib::config::{ConfigManager, Settings};
use cognee_lib::core::{CpuPool, RayonThreadPool};
use cognee_lib::database::{IngestDb, PipelineRunRepository, SeaOrmPipelineRunRepository, ops};
use cognee_lib::models::DataInput;
use cognee_lib::ontology::{NoOpOntologyResolver, OntologyResolver};
use serial_test::serial;
use uuid::Uuid;
/// Returns the Postgres connection URL configured for this test run.
///
/// `dotenv::dotenv()` is invoked first and its result is discarded, so a
/// missing `.env` file is not an error. `TEST_POSTGRES_URL` is then read
/// from the process environment; if it cannot be read or is empty the
/// function returns `None`, which callers treat as "skip the test".
fn postgres_url() -> Option<String> {
    // Best effort: the outcome of loading `.env` is intentionally ignored.
    let _ = dotenv::dotenv();

    match std::env::var("TEST_POSTGRES_URL") {
        Ok(url) if !url.is_empty() => Some(url),
        _ => None,
    }
}
/// Reports whether the environment carries everything needed to talk to an
/// LLM and to load the local embedding model.
///
/// Requirements (all must hold):
/// * an endpoint: `OPENAI_URL`, or `LLM_ENDPOINT` when the former is unreadable;
/// * a credential: `OPENAI_TOKEN`, or `LLM_API_KEY` when the former is unreadable;
/// * `COGNEE_E2E_EMBED_MODEL_PATH` and `COGNEE_E2E_TOKENIZER_PATH`, both non-empty.
///
/// For the fallback pairs, the first variable that can be read decides the
/// outcome: a primary variable that is set but empty counts as "missing" and
/// the fallback is *not* consulted.
fn llm_and_embedding_available() -> bool {
    // Best effort: the outcome of loading `.env` is intentionally ignored.
    let _ = dotenv::dotenv();

    // Takes the first readable variable of the pair and checks it is non-empty.
    let first_set_is_non_empty = |names: [&str; 2]| -> bool {
        names
            .iter()
            .find_map(|name| std::env::var(name).ok())
            .is_some_and(|value| !value.is_empty())
    };

    let llm_ready = first_set_is_non_empty(["OPENAI_URL", "LLM_ENDPOINT"])
        && first_set_is_non_empty(["OPENAI_TOKEN", "LLM_API_KEY"]);

    let embedding_ready = ["COGNEE_E2E_EMBED_MODEL_PATH", "COGNEE_E2E_TOKENIZER_PATH"]
        .iter()
        .all(|name| std::env::var(name).is_ok_and(|value| !value.is_empty()));

    llm_ready && embedding_ready
}
/// Builds a [`Settings`] value in which the relational, graph and vector
/// providers are all configured from the single connection URL `pg_url`.
///
/// * Host, port, database name, user and password are extracted from `pg_url`
///   (defaults: host `localhost`, port `5432`, empty password).
/// * LLM key, endpoint and model come from environment variables, each with a
///   fallback name; the model defaults to `gpt-4o-mini`.
/// * Embedding model and tokenizer paths come from the `COGNEE_E2E_*`
///   variables and default to empty strings.
/// * `system_root_directory` is a fresh `./.cognee_pg_e2e_<uuid>` path on
///   every call.
///
/// Every remaining field is taken from `Settings::default()`.
///
/// # Panics
///
/// Panics if `pg_url` cannot be parsed as a URL.
fn make_all_postgres_settings(pg_url: &str) -> Settings {
    // Reads `primary`, falling back to `fallback` only when `primary` cannot
    // be read; a primary that is set but empty is returned as-is.
    let env_with_fallback = |primary: &str, fallback: &str| {
        std::env::var(primary).or_else(|_| std::env::var(fallback))
    };
    // Reads a single variable, yielding an empty string when it cannot be read.
    let env_or_empty = |name: &str| std::env::var(name).unwrap_or_default();

    let parsed_url = url::Url::parse(pg_url).expect("TEST_POSTGRES_URL must be a valid URL");

    // NOTE(review): the `url` crate appears to return username/password in
    // their percent-encoded form — confirm whether credentials containing
    // reserved characters need decoding before being handed to the driver.
    Settings {
        db_provider: String::from("postgres"),
        db_host: parsed_url.host_str().unwrap_or("localhost").to_owned(),
        db_port: parsed_url.port().unwrap_or(5432),
        db_name: parsed_url.path().trim_start_matches('/').to_owned(),
        db_username: parsed_url.username().to_owned(),
        db_password: parsed_url.password().unwrap_or("").to_owned(),
        graph_database_provider: String::from("postgres"),
        vector_db_provider: String::from("pgvector"),
        vector_db_url: pg_url.to_owned(),
        embedding_provider: String::from("onnx"),
        embedding_model_path: env_or_empty("COGNEE_E2E_EMBED_MODEL_PATH"),
        embedding_tokenizer_path: env_or_empty("COGNEE_E2E_TOKENIZER_PATH"),
        embedding_dimensions: 384,
        llm_provider: String::from("openai"),
        llm_api_key: env_with_fallback("OPENAI_TOKEN", "LLM_API_KEY").unwrap_or_default(),
        llm_endpoint: env_with_fallback("OPENAI_URL", "LLM_ENDPOINT").unwrap_or_default(),
        llm_model: env_with_fallback("LLM_MODEL", "OPENAI_MODEL")
            .unwrap_or_else(|_| String::from("gpt-4o-mini")),
        system_root_directory: format!("./.cognee_pg_e2e_{}", Uuid::new_v4()),
        ..Settings::default()
    }
}
/// End-to-end check of `add()` followed by `cognify()` with the relational,
/// graph and vector backends all resolved through [`ComponentManager`] from
/// one Postgres URL.
///
/// The test returns early (after printing a note to stderr) when
/// `TEST_POSTGRES_URL` or the LLM/embedding environment variables are not
/// configured. Otherwise it:
///
/// 1. initializes every component from the generated settings,
/// 2. adds one text fixture to a uniquely named dataset and checks the
///    dataset has data items,
/// 3. runs `cognify()` with incremental loading and summarization disabled,
/// 4. checks that the graph has nodes and the vector store has collections.
///
/// Cleanup: the per-run system root directory is removed on every exit path
/// (including panics) via a drop guard. The graph is deleted on a best-effort
/// basis once the final query results have been fetched, so a failing final
/// assertion does not leave graph data behind. Failures *before* that point
/// still skip the graph cleanup.
#[tokio::test]
#[serial]
async fn pg_full_stack_add_and_cognify() {
    /// Removes the wrapped directory when dropped; errors are ignored because
    /// the directory may never have been created.
    struct SystemRootGuard(std::path::PathBuf);

    impl Drop for SystemRootGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    let Some(pg_url) = postgres_url() else {
        eprintln!("TEST_POSTGRES_URL not set — skipping pg_full_stack_add_and_cognify");
        return;
    };
    if !llm_and_embedding_available() {
        eprintln!("LLM/embedding env vars not set — skipping pg_full_stack_add_and_cognify");
        return;
    }

    let settings = make_all_postgres_settings(&pg_url);
    // Declared before every component so it is dropped last: the directory is
    // removed only after the components created below have been released,
    // and also when an `expect`/`assert!` below panics.
    let _system_root_guard =
        SystemRootGuard(std::path::PathBuf::from(&settings.system_root_directory));

    // --- Component initialization -------------------------------------------
    let cm = Arc::new(ComponentManager::new(ConfigManager::new(settings)));
    let db = cm
        .database()
        .await
        .expect("relational Postgres backend must initialize");
    let graph_db = cm
        .graph_db()
        .await
        .expect("PgGraphAdapter must initialize via ComponentManager");
    let vector_db = cm
        .vector_db()
        .await
        .expect("PgVectorAdapter must initialize via ComponentManager");
    let embedding = cm
        .embedding_engine()
        .await
        .expect("embedding engine must initialize");
    let llm = cm.llm().await.expect("LLM must initialize");
    let storage = cm.storage().await.expect("storage must initialize");

    // --- Fixture ------------------------------------------------------------
    let owner_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001")
        .expect("static UUID is always valid");
    // Unique per run so repeated runs against the same database do not collide.
    let dataset_name = format!("pg_e2e_test_{}", Uuid::new_v4().simple());
    let fixture = "Alice Johnson is a software engineer at TechCorp in San Francisco. \
She works on machine learning infrastructure and collaborates with Bob Smith.";

    // --- add() --------------------------------------------------------------
    let ingest_db: Arc<dyn IngestDb> = db.clone();
    let thread_pool: Arc<dyn CpuPool> = Arc::new(
        RayonThreadPool::with_default_threads().expect("RayonThreadPool must create successfully"),
    );
    let add_pipeline = AddPipeline::new(Arc::clone(&storage), ingest_db)
        .with_thread_pool(Arc::clone(&thread_pool))
        .with_graph_db(Arc::clone(&graph_db))
        .with_vector_db(Arc::clone(&vector_db))
        .with_database(Arc::clone(&db));
    add_pipeline
        .add(
            vec![DataInput::Text(fixture.to_string())],
            &dataset_name,
            owner_id,
            None,
        )
        .await
        .expect("add() must succeed on Postgres stack");

    let dataset = ops::datasets::get_dataset_by_name(&db, &dataset_name, owner_id, None)
        .await
        .expect("get_dataset_by_name must succeed")
        .unwrap_or_else(|| panic!("dataset '{dataset_name}' must exist after add()"));
    let data_items = ops::datasets::get_dataset_data(&db, dataset.id)
        .await
        .expect("get_dataset_data must succeed");
    assert!(
        !data_items.is_empty(),
        "Dataset must have > 0 data items after add()"
    );

    // --- cognify() ----------------------------------------------------------
    let cognify_config = CognifyConfig::default()
        .with_incremental_loading(false)
        .with_summarization(false);
    let pipeline_run_repo: Arc<dyn PipelineRunRepository> =
        Arc::new(SeaOrmPipelineRunRepository::new(Arc::clone(&db)));
    let ontology_resolver: Arc<dyn OntologyResolver> = Arc::new(NoOpOntologyResolver::new());
    cognify(
        data_items,
        dataset.id,
        Some(owner_id),
        None,
        dataset.tenant_id,
        Arc::clone(&llm),
        Arc::clone(&storage),
        Arc::clone(&graph_db),
        Arc::clone(&vector_db),
        Arc::clone(&embedding),
        Arc::clone(&db),
        Arc::clone(&pipeline_run_repo),
        Arc::clone(&thread_pool),
        Arc::clone(&ontology_resolver),
        &cognify_config,
    )
    .await
    .expect("cognify() must succeed on Postgres stack");

    // --- Verification -------------------------------------------------------
    // Fetch both results first and only then clean up, so that a failing
    // assertion below cannot leave this run's graph behind in the database.
    let graph_data = graph_db.get_graph_data().await;
    let collections_result = vector_db.list_collections().await;

    // Best-effort cleanup; a failure here must not mask the assertions below.
    let _ = graph_db.delete_graph().await;

    let (nodes, _) = graph_data.expect("get_graph_data must succeed");
    assert!(
        !nodes.is_empty(),
        "Graph must have > 0 nodes after cognify on Postgres stack"
    );
    let collections: Vec<(String, String)> =
        collections_result.expect("list_collections must succeed");
    assert!(
        !collections.is_empty(),
        "Vector DB must have > 0 collections after cognify on Postgres stack"
    );
}