// kowalski-core 1.2.0
//
// Kowalski Core Module: A Rust-based agent for interacting with Ollama models.
// Documentation
//! Optional Postgres extension probes (pgvector, Apache AGE).

use crate::error::KowalskiError;

/// Check that `name` is safe to splice into SQL as an AGE graph name.
///
/// AGE's `cypher()` takes the graph name as a SQL **literal** rather than a bound parameter
/// (`$1::name`), so the name ends up embedded in the statement text. To rule out injection,
/// only identifier-style names are accepted: 1 to 63 bytes long, consisting solely of ASCII
/// letters, ASCII digits and `_`.
///
/// # Errors
///
/// Returns [`KowalskiError::Configuration`] when the length is out of range or when any
/// other character is present.
#[cfg(feature = "postgres")]
fn validate_age_graph_name(name: &str) -> Result<(), KowalskiError> {
    // NOTE(review): 63 presumably mirrors PostgreSQL's identifier length limit — confirm.
    const MAX_LEN: usize = 63;

    if !(1..=MAX_LEN).contains(&name.len()) {
        return Err(KowalskiError::Configuration(String::from(
            "invalid AGE graph name (length)",
        )));
    }

    // Scanning bytes is equivalent to scanning chars here: every byte of a multi-byte
    // UTF-8 sequence is non-ASCII and therefore rejected.
    let offending = name
        .bytes()
        .find(|b| !(b.is_ascii_alphanumeric() || *b == b'_'));

    match offending {
        Some(_) => Err(KowalskiError::Configuration(String::from(
            "invalid AGE graph name (use letters, digits, underscore only)",
        ))),
        None => Ok(()),
    }
}

/// Probe a Postgres instance for the optional `vector` and `age` extensions.
///
/// Connects to `database_url`, looks each extension up in `pg_extension`, and returns a JSON
/// object with the keys `postgres` (always `true` on success), `vector_extension` and
/// `age_extension`.
///
/// # Errors
///
/// Returns [`KowalskiError::Configuration`] when the connection cannot be established or
/// when either probe query fails.
#[cfg(feature = "postgres")]
pub async fn postgres_graph_status(database_url: &str) -> Result<serde_json::Value, KowalskiError> {
    use sqlx::postgres::PgPool;

    const VECTOR_PROBE: &str =
        "SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'vector')";
    const AGE_PROBE: &str = "SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'age')";

    // Both probes report failures with the same message prefix.
    let probe_failed = |e: sqlx::Error| KowalskiError::Configuration(format!("graph status: {e}"));

    let pool = match PgPool::connect(database_url).await {
        Ok(pool) => pool,
        Err(e) => {
            return Err(KowalskiError::Configuration(format!(
                "graph status connect: {e}"
            )));
        }
    };

    let has_vector = sqlx::query_scalar::<_, bool>(VECTOR_PROBE)
        .fetch_one(&pool)
        .await
        .map_err(probe_failed)?;
    let has_age = sqlx::query_scalar::<_, bool>(AGE_PROBE)
        .fetch_one(&pool)
        .await
        .map_err(probe_failed)?;

    let report = serde_json::json!({
        "postgres": true,
        "vector_extension": has_vector,
        "age_extension": has_age,
    });
    Ok(report)
}

/// Stand-in for the extension probe when the crate is built without the `postgres` feature.
///
/// The URL is ignored and the call always fails with [`KowalskiError::Configuration`].
#[cfg(not(feature = "postgres"))]
pub async fn postgres_graph_status(
    _database_url: &str,
) -> Result<serde_json::Value, KowalskiError> {
    let reason = String::from("postgres feature not enabled");
    Err(KowalskiError::Configuration(reason))
}

