use core::num::{NonZeroU32, NonZeroUsize};
const LIMIT_10: NonZeroUsize = NonZeroUsize::new(10).unwrap();
use oxgraph_postgres::{
Catalog, Config, EdgeId, EdgeRow, EngineBuilder, GraphRole, NodeKey, PostgresGraphError,
QueryFreshness, RegisteredEdge, RegisteredTable, SnapshotRebuild, SyncAction, SyncRow, TableId,
TraversalDirection, TraverseLimits,
bench_fixture::{chain_catalog, chain_edges},
};
fn chain_engine() -> Result<oxgraph_postgres::Engine, PostgresGraphError> {
let catalog = chain_catalog()?;
let edges = chain_edges();
let bytes = SnapshotRebuild::from_catalog_and_edges(&catalog, &edges, 1_700_000_000)?;
EngineBuilder::new().snapshot_owned(bytes).build()
}
#[test]
fn relational_build_reload_and_queries() -> Result<(), oxgraph_postgres::PostgresGraphError> {
let mut catalog = Catalog::new();
catalog.add_table(RegisteredTable {
id: TableId(1),
schema: "public".into(),
name: "nodes".into(),
primary_key_column: "id".into(),
})?;
catalog.add_edge(RegisteredEdge {
id: EdgeId(1),
source_table: TableId(1),
target_table: TableId(1),
source_column: "src".into(),
target_column: "dst".into(),
schema: "public".into(),
name: "edges".into(),
})?;
let edges = [
EdgeRow {
source: NodeKey::registered(TableId(1), 1),
target: NodeKey::registered(TableId(1), 2),
},
EdgeRow {
source: NodeKey::registered(TableId(1), 2),
target: NodeKey::registered(TableId(1), 3),
},
];
let bytes = SnapshotRebuild::from_catalog_and_edges(&catalog, &edges, 1_700_000_000)?;
let mut engine = EngineBuilder::new().snapshot_owned(bytes).build()?;
let limits = TraverseLimits::bounded(LIMIT_10);
let forward = engine.traverse(0, limits, TraversalDirection::Out)?;
assert_eq!(forward, vec![0, 1, 2]);
let reverse = engine.traverse(2, limits, TraversalDirection::In)?;
assert_eq!(reverse, vec![2, 1, 0]);
engine.apply_sync_rows(&[SyncRow {
sequence: 1,
action: SyncAction::InsertEdge {
source: 0,
target: 2,
},
}])?;
let with_overlay = engine.traverse(0, limits, TraversalDirection::Out)?;
assert!(with_overlay.contains(&2));
engine.rebuild_from_catalog(&catalog, &edges, 1_700_000_001)?;
let reloaded = engine.traverse(0, limits, TraversalDirection::Out)?;
assert_eq!(reloaded, vec![0, 1, 2]);
Ok(())
}
#[test]
fn sync_overlay_invisible_until_applied() -> Result<(), PostgresGraphError> {
let mut engine = chain_engine()?;
let limits = TraverseLimits::bounded(LIMIT_10);
let before = engine.traverse(0, limits, TraversalDirection::Out)?;
assert_eq!(before, vec![0, 1, 2]);
assert_eq!(engine.sync_health().overlay_edges, 0);
Ok(())
}
#[test]
fn rebuild_clears_overlay_and_matches_relational() -> Result<(), PostgresGraphError> {
let catalog = chain_catalog()?;
let mut edges = chain_edges().to_vec();
let bytes = SnapshotRebuild::from_catalog_and_edges(&catalog, &edges, 1_700_000_000)?;
let mut engine = EngineBuilder::new().snapshot_owned(bytes).build()?;
engine.apply_sync_rows(&[SyncRow {
sequence: 1,
action: SyncAction::InsertEdge {
source: 0,
target: 2,
},
}])?;
assert_eq!(engine.sync_health().overlay_edges, 1);
edges.push(EdgeRow {
source: NodeKey::registered(TableId(1), 1),
target: NodeKey::registered(TableId(1), 3),
});
engine.rebuild_from_catalog(&catalog, &edges, 1_700_000_001)?;
assert_eq!(engine.sync_health().overlay_edges, 0);
engine.set_config(Config {
query_freshness: QueryFreshness::BaseOnly,
..Config::default()
})?;
let depth_one = engine.traverse(
0,
TraverseLimits {
result_limit: LIMIT_10,
max_depth: NonZeroU32::new(1),
},
TraversalDirection::Out,
)?;
assert!(
depth_one.contains(&2),
"rebuilt base should include relational shortcut edge"
);
Ok(())
}
#[test]
fn base_only_vs_overlay_aware_on_same_overlay() -> Result<(), PostgresGraphError> {
let mut engine = chain_engine()?;
engine.apply_sync_rows(&[SyncRow {
sequence: 1,
action: SyncAction::InsertEdge {
source: 0,
target: 2,
},
}])?;
let limits = TraverseLimits {
result_limit: LIMIT_10,
max_depth: NonZeroU32::new(1),
};
engine.set_config(Config {
query_freshness: QueryFreshness::OverlayAware,
..Config::default()
})?;
let overlay_aware = engine.traverse(0, limits, TraversalDirection::Out)?;
assert!(overlay_aware.contains(&2));
engine.set_config(Config {
query_freshness: QueryFreshness::BaseOnly,
..Config::default()
})?;
let base_only = engine.traverse(0, limits, TraversalDirection::Out)?;
assert_eq!(base_only, vec![0, 1]);
Ok(())
}
#[test]
fn maintenance_disabled_blocks_rebuild() -> Result<(), PostgresGraphError> {
let catalog = chain_catalog()?;
let edges = chain_edges();
let mut engine = chain_engine()?;
engine.set_config(Config {
maintenance_enabled: false,
..Config::default()
})?;
assert!(matches!(
engine.rebuild_from_catalog(&catalog, &edges, 1_700_000_001),
Err(PostgresGraphError::Config(
oxgraph_postgres::ConfigError::MaintenanceDisabled
))
));
Ok(())
}
#[test]
fn acl_denial_helper() {
assert!(GraphRole::Reader.satisfies(GraphRole::Reader).is_ok());
assert!(matches!(
GraphRole::Reader.satisfies(GraphRole::Admin),
Err(PostgresGraphError::AccessDenied { .. })
));
}