mod common;
use ankurah::property::Json;
use ankurah::{policy::DEFAULT_CONTEXT as c, Model, Node, PermissiveAgent};
use ankurah_storage_postgres::sql_builder::{split_predicate_for_postgres, SplitPredicate};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// Asserts that `query` can be evaluated entirely by PostgreSQL.
///
/// The query is parsed with the AnkQL parser and its predicate is handed to
/// `split_predicate_for_postgres`; the check fails if the resulting split
/// still reports that a post-filter is needed.
///
/// # Panics
/// Panics if `query` cannot be parsed, or if the split needs a post-filter
/// (the message includes the remaining predicate).
#[allow(dead_code)]
fn assert_fully_pushes_down(query: &str) {
    let parsed = ankql::parser::parse_selection(query).expect("Failed to parse query");
    let pushdown = split_predicate_for_postgres(&parsed.predicate);

    if pushdown.needs_post_filter() {
        panic!(
            "Query '{}' should fully push down to PostgreSQL, but remaining predicate is: {:?}",
            query, pushdown.remaining_predicate
        );
    }
}
/// Parses `query` and returns the result of `split_predicate_for_postgres`
/// applied to its predicate.
///
/// # Panics
/// Panics if `query` cannot be parsed.
#[allow(dead_code)]
fn get_predicate_split(query: &str) -> SplitPredicate {
    let parsed = ankql::parser::parse_selection(query).expect("Failed to parse query");
    let predicate = &parsed.predicate;
    split_predicate_for_postgres(predicate)
}
/// Test model combining a plain string field with a JSON property.
///
/// The tests in this file read it back through `TrackView`, which is not
/// declared here — presumably it is generated by `#[derive(Model)]`.
#[derive(Model, Debug, Serialize, Deserialize, Clone)]
pub struct Track {
/// Track title; the tests filter on it as a regular (non-JSON) field.
pub name: String,
/// JSON document; the tests address it with dotted paths such as
/// `licensing.territory`.
pub licensing: Json,
}
/// Stores a `Track` carrying a JSON `licensing` value, then fetches it back
/// with a filter on the plain `name` field.
#[tokio::test]
async fn test_json_property_storage_and_simple_query() -> Result<()> {
    // `_container` is bound (not discarded) so the handle lives until the test returns.
    let (_container, storage) = common::create_postgres_container().await?;
    let node = Node::new_durable(Arc::new(storage), PermissiveAgent::new());
    node.system.create().await?;
    let ctx = node.context_async(c).await;

    {
        let licensing = Json::new(serde_json::json!({
            "territory": "US",
            "rights": "exclusive"
        }));
        let track = Track { name: String::from("Test Track"), licensing };

        let trx = ctx.begin();
        trx.create(&track).await?;
        trx.commit().await?;
    }

    let found = ctx.fetch::<TrackView>("name = 'Test Track'").await?;
    assert_eq!(found.len(), 1);
    assert_eq!(found[0].name().unwrap(), "Test Track");
    Ok(())
}
#[tokio::test]
async fn test_bytea_jsonb_operator_behavior() -> Result<()> {
let (container, _storage) = common::create_postgres_container().await?;
let host = container.get_host().await?;
let port = container.get_host_port_ipv4(5432).await?;
let (client, connection) =
tokio_postgres::connect(&format!("host={host} port={port} user=postgres password=postgres dbname=ankurah"), tokio_postgres::NoTls)
.await?;
tokio::spawn(async move {
let _ = connection.await;
});
client.execute("CREATE TABLE IF NOT EXISTS test_bytea (id SERIAL PRIMARY KEY, data BYTEA)", &[]).await?;
let json_bytes = serde_json::to_vec(&serde_json::json!({"territory": "US"})).unwrap();
client.execute("INSERT INTO test_bytea (data) VALUES ($1)", &[&json_bytes]).await?;
let result = client.query("SELECT data->'territory' FROM test_bytea", &[]).await;
assert!(result.is_err(), "JSONB operator on bytea should error");
let err = result.unwrap_err();
let err_debug = format!("{:?}", err);
let err_display = err.to_string();
assert!(
err_debug.contains("operator does not exist") || err_debug.contains("type") || err_debug.contains("bytea"),
"Expected type/operator error, got display='{}', debug='{}'",
err_display,
err_debug
);
let col_info =
client.query("SELECT column_name, data_type FROM information_schema.columns WHERE table_name = 'test_bytea'", &[]).await?;
let data_type: String = col_info.iter().find(|r| r.get::<_, String>(0) == "data").unwrap().get(1);
assert_eq!(data_type, "bytea");
Ok(())
}
/// Checks that a set of JSON-path predicates pass `assert_fully_pushes_down`,
/// i.e. their split for PostgreSQL needs no post-filter.
#[test]
fn test_json_path_pushdown_verification() {
    // Checked in order; the first query that fails aborts the test.
    const FULLY_PUSHABLE: [&str; 6] = [
        "licensing.territory = 'US'",
        "licensing.rights.holder = 'Label'",
        "licensing.count > 10",
        "name = 'Test' AND licensing.territory = 'US'",
        "licensing.territory = 'US' OR licensing.territory = 'UK'",
        "licensing.nested.deeply.value = 'test'",
    ];

    for query in &FULLY_PUSHABLE {
        assert_fully_pushes_down(query);
    }
}
#[tokio::test]
async fn test_json_path_query_string_equality() -> Result<()> {
assert_fully_pushes_down("licensing.territory = 'US'");
let (container, storage) = common::create_postgres_container().await?;
let node = Node::new_durable(Arc::new(storage), PermissiveAgent::new());
node.system.create().await?;
let ctx = node.context_async(c).await;
{
let trx = ctx.begin();
trx.create(&Track {
name: "US Track".to_string(),
licensing: Json::new(serde_json::json!({"territory": "US", "rights": "exclusive"})),
})
.await?;
trx.create(&Track {
name: "UK Track".to_string(),
licensing: Json::new(serde_json::json!({"territory": "UK", "rights": "non-exclusive"})),
})
.await?;
trx.commit().await?;
}
{
let host = container.get_host().await?;
let port = container.get_host_port_ipv4(5432).await?;
let (client, connection) = tokio_postgres::connect(
&format!("host={host} port={port} user=postgres password=postgres dbname=ankurah"),
tokio_postgres::NoTls,
)
.await?;
tokio::spawn(async move {
let _ = connection.await;
});
let col_info =
client.query("SELECT column_name, data_type FROM information_schema.columns WHERE table_name = 'track'", &[]).await?;
println!("Track table columns:");
for row in &col_info {
let name: String = row.get(0);
let dtype: String = row.get(1);
println!(" {} : {}", name, dtype);
}
}
println!("About to fetch with JSON path query...");
let fetch_result = ctx.fetch::<TrackView>("licensing.territory = 'US'").await;
println!("Fetch result: {:?}", fetch_result.as_ref().map(|v| v.len()).map_err(|e| e.to_string()));
let us_tracks = fetch_result?;
println!("Found {} tracks", us_tracks.len());
assert_eq!(us_tracks.len(), 1);
assert_eq!(us_tracks[0].name().unwrap(), "US Track");
Ok(())
}
/// Filters tracks with a numeric `>` comparison on a JSON path
/// (`licensing.duration`).
#[tokio::test]
async fn test_json_path_query_numeric_comparison() -> Result<()> {
    // `_container` is bound (not discarded) so the handle lives until the test returns.
    let (_container, storage) = common::create_postgres_container().await?;
    let node = Node::new_durable(Arc::new(storage), PermissiveAgent::new());
    node.system.create().await?;
    let ctx = node.context_async(c).await;

    {
        // (track name, duration) pairs; only the second exceeds the 200 threshold queried below.
        let fixtures: [(&str, i64); 2] = [("Short Track", 120), ("Long Track", 300)];

        let trx = ctx.begin();
        for &(name, duration) in &fixtures {
            let track = Track {
                name: name.to_string(),
                licensing: Json::new(serde_json::json!({"duration": duration, "territory": "US"})),
            };
            trx.create(&track).await?;
        }
        trx.commit().await?;
    }

    let long_tracks = ctx.fetch::<TrackView>("licensing.duration > 200").await?;
    assert_eq!(long_tracks.len(), 1);
    assert_eq!(long_tracks[0].name().unwrap(), "Long Track");
    Ok(())
}
/// Filters tracks by equality on a two-level JSON path
/// (`licensing.rights.holder`).
#[tokio::test]
async fn test_json_path_nested_query() -> Result<()> {
    // `_container` is bound (not discarded) so the handle lives until the test returns.
    let (_container, storage) = common::create_postgres_container().await?;
    let node = Node::new_durable(Arc::new(storage), PermissiveAgent::new());
    node.system.create().await?;
    let ctx = node.context_async(c).await;

    {
        // Builds the nested `{"rights": {"holder": .., "type": ..}}` document.
        let licensing_for = |holder: &str, kind: &str| {
            Json::new(serde_json::json!({
                "rights": {
                    "holder": holder,
                    "type": kind
                }
            }))
        };
        // (track name, rights holder, rights type)
        let fixtures = [("Label A Track", "Label A", "exclusive"), ("Label B Track", "Label B", "non-exclusive")];

        let trx = ctx.begin();
        for &(name, holder, kind) in &fixtures {
            let track = Track { name: name.to_string(), licensing: licensing_for(holder, kind) };
            trx.create(&track).await?;
        }
        trx.commit().await?;
    }

    let label_a_tracks = ctx.fetch::<TrackView>("licensing.rights.holder = 'Label A'").await?;
    assert_eq!(label_a_tracks.len(), 1);
    assert_eq!(label_a_tracks[0].name().unwrap(), "Label A Track");
    Ok(())
}
/// Combines a regular-field filter (`name`) with a JSON-path filter
/// (`licensing.territory`) using `AND`.
#[tokio::test]
async fn test_json_path_combined_with_regular_field() -> Result<()> {
    // `_container` is bound (not discarded) so the handle lives until the test returns.
    let (_container, storage) = common::create_postgres_container().await?;
    let node = Node::new_durable(Arc::new(storage), PermissiveAgent::new());
    node.system.create().await?;
    let ctx = node.context_async(c).await;

    {
        // (track name, territory): two US tracks, so the `name` filter is what narrows the result.
        let fixtures = [("Track A", "US"), ("Track B", "US"), ("Track C", "UK")];

        let trx = ctx.begin();
        for &(name, territory) in &fixtures {
            let track = Track { name: name.to_string(), licensing: Json::new(serde_json::json!({"territory": territory})) };
            trx.create(&track).await?;
        }
        trx.commit().await?;
    }

    let matches = ctx.fetch::<TrackView>("name = 'Track A' AND licensing.territory = 'US'").await?;
    assert_eq!(matches.len(), 1);
    assert_eq!(matches[0].name().unwrap(), "Track A");
    Ok(())
}