#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::indexing_slicing,
clippy::doc_markdown
)]
use entelix_core::TenantId;
use std::sync::Arc;
use chrono::Utc;
use entelix_core::ExecutionContext;
use entelix_graphmemory_pg::PgGraphMemory;
use entelix_memory::{GraphMemory, Namespace};
use sqlx::Executor;
use sqlx::postgres::{PgPool, PgPoolOptions};
use testcontainers_modules::postgres::Postgres;
use testcontainers_modules::testcontainers::ContainerAsync;
use testcontainers_modules::testcontainers::runners::AsyncRunner;
const APP_ROLE: &str = "graph_app";
const APP_PASSWORD: &str = "apppwd";
async fn boot_with_app_role() -> (
PgGraphMemory<String, String>,
PgGraphMemory<String, String>,
ContainerAsync<Postgres>,
) {
let container = Postgres::default().start().await.unwrap();
let port = container.get_host_port_ipv4(5432).await.unwrap();
let super_url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
let super_pool = Arc::new(PgPoolOptions::new().connect(&super_url).await.unwrap());
let super_graph = PgGraphMemory::<String, String>::builder()
.with_pool(Arc::clone(&super_pool))
.build()
.await
.unwrap();
super_pool
.execute(
format!(
"CREATE ROLE {APP_ROLE} WITH LOGIN PASSWORD '{APP_PASSWORD}' \
NOSUPERUSER NOBYPASSRLS"
)
.as_str(),
)
.await
.unwrap();
super_pool
.execute(
format!(
"GRANT SELECT, INSERT, UPDATE, DELETE ON \
graph_nodes, graph_edges TO {APP_ROLE}"
)
.as_str(),
)
.await
.unwrap();
super_pool
.execute(format!("GRANT USAGE ON SCHEMA public TO {APP_ROLE}").as_str())
.await
.unwrap();
let app_url = format!("postgres://{APP_ROLE}:{APP_PASSWORD}@127.0.0.1:{port}/postgres");
let app_pool = Arc::new(PgPoolOptions::new().connect(&app_url).await.unwrap());
let app_graph = PgGraphMemory::<String, String>::builder()
.with_pool(Arc::clone(&app_pool))
.with_auto_migrate(false)
.build()
.await
.unwrap();
(super_graph, app_graph, container)
}
#[tokio::test]
#[ignore = "requires docker"]
async fn rls_blocks_cross_tenant_node_lookup_at_db_layer() {
let (_super_graph, app_graph, container) = boot_with_app_role().await;
let ctx = ExecutionContext::new();
let ns_a = Namespace::new(TenantId::new("tenant-A")).with_scope("scope");
let id = app_graph
.add_node(&ctx, &ns_a, "alice".into())
.await
.unwrap();
assert_eq!(
app_graph
.get_node(&ctx, &ns_a, &id)
.await
.unwrap()
.as_deref(),
Some("alice"),
);
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM graph_nodes")
.fetch_one(&*pool_for(APP_ROLE, APP_PASSWORD, &container).await)
.await
.unwrap();
assert_eq!(
count.0, 0,
"RLS must hide rows when entelix.tenant_id is unset (defense vs forgotten SET LOCAL)"
);
}
#[tokio::test]
#[ignore = "requires docker"]
async fn rls_with_check_blocks_mismatched_tenant_inserts() {
let (_super_graph, _app_graph, container) = boot_with_app_role().await;
let app_pool = pool_for(APP_ROLE, APP_PASSWORD, &container).await;
let mut tx = app_pool.begin().await.unwrap();
sqlx::query("SELECT set_config('entelix.tenant_id', 'tenant-A', true)")
.execute(&mut *tx)
.await
.unwrap();
let result = sqlx::query(
r#"
INSERT INTO graph_nodes (tenant_id, namespace_key, id, payload)
VALUES ('tenant-B', 'tenant-B:scope', 'node-1', '"v"')
"#,
)
.execute(&mut *tx)
.await;
assert!(
result.is_err(),
"WITH CHECK must reject INSERT whose tenant_id differs from session var"
);
}
#[tokio::test]
#[ignore = "requires docker"]
async fn rls_applies_to_edges_table_too() {
let (_super_graph, app_graph, container) = boot_with_app_role().await;
let ctx = ExecutionContext::new();
let ns = Namespace::new(TenantId::new("tenant-A"));
let now = Utc::now();
let a = app_graph.add_node(&ctx, &ns, "a".into()).await.unwrap();
let b = app_graph.add_node(&ctx, &ns, "b".into()).await.unwrap();
app_graph
.add_edge(&ctx, &ns, &a, &b, "ab".into(), now)
.await
.unwrap();
let app_pool = pool_for(APP_ROLE, APP_PASSWORD, &container).await;
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM graph_edges")
.fetch_one(&*app_pool)
.await
.unwrap();
assert_eq!(
count.0, 0,
"graph_edges RLS must hide rows when tenant unset"
);
}
#[tokio::test]
#[ignore = "requires docker"]
async fn correct_tenant_session_returns_rows() {
let (_super_graph, app_graph, container) = boot_with_app_role().await;
let ctx = ExecutionContext::new();
let ns = Namespace::new(TenantId::new("tenant-X")).with_scope("conv");
app_graph
.add_node(&ctx, &ns, "node-1".into())
.await
.unwrap();
let app_pool = pool_for(APP_ROLE, APP_PASSWORD, &container).await;
let count = count_in_tx(&app_pool, "tenant-X").await;
assert_eq!(count, 1, "row visible when correct tenant set");
let mismatched = count_in_tx(&app_pool, "tenant-Y").await;
assert_eq!(mismatched, 0, "row hidden when wrong tenant set");
}
async fn pool_for(user: &str, pw: &str, container: &ContainerAsync<Postgres>) -> Arc<PgPool> {
let port = container.get_host_port_ipv4(5432).await.unwrap();
let url = format!("postgres://{user}:{pw}@127.0.0.1:{port}/postgres");
Arc::new(PgPoolOptions::new().connect(&url).await.unwrap())
}
async fn count_in_tx(pool: &PgPool, tenant: &str) -> i64 {
let mut tx = pool.begin().await.unwrap();
sqlx::query("SELECT set_config('entelix.tenant_id', $1, true)")
.bind(tenant)
.execute(&mut *tx)
.await
.unwrap();
let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM graph_nodes")
.fetch_one(&mut *tx)
.await
.unwrap();
tx.commit().await.unwrap();
row.0
}