Skip to main content

mcp_postgres/actions/
pg_textsearch.rs

1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::Result as MCPResult;
4
5pub async fn list_bm25_indexes(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
6    let rows = client
7        .query(
8            "SELECT schemaname, tablename, indexname, indexdef,
9                    idx_scan, idx_tup_read, idx_tup_fetch
10             FROM pg_stat_user_indexes
11             WHERE indexdef LIKE '%USING bm25%'
12             ORDER BY schemaname, tablename, indexname",
13            &[],
14        )
15        .await?;
16
17    let indexes: Vec<Value> = rows.iter().map(|row| {
18        json!({
19            "schema": row.get::<_, String>(0),
20            "table": row.get::<_, String>(1),
21            "index": row.get::<_, String>(2),
22            "definition": row.get::<_, String>(3),
23            "scans": row.get::<_, i64>(4),
24            "tuples_read": row.get::<_, i64>(5),
25            "tuples_fetched": row.get::<_, i64>(6),
26        })
27    }).collect();
28
29    Ok(json!({ "bm25_indexes": indexes }))
30}
31
32pub async fn search_bm25(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
33    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
34        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
35    let query = params.as_ref().and_then(|p| p.get("query").and_then(|v| v.as_str()))
36        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'query'".into()))?;
37    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
38    let index_name = params.as_ref().and_then(|p| p.get("index_name").and_then(|v| v.as_str()));
39    let limit = params.as_ref().and_then(|p| p.get("limit").and_then(|v| v.as_i64())).unwrap_or(10);
40    let select_cols = params.as_ref().and_then(|p| p.get("select").and_then(|v| v.as_str())).unwrap_or("*");
41    let text_column = params.as_ref().and_then(|p| p.get("text_column").and_then(|v| v.as_str()))
42        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'text_column'".into()))?;
43
44    let qualified = format!("{}.{}", crate::validation::quote_ident(schema), crate::validation::quote_ident(table));
45    let limit = limit.min(1000);
46
47    let sql = if let Some(idx) = index_name {
48        format!(
49            "SELECT {}, \"{}\" <@> to_bm25query('{}', '{}') AS bm25_score
50             FROM {}
51             ORDER BY bm25_score
52             LIMIT {}",
53            select_cols, text_column, query, idx, qualified, limit
54        )
55    } else {
56        format!(
57            "SELECT {}, \"{}\" <@> '{}' AS bm25_score
58             FROM {}
59             ORDER BY bm25_score
60             LIMIT {}",
61            select_cols, text_column, query, qualified, limit
62        )
63    };
64
65    let rows = client.query(&sql, &[]).await?;
66
67    let mut results = Vec::new();
68    for row in &rows {
69        let mut obj = serde_json::Map::new();
70        for (i, col) in row.columns().iter().enumerate() {
71            let name = col.name();
72            if name == "bm25_score" {
73                if let Ok(v) = row.try_get::<_, f64>(i) {
74                    obj.insert(name.to_string(), json!(-v));
75                }
76            } else if let Ok(v) = row.try_get::<_, Value>(i) {
77                obj.insert(name.to_string(), v);
78            } else if let Ok(v) = row.try_get::<_, String>(i) {
79                obj.insert(name.to_string(), Value::String(v));
80            } else if let Ok(v) = row.try_get::<_, i64>(i) {
81                obj.insert(name.to_string(), json!(v));
82            } else if let Ok(v) = row.try_get::<_, f64>(i) {
83                obj.insert(name.to_string(), json!(v));
84            } else if let Ok(v) = row.try_get::<_, bool>(i) {
85                obj.insert(name.to_string(), json!(v));
86            } else if let Ok(v) = row.try_get::<_, Option<String>>(i) {
87                obj.insert(name.to_string(), v.map(Value::String).unwrap_or(Value::Null));
88            }
89        }
90        results.push(Value::Object(obj));
91    }
92
93    Ok(json!({
94        "results": results,
95        "count": results.len(),
96        "query": query,
97    }))
98}
99
100pub async fn create_bm25_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
101    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
102        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
103    let column = params.as_ref().and_then(|p| p.get("column").and_then(|v| v.as_str()))
104        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'column'".into()))?;
105    let text_config = params.as_ref().and_then(|p| p.get("text_config").and_then(|v| v.as_str())).unwrap_or("english");
106    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
107    let index_name = params.as_ref().and_then(|p| p.get("index_name").and_then(|v| v.as_str()));
108    let k1 = params.as_ref().and_then(|p| p.get("k1").and_then(|v| v.as_f64()));
109    let b = params.as_ref().and_then(|p| p.get("b").and_then(|v| v.as_f64()));
110    let where_clause = params.as_ref().and_then(|p| p.get("where").and_then(|v| v.as_str()));
111    let concurrently = params.as_ref().and_then(|p| p.get("concurrently").and_then(|v| v.as_bool())).unwrap_or(false);
112
113    let idx_name = match index_name {
114        Some(name) => name.to_string(),
115        None => format!("idx_{}_{}_bm25", table, column),
116    };
117
118    let mut sql = "CREATE INDEX".to_string();
119    if concurrently { sql.push_str(" CONCURRENTLY"); }
120    sql.push_str(&format!(" {} ON {}.{}", crate::validation::quote_ident(&idx_name), crate::validation::quote_ident(schema), crate::validation::quote_ident(table)));
121    sql.push_str(&format!(" USING bm25({}) WITH (text_config='{}'", crate::validation::quote_ident(column), text_config));
122    if let Some(k) = k1 { sql.push_str(&format!(", k1={}", k)); }
123    if let Some(b_val) = b { sql.push_str(&format!(", b={}", b_val)); }
124    sql.push(')');
125
126    if let Some(w) = where_clause {
127        sql.push_str(&format!(" WHERE {}", w));
128    }
129
130    client.execute(&sql, &[]).await?;
131    Ok(json!({ "success": true, "index": idx_name, "sql": sql }))
132}
133
134pub async fn drop_bm25_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
135    let index_name = params.as_ref().and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
136        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'index_name'".into()))?;
137    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
138    let if_exists = params.as_ref().and_then(|p| p.get("if_exists").and_then(|v| v.as_bool())).unwrap_or(false);
139    let concurrently = params.as_ref().and_then(|p| p.get("concurrently").and_then(|v| v.as_bool())).unwrap_or(false);
140
141    let mut sql = "DROP INDEX".to_string();
142    if concurrently { sql.push_str(" CONCURRENTLY"); }
143    if if_exists { sql.push_str(" IF EXISTS"); }
144    sql.push_str(&format!(" {}.{}", crate::validation::quote_ident(schema), crate::validation::quote_ident(index_name)));
145
146    client.execute(&sql, &[]).await?;
147    Ok(json!({ "success": true, "sql": sql }))
148}
149
150pub async fn bm25_force_merge(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
151    let index_name = params.as_ref().and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
152        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'index_name'".into()))?;
153
154    let _rows = client.query("SELECT bm25_force_merge($1)", &[&index_name]).await?;
155
156    Ok(json!({
157        "success": true,
158        "index": index_name,
159        "message": "All segments merged into one",
160    }))
161}
162
163pub async fn bm25_index_stats(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
164    let index_name = params.as_ref().and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
165        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'index_name'".into()))?;
166
167    let rows = client.query("SELECT bm25_summarize_index($1)", &[&index_name]).await?;
168    let stats: String = rows[0].get(0);
169
170    Ok(json!({
171        "index": index_name,
172        "statistics": stats,
173    }))
174}