Skip to main content

mcp_postgres/actions/
pg_textsearch.rs

1use crate::errors::Result as MCPResult;
2use serde_json::{Value, json};
3use tokio_postgres::Client;
4
5pub async fn list_bm25_indexes(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
6    let rows = client
7        .query(
8            "SELECT schemaname, tablename, indexname, indexdef,
9                    idx_scan, idx_tup_read, idx_tup_fetch
10             FROM pg_stat_user_indexes
11             WHERE indexdef LIKE '%USING bm25%'
12             ORDER BY schemaname, tablename, indexname",
13            &[],
14        )
15        .await?;
16
17    let indexes: Vec<Value> = rows
18        .iter()
19        .map(|row| {
20            json!({
21                "schema": row.get::<_, String>(0),
22                "table": row.get::<_, String>(1),
23                "index": row.get::<_, String>(2),
24                "definition": row.get::<_, String>(3),
25                "scans": row.get::<_, i64>(4),
26                "tuples_read": row.get::<_, i64>(5),
27                "tuples_fetched": row.get::<_, i64>(6),
28            })
29        })
30        .collect();
31
32    Ok(json!({ "bm25_indexes": indexes }))
33}
34
35pub async fn search_bm25(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
36    let table = params
37        .as_ref()
38        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
39        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
40    let query = params
41        .as_ref()
42        .and_then(|p| p.get("query").and_then(|v| v.as_str()))
43        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'query'".into()))?;
44    let schema = params
45        .as_ref()
46        .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
47        .unwrap_or("public");
48    let index_name = params
49        .as_ref()
50        .and_then(|p| p.get("index_name").and_then(|v| v.as_str()));
51    let limit = params
52        .as_ref()
53        .and_then(|p| p.get("limit").and_then(|v| v.as_i64()))
54        .unwrap_or(10);
55    let select_cols = params
56        .as_ref()
57        .and_then(|p| p.get("select").and_then(|v| v.as_str()))
58        .unwrap_or("*");
59    let text_column = params
60        .as_ref()
61        .and_then(|p| p.get("text_column").and_then(|v| v.as_str()))
62        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'text_column'".into()))?;
63
64    let qualified = format!(
65        "{}.{}",
66        crate::validation::quote_ident(schema),
67        crate::validation::quote_ident(table)
68    );
69    let limit = limit.min(1000);
70
71    let sql = if let Some(idx) = index_name {
72        format!(
73            "SELECT {}, \"{}\" <@> to_bm25query('{}', '{}') AS bm25_score
74             FROM {}
75             ORDER BY bm25_score
76             LIMIT {}",
77            select_cols, text_column, query, idx, qualified, limit
78        )
79    } else {
80        format!(
81            "SELECT {}, \"{}\" <@> '{}' AS bm25_score
82             FROM {}
83             ORDER BY bm25_score
84             LIMIT {}",
85            select_cols, text_column, query, qualified, limit
86        )
87    };
88
89    let rows = client.query(&sql, &[]).await?;
90
91    let mut results = Vec::new();
92    for row in &rows {
93        let mut obj = serde_json::Map::new();
94        for (i, col) in row.columns().iter().enumerate() {
95            let name = col.name();
96            if name == "bm25_score" {
97                if let Ok(v) = row.try_get::<_, f64>(i) {
98                    obj.insert(name.to_string(), json!(-v));
99                }
100            } else if let Ok(v) = row.try_get::<_, Value>(i) {
101                obj.insert(name.to_string(), v);
102            } else if let Ok(v) = row.try_get::<_, String>(i) {
103                obj.insert(name.to_string(), Value::String(v));
104            } else if let Ok(v) = row.try_get::<_, i64>(i) {
105                obj.insert(name.to_string(), json!(v));
106            } else if let Ok(v) = row.try_get::<_, f64>(i) {
107                obj.insert(name.to_string(), json!(v));
108            } else if let Ok(v) = row.try_get::<_, bool>(i) {
109                obj.insert(name.to_string(), json!(v));
110            } else if let Ok(v) = row.try_get::<_, Option<String>>(i) {
111                obj.insert(
112                    name.to_string(),
113                    v.map(Value::String).unwrap_or(Value::Null),
114                );
115            }
116        }
117        results.push(Value::Object(obj));
118    }
119
120    Ok(json!({
121        "results": results,
122        "count": results.len(),
123        "query": query,
124    }))
125}
126
127pub async fn create_bm25_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
128    let table = params
129        .as_ref()
130        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
131        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
132    let column = params
133        .as_ref()
134        .and_then(|p| p.get("column").and_then(|v| v.as_str()))
135        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'column'".into()))?;
136    let text_config = params
137        .as_ref()
138        .and_then(|p| p.get("text_config").and_then(|v| v.as_str()))
139        .unwrap_or("english");
140    let schema = params
141        .as_ref()
142        .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
143        .unwrap_or("public");
144    let index_name = params
145        .as_ref()
146        .and_then(|p| p.get("index_name").and_then(|v| v.as_str()));
147    let k1 = params
148        .as_ref()
149        .and_then(|p| p.get("k1").and_then(|v| v.as_f64()));
150    let b = params
151        .as_ref()
152        .and_then(|p| p.get("b").and_then(|v| v.as_f64()));
153    let where_clause = params
154        .as_ref()
155        .and_then(|p| p.get("where").and_then(|v| v.as_str()));
156    let concurrently = params
157        .as_ref()
158        .and_then(|p| p.get("concurrently").and_then(|v| v.as_bool()))
159        .unwrap_or(false);
160
161    let idx_name = match index_name {
162        Some(name) => name.to_string(),
163        None => format!("idx_{}_{}_bm25", table, column),
164    };
165
166    let mut sql = "CREATE INDEX".to_string();
167    if concurrently {
168        sql.push_str(" CONCURRENTLY");
169    }
170    sql.push_str(&format!(
171        " {} ON {}.{}",
172        crate::validation::quote_ident(&idx_name),
173        crate::validation::quote_ident(schema),
174        crate::validation::quote_ident(table)
175    ));
176    sql.push_str(&format!(
177        " USING bm25({}) WITH (text_config='{}'",
178        crate::validation::quote_ident(column),
179        text_config
180    ));
181    if let Some(k) = k1 {
182        sql.push_str(&format!(", k1={}", k));
183    }
184    if let Some(b_val) = b {
185        sql.push_str(&format!(", b={}", b_val));
186    }
187    sql.push(')');
188
189    if let Some(w) = where_clause {
190        sql.push_str(&format!(" WHERE {}", w));
191    }
192
193    client.execute(&sql, &[]).await?;
194    Ok(json!({ "success": true, "index": idx_name, "sql": sql }))
195}
196
197pub async fn drop_bm25_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
198    let index_name = params
199        .as_ref()
200        .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
201        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'index_name'".into()))?;
202    let schema = params
203        .as_ref()
204        .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
205        .unwrap_or("public");
206    let if_exists = params
207        .as_ref()
208        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
209        .unwrap_or(false);
210    let concurrently = params
211        .as_ref()
212        .and_then(|p| p.get("concurrently").and_then(|v| v.as_bool()))
213        .unwrap_or(false);
214
215    let mut sql = "DROP INDEX".to_string();
216    if concurrently {
217        sql.push_str(" CONCURRENTLY");
218    }
219    if if_exists {
220        sql.push_str(" IF EXISTS");
221    }
222    sql.push_str(&format!(
223        " {}.{}",
224        crate::validation::quote_ident(schema),
225        crate::validation::quote_ident(index_name)
226    ));
227
228    client.execute(&sql, &[]).await?;
229    Ok(json!({ "success": true, "sql": sql }))
230}
231
232pub async fn bm25_force_merge(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
233    let index_name = params
234        .as_ref()
235        .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
236        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'index_name'".into()))?;
237
238    let _rows = client
239        .query("SELECT bm25_force_merge($1)", &[&index_name])
240        .await?;
241
242    Ok(json!({
243        "success": true,
244        "index": index_name,
245        "message": "All segments merged into one",
246    }))
247}
248
249pub async fn bm25_index_stats(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
250    let index_name = params
251        .as_ref()
252        .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
253        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'index_name'".into()))?;
254
255    let rows = client
256        .query("SELECT bm25_summarize_index($1)", &[&index_name])
257        .await?;
258    let stats: String = rows[0].get(0);
259
260    Ok(json!({
261        "index": index_name,
262        "statistics": stats,
263    }))
264}