mcp_postgres/actions/
pgvector.rs1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::Result as MCPResult;
4
5pub async fn list_vector_columns(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
6 let rows = client
7 .query(
8 "SELECT c.table_schema, c.table_name, c.column_name, c.data_type,
9 e.udt_name
10 FROM information_schema.columns c
11 JOIN information_schema.element_types e ON (c.table_catalog, c.table_schema, c.table_name, c.column_name, c.dtd_identifier)
12 WHERE c.data_type = 'USER-DEFINED'
13 AND e.udt_name = 'vector'
14 ORDER BY c.table_schema, c.table_name, c.ordinal_position",
15 &[],
16 )
17 .await
18 ?;
19
20 let columns: Vec<Value> = rows.iter().map(|row| {
21 json!({
22 "schema": row.get::<_, String>(0),
23 "table": row.get::<_, String>(1),
24 "column": row.get::<_, String>(2),
25 "type": "vector",
26 })
27 }).collect();
28
29 Ok(json!({ "vector_columns": columns }))
30}
31
32pub async fn vector_search(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
33 let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
34 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
35 let column = params.as_ref().and_then(|p| p.get("column").and_then(|v| v.as_str()))
36 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'column'".into()))?;
37 let vector = params.as_ref().and_then(|p| p.get("vector").and_then(|v| v.as_str()))
38 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'vector' parameter (e.g. '[0.1,0.2,0.3]')".into()))?;
39 let limit = params.as_ref().and_then(|p| p.get("limit").and_then(|v| v.as_i64())).unwrap_or(10);
40 let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
41 let select_cols = params.as_ref().and_then(|p| p.get("select").and_then(|v| v.as_str())).unwrap_or("*");
42 let distance = params.as_ref().and_then(|p| p.get("distance").and_then(|v| v.as_str())).unwrap_or("cosine");
43
44 let operator = match distance {
45 "l2" | "euclidean" => "<->",
46 "inner" | "ip" => "<#>",
47 _ => "<=>",
48 };
49
50 let qcol = crate::validation::quote_ident(column);
51 let qual = format!("{}.{}", crate::validation::quote_ident(schema), crate::validation::quote_ident(table));
52 let sql = format!(
53 "SELECT {}, {qcol} {operator} '{vector}' AS distance
54 FROM {qual}
55 ORDER BY {qcol} {operator} '{vector}'
56 LIMIT {}",
57 select_cols,
58 limit.min(1000)
59 );
60
61 let rows = client.query(&sql, &[]).await?;
62
63 let mut results = Vec::new();
64 for row in &rows {
65 let mut obj = serde_json::Map::new();
66 for (i, col) in row.columns().iter().enumerate() {
67 let name = col.name();
68 if let Ok(v) = row.try_get::<_, Value>(i) {
69 obj.insert(name.to_string(), v);
70 } else if let Ok(v) = row.try_get::<_, String>(i) {
71 obj.insert(name.to_string(), Value::String(v));
72 } else if let Ok(v) = row.try_get::<_, i64>(i) {
73 obj.insert(name.to_string(), json!(v));
74 } else if let Ok(v) = row.try_get::<_, f64>(i) {
75 obj.insert(name.to_string(), json!(v));
76 } else if let Ok(v) = row.try_get::<_, bool>(i) {
77 obj.insert(name.to_string(), json!(v));
78 } else if let Ok(v) = row.try_get::<_, Option<String>>(i) {
79 obj.insert(name.to_string(), v.map(Value::String).unwrap_or(Value::Null));
80 }
81 }
82 results.push(Value::Object(obj));
83 }
84
85 Ok(json!({
86 "results": results,
87 "count": results.len(),
88 "distance_metric": distance,
89 }))
90}
91
92pub async fn create_vector_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
93 let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
94 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
95 let column = params.as_ref().and_then(|p| p.get("column").and_then(|v| v.as_str()))
96 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'column'".into()))?;
97 let index_type = params.as_ref().and_then(|p| p.get("index_type").and_then(|v| v.as_str())).unwrap_or("hnsw");
98 let distance = params.as_ref().and_then(|p| p.get("distance").and_then(|v| v.as_str())).unwrap_or("cosine");
99 let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
100
101 let distance_op = match distance {
102 "l2" | "euclidean" => "vector_l2_ops",
103 "inner" | "ip" => "vector_ip_ops",
104 _ => "vector_cosine_ops",
105 };
106
107 let index_name = format!("idx_{}_{}_{}", table, column, index_type);
108
109 let q_schema = crate::validation::quote_ident(schema);
110 let q_table = crate::validation::quote_ident(table);
111 let q_column = crate::validation::quote_ident(column);
112 let sql = match index_type {
113 "ivfflat" => {
114 let lists = params.as_ref().and_then(|p| p.get("lists").and_then(|v| v.as_i64())).unwrap_or(100);
115 format!(
116 "CREATE INDEX \"{index_name}\" ON {q_schema}.{q_table} USING ivfflat ({q_column} {distance_op}) WITH (lists = {lists})"
117 )
118 }
119 _ => {
120 format!(
121 "CREATE INDEX \"{index_name}\" ON {q_schema}.{q_table} USING hnsw ({q_column} {distance_op})"
122 )
123 }
124 };
125
126 client.execute(&sql, &[]).await?;
127 Ok(json!({ "success": true, "index": index_name, "sql": sql }))
128}