agent-first-psql 0.4.2

Persistent PostgreSQL client for AI agents — SQL-native JSONL in, JSONL out
Documentation
use super::*;
use crate::db::{DbExecutor, ExecError, ExecOutcome};
use async_trait::async_trait;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex, RwLock};

#[tokio::test]
async fn emit_rows_uses_db_columns_even_when_rows_empty() {
    let columns = vec![
        ColumnInfo {
            name: "a".to_string(),
            type_name: "int4".to_string(),
        },
        ColumnInfo {
            name: "b".to_string(),
            type_name: "text".to_string(),
        },
    ];
    let (tx, mut rx) = mpsc::channel(16);
    let app = Arc::new(App::new(RuntimeConfig::default(), tx));
    let opts = ResolvedOptions {
        stream_rows: false,
        batch_rows: 10,
        batch_bytes: 1024,
        statement_timeout_ms: 100,
        lock_timeout_ms: 100,
        read_only: false,
        inline_max_rows: 100,
        inline_max_bytes: 1000,
    };

    let status = emit_rows_result(
        &app,
        Some("q_empty".to_string()),
        Some("default".to_string()),
        columns.clone(),
        vec![],
        std::time::Instant::now(),
        &opts,
    )
    .await;
    assert!(matches!(status, RowEmitStatus::Sent { .. }));
    let out_opt = rx.recv().await;
    assert!(out_opt.is_some());
    if let Some(out) = out_opt {
        assert!(matches!(out, Output::Result { .. }));
        if let Output::Result { columns: got, .. } = out {
            assert_eq!(got.len(), columns.len());
        }
    }
}

#[tokio::test]
async fn emit_rows_result_paths() {
    let (tx, mut rx) = mpsc::channel(64);
    let app = Arc::new(App::new(RuntimeConfig::default(), tx));

    let stream_opts = ResolvedOptions {
        stream_rows: true,
        batch_rows: 2,
        batch_bytes: 1024,
        statement_timeout_ms: 100,
        lock_timeout_ms: 100,
        read_only: false,
        inline_max_rows: 100,
        inline_max_bytes: 100000,
    };
    let status = emit_rows_result(
        &app,
        Some("q1".to_string()),
        Some("default".to_string()),
        vec![ColumnInfo {
            name: "n".to_string(),
            type_name: "int4".to_string(),
        }],
        vec![
            serde_json::json!({"n":1}),
            serde_json::json!({"n":2}),
            serde_json::json!({"n":3}),
        ],
        std::time::Instant::now(),
        &stream_opts,
    )
    .await;
    assert!(matches!(status, RowEmitStatus::Sent { .. }));
    while rx.try_recv().is_ok() {}

    let inline_opts = ResolvedOptions {
        stream_rows: false,
        batch_rows: 100,
        batch_bytes: 1024,
        statement_timeout_ms: 100,
        lock_timeout_ms: 100,
        read_only: false,
        inline_max_rows: 1,
        inline_max_bytes: 10000,
    };
    let status = emit_rows_result(
        &app,
        Some("q2".to_string()),
        Some("default".to_string()),
        vec![ColumnInfo {
            name: "n".to_string(),
            type_name: "int4".to_string(),
        }],
        vec![serde_json::json!({"n":1}), serde_json::json!({"n":2})],
        std::time::Instant::now(),
        &inline_opts,
    )
    .await;
    assert!(matches!(status, RowEmitStatus::TooLarge { .. }));
}

struct MockExecutor {
    result: Mutex<Option<Result<ExecOutcome, ExecError>>>,
}

#[async_trait]
impl DbExecutor for MockExecutor {
    async fn execute(
        &self,
        _session_name: &str,
        _session_cfg: &SessionConfig,
        _sql: &str,
        _params: &[Value],
        _opts: &ResolvedOptions,
    ) -> Result<ExecOutcome, ExecError> {
        self.result
            .lock()
            .await
            .take()
            .unwrap_or(Ok(ExecOutcome::Command { affected: 0 }))
    }
}

fn test_app_with_executor(
    cfg: RuntimeConfig,
    result: Result<ExecOutcome, ExecError>,
) -> (Arc<App>, mpsc::Receiver<Output>) {
    let (tx, rx) = mpsc::channel(64);
    let app = Arc::new(App {
        config: RwLock::new(cfg),
        executor: Arc::new(MockExecutor {
            result: Mutex::new(Some(result)),
        }),
        writer: tx,
        in_flight: Mutex::new(std::collections::HashMap::new()),
        requests_total: AtomicU64::new(0),
        start_time: std::time::Instant::now(),
    });
    (app, rx)
}

#[tokio::test]
async fn execute_query_unknown_session_emits_connect_failed() {
    let cfg = RuntimeConfig {
        default_session: "missing".to_string(),
        ..Default::default()
    };
    let (app, mut rx) = test_app_with_executor(cfg, Ok(ExecOutcome::Command { affected: 1 }));
    execute_query(
        &app,
        Some("q1".to_string()),
        Some("missing".to_string()),
        "select 1".to_string(),
        vec![],
        QueryOptions::default(),
    )
    .await;
    let msg_opt = rx.recv().await;
    assert!(msg_opt.is_some());
    if let Some(msg) = msg_opt {
        assert!(matches!(msg, Output::Error { .. }));
        if let Output::Error { error_code, .. } = msg {
            assert_eq!(error_code, "connect_failed");
        }
    }
}

#[tokio::test]
async fn execute_query_maps_executor_outcomes() {
    let mut cfg = RuntimeConfig::default();
    cfg.sessions
        .insert("default".to_string(), SessionConfig::default());

    for result in [
        Ok(ExecOutcome::Rows {
            columns: vec![ColumnInfo {
                name: "n".to_string(),
                type_name: "int4".to_string(),
            }],
            rows: vec![serde_json::json!({"n":1})],
        }),
        Ok(ExecOutcome::Command { affected: 2 }),
        Err(ExecError::Connect("down".to_string())),
        Err(ExecError::InvalidParams("bad".to_string())),
        Err(ExecError::Sql {
            sqlstate: "22023".to_string(),
            message: "bad".to_string(),
            detail: None,
            hint: None,
            position: None,
        }),
        Err(ExecError::Internal("boom".to_string())),
    ] {
        let (app, mut rx) = test_app_with_executor(cfg.clone(), result);
        execute_query(
            &app,
            Some("q1".to_string()),
            Some("default".to_string()),
            "select 1".to_string(),
            vec![],
            QueryOptions::default(),
        )
        .await;
        let msg_opt = rx.recv().await;
        assert!(msg_opt.is_some());
    }
}