/// Run a Cypher query via Apache AGE [`cypher`](https://age.apache.org/).
///
/// The query must expose a single `agtype` column named `result` (e.g. `RETURN x AS result`).
/// Each row's `result` is cast to text and returned, in order, under the `"rows"` key of a
/// JSON object; SQL `NULL` values become JSON `null`.
///
/// `graph_name` is embedded in the SQL text as a literal, so it is validated first and must
/// consist only of ASCII letters, digits and underscores. `cypher` is passed as a
/// dollar-quoted string with a random tag that does not occur in the query.
///
/// # Errors
///
/// Returns [`KowalskiError::Configuration`] when:
/// - `graph_name` fails validation (checked before any database I/O),
/// - connecting to `database_url` or acquiring a connection fails,
/// - the `age` extension is not installed,
/// - setting `search_path` fails, or
/// - the Cypher query itself fails.
#[cfg(feature = "postgres")]
pub async fn postgres_age_cypher(
    database_url: &str,
    graph_name: &str,
    cypher: &str,
) -> Result<serde_json::Value, KowalskiError> {
    use serde_json::json;
    use sqlx::postgres::PgPool;

    // Fail fast on a bad graph name: it is pure input validation, so there is no reason to
    // open a connection (or let a connection error mask the real problem) first.
    validate_age_graph_name(graph_name)?;

    let pool = PgPool::connect(database_url)
        .await
        .map_err(|e| KowalskiError::Configuration(format!("age cypher connect: {e}")))?;

    let age: bool = sqlx::query_scalar::<_, bool>(
        "SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'age')",
    )
    .fetch_one(&pool)
    .await
    .map_err(|e| KowalskiError::Configuration(format!("age cypher: {e}")))?;

    if !age {
        return Err(KowalskiError::Configuration(
            "Apache AGE extension is not installed on this Postgres instance".into(),
        ));
    }

    // AGE requires `ag_catalog` on `search_path` (and often `LOAD 'age'`) on the same session as
    // `cypher(...)`. Pool connections are arbitrary; use one connection for SET + query.
    let mut conn = pool
        .acquire()
        .await
        .map_err(|e| KowalskiError::Configuration(format!("age cypher acquire: {e}")))?;
    // Deliberately best-effort: the result of `LOAD` is ignored, and any real problem will
    // surface when the query below runs.
    let _ = sqlx::query("LOAD 'age'").execute(&mut *conn).await;
    sqlx::query("SET search_path = ag_catalog, public")
        .execute(&mut *conn)
        .await
        .map_err(|e| KowalskiError::Configuration(format!("age search_path: {e}")))?;

    // Dollar-quote the Cypher body so quotes/newlines are safe; pick a tag that cannot appear in the query.
    let mut tag = format!("ag{}", uuid::Uuid::new_v4().as_simple());
    while cypher.contains(&format!("${}$", tag)) {
        tag = format!("ag{}", uuid::Uuid::new_v4().as_simple());
    }

    // First argument must be a literal graph name (AGE rejects bound parameters with "a name constant is expected").
    // Second is `$tag$cypher$tag$` (PostgreSQL dollar-quoted string).
    let sql = format!(
        "SELECT (result)::text FROM cypher('{}', ${}${}${}$) AS (result agtype)",
        graph_name, tag, cypher, tag
    );

    let raw: Vec<Option<String>> = sqlx::query_scalar(&sql)
        .fetch_all(&mut *conn)
        .await
        .map_err(|e| KowalskiError::Configuration(format!("cypher execution: {e}")))?;

    // `None` (SQL NULL) serializes to JSON null; everything else stays a text string.
    let rows: Vec<serde_json::Value> = raw.into_iter().map(|t| json!(t)).collect();
    Ok(json!({ "rows": rows }))
}

/// Stand-in for the AGE Cypher runner when the crate is built without the `postgres` feature.
///
/// All arguments are ignored and the call always fails with
/// [`KowalskiError::Configuration`].
#[cfg(not(feature = "postgres"))]
pub async fn postgres_age_cypher(
    _database_url: &str,
    _graph_name: &str,
    _cypher: &str,
) -> Result<serde_json::Value, KowalskiError> {
    let reason = String::from("postgres feature not enabled");
    Err(KowalskiError::Configuration(reason))
}