Skip to main content

mcp_postgres/actions/
pgvector.rs

1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::Result as MCPResult;
4
5pub async fn list_vector_columns(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
6    let rows = client
7        .query(
8            "SELECT c.table_schema, c.table_name, c.column_name, c.data_type,
9                    e.udt_name
10             FROM information_schema.columns c
11             JOIN information_schema.element_types e ON (c.table_catalog, c.table_schema, c.table_name, c.column_name, c.dtd_identifier)
12             WHERE c.data_type = 'USER-DEFINED'
13               AND e.udt_name = 'vector'
14             ORDER BY c.table_schema, c.table_name, c.ordinal_position",
15            &[],
16        )
17        .await
18        ?;
19
20    let columns: Vec<Value> = rows.iter().map(|row| {
21        json!({
22            "schema": row.get::<_, String>(0),
23            "table": row.get::<_, String>(1),
24            "column": row.get::<_, String>(2),
25            "type": "vector",
26        })
27    }).collect();
28
29    Ok(json!({ "vector_columns": columns }))
30}
31
32pub async fn vector_search(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
33    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
34        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
35    let column = params.as_ref().and_then(|p| p.get("column").and_then(|v| v.as_str()))
36        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'column'".into()))?;
37    let vector = params.as_ref().and_then(|p| p.get("vector").and_then(|v| v.as_str()))
38        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'vector' parameter (e.g. '[0.1,0.2,0.3]')".into()))?;
39    let limit = params.as_ref().and_then(|p| p.get("limit").and_then(|v| v.as_i64())).unwrap_or(10);
40    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
41    let select_cols = params.as_ref().and_then(|p| p.get("select").and_then(|v| v.as_str())).unwrap_or("*");
42    let distance = params.as_ref().and_then(|p| p.get("distance").and_then(|v| v.as_str())).unwrap_or("cosine");
43
44    let operator = match distance {
45        "l2" | "euclidean" => "<->",
46        "inner" | "ip" => "<#>",
47        _ => "<=>",
48    };
49
50    let qcol = crate::validation::quote_ident(column);
51    let qual = format!("{}.{}", crate::validation::quote_ident(schema), crate::validation::quote_ident(table));
52    let sql = format!(
53        "SELECT {}, {qcol} {operator} '{vector}' AS distance
54         FROM {qual}
55         ORDER BY {qcol} {operator} '{vector}'
56         LIMIT {}",
57        select_cols,
58        limit.min(1000)
59    );
60
61    let rows = client.query(&sql, &[]).await?;
62
63    let mut results = Vec::new();
64    for row in &rows {
65        let mut obj = serde_json::Map::new();
66        for (i, col) in row.columns().iter().enumerate() {
67            let name = col.name();
68            if let Ok(v) = row.try_get::<_, Value>(i) {
69                obj.insert(name.to_string(), v);
70            } else if let Ok(v) = row.try_get::<_, String>(i) {
71                obj.insert(name.to_string(), Value::String(v));
72            } else if let Ok(v) = row.try_get::<_, i64>(i) {
73                obj.insert(name.to_string(), json!(v));
74            } else if let Ok(v) = row.try_get::<_, f64>(i) {
75                obj.insert(name.to_string(), json!(v));
76            } else if let Ok(v) = row.try_get::<_, bool>(i) {
77                obj.insert(name.to_string(), json!(v));
78            } else if let Ok(v) = row.try_get::<_, Option<String>>(i) {
79                obj.insert(name.to_string(), v.map(Value::String).unwrap_or(Value::Null));
80            }
81        }
82        results.push(Value::Object(obj));
83    }
84
85    Ok(json!({
86        "results": results,
87        "count": results.len(),
88        "distance_metric": distance,
89    }))
90}
91
92pub async fn create_vector_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
93    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
94        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
95    let column = params.as_ref().and_then(|p| p.get("column").and_then(|v| v.as_str()))
96        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'column'".into()))?;
97    let index_type = params.as_ref().and_then(|p| p.get("index_type").and_then(|v| v.as_str())).unwrap_or("hnsw");
98    let distance = params.as_ref().and_then(|p| p.get("distance").and_then(|v| v.as_str())).unwrap_or("cosine");
99    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
100
101    let distance_op = match distance {
102        "l2" | "euclidean" => "vector_l2_ops",
103        "inner" | "ip" => "vector_ip_ops",
104        _ => "vector_cosine_ops",
105    };
106
107    let index_name = format!("idx_{}_{}_{}", table, column, index_type);
108
109    let q_schema = crate::validation::quote_ident(schema);
110    let q_table = crate::validation::quote_ident(table);
111    let q_column = crate::validation::quote_ident(column);
112    let sql = match index_type {
113        "ivfflat" => {
114            let lists = params.as_ref().and_then(|p| p.get("lists").and_then(|v| v.as_i64())).unwrap_or(100);
115            format!(
116                "CREATE INDEX \"{index_name}\" ON {q_schema}.{q_table} USING ivfflat ({q_column} {distance_op}) WITH (lists = {lists})"
117            )
118        }
119        _ => {
120            format!(
121                "CREATE INDEX \"{index_name}\" ON {q_schema}.{q_table} USING hnsw ({q_column} {distance_op})"
122            )
123        }
124    };
125
126    client.execute(&sql, &[]).await?;
127    Ok(json!({ "success": true, "index": index_name, "sql": sql }))
128}