1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::Result as MCPResult;
4
5pub async fn list_bm25_indexes(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
6 let rows = client
7 .query(
8 "SELECT schemaname, tablename, indexname, indexdef,
9 idx_scan, idx_tup_read, idx_tup_fetch
10 FROM pg_stat_user_indexes
11 WHERE indexdef LIKE '%USING bm25%'
12 ORDER BY schemaname, tablename, indexname",
13 &[],
14 )
15 .await?;
16
17 let indexes: Vec<Value> = rows.iter().map(|row| {
18 json!({
19 "schema": row.get::<_, String>(0),
20 "table": row.get::<_, String>(1),
21 "index": row.get::<_, String>(2),
22 "definition": row.get::<_, String>(3),
23 "scans": row.get::<_, i64>(4),
24 "tuples_read": row.get::<_, i64>(5),
25 "tuples_fetched": row.get::<_, i64>(6),
26 })
27 }).collect();
28
29 Ok(json!({ "bm25_indexes": indexes }))
30}
31
32pub async fn search_bm25(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
33 let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
34 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
35 let query = params.as_ref().and_then(|p| p.get("query").and_then(|v| v.as_str()))
36 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'query'".into()))?;
37 let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
38 let index_name = params.as_ref().and_then(|p| p.get("index_name").and_then(|v| v.as_str()));
39 let limit = params.as_ref().and_then(|p| p.get("limit").and_then(|v| v.as_i64())).unwrap_or(10);
40 let select_cols = params.as_ref().and_then(|p| p.get("select").and_then(|v| v.as_str())).unwrap_or("*");
41 let text_column = params.as_ref().and_then(|p| p.get("text_column").and_then(|v| v.as_str()))
42 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'text_column'".into()))?;
43
44 let qualified = format!("{}.{}", crate::validation::quote_ident(schema), crate::validation::quote_ident(table));
45 let limit = limit.min(1000);
46
47 let sql = if let Some(idx) = index_name {
48 format!(
49 "SELECT {}, \"{}\" <@> to_bm25query('{}', '{}') AS bm25_score
50 FROM {}
51 ORDER BY bm25_score
52 LIMIT {}",
53 select_cols, text_column, query, idx, qualified, limit
54 )
55 } else {
56 format!(
57 "SELECT {}, \"{}\" <@> '{}' AS bm25_score
58 FROM {}
59 ORDER BY bm25_score
60 LIMIT {}",
61 select_cols, text_column, query, qualified, limit
62 )
63 };
64
65 let rows = client.query(&sql, &[]).await?;
66
67 let mut results = Vec::new();
68 for row in &rows {
69 let mut obj = serde_json::Map::new();
70 for (i, col) in row.columns().iter().enumerate() {
71 let name = col.name();
72 if name == "bm25_score" {
73 if let Ok(v) = row.try_get::<_, f64>(i) {
74 obj.insert(name.to_string(), json!(-v));
75 }
76 } else if let Ok(v) = row.try_get::<_, Value>(i) {
77 obj.insert(name.to_string(), v);
78 } else if let Ok(v) = row.try_get::<_, String>(i) {
79 obj.insert(name.to_string(), Value::String(v));
80 } else if let Ok(v) = row.try_get::<_, i64>(i) {
81 obj.insert(name.to_string(), json!(v));
82 } else if let Ok(v) = row.try_get::<_, f64>(i) {
83 obj.insert(name.to_string(), json!(v));
84 } else if let Ok(v) = row.try_get::<_, bool>(i) {
85 obj.insert(name.to_string(), json!(v));
86 } else if let Ok(v) = row.try_get::<_, Option<String>>(i) {
87 obj.insert(name.to_string(), v.map(Value::String).unwrap_or(Value::Null));
88 }
89 }
90 results.push(Value::Object(obj));
91 }
92
93 Ok(json!({
94 "results": results,
95 "count": results.len(),
96 "query": query,
97 }))
98}
99
100pub async fn create_bm25_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
101 let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
102 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
103 let column = params.as_ref().and_then(|p| p.get("column").and_then(|v| v.as_str()))
104 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'column'".into()))?;
105 let text_config = params.as_ref().and_then(|p| p.get("text_config").and_then(|v| v.as_str())).unwrap_or("english");
106 let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
107 let index_name = params.as_ref().and_then(|p| p.get("index_name").and_then(|v| v.as_str()));
108 let k1 = params.as_ref().and_then(|p| p.get("k1").and_then(|v| v.as_f64()));
109 let b = params.as_ref().and_then(|p| p.get("b").and_then(|v| v.as_f64()));
110 let where_clause = params.as_ref().and_then(|p| p.get("where").and_then(|v| v.as_str()));
111 let concurrently = params.as_ref().and_then(|p| p.get("concurrently").and_then(|v| v.as_bool())).unwrap_or(false);
112
113 let idx_name = match index_name {
114 Some(name) => name.to_string(),
115 None => format!("idx_{}_{}_bm25", table, column),
116 };
117
118 let mut sql = "CREATE INDEX".to_string();
119 if concurrently { sql.push_str(" CONCURRENTLY"); }
120 sql.push_str(&format!(" {} ON {}.{}", crate::validation::quote_ident(&idx_name), crate::validation::quote_ident(schema), crate::validation::quote_ident(table)));
121 sql.push_str(&format!(" USING bm25({}) WITH (text_config='{}'", crate::validation::quote_ident(column), text_config));
122 if let Some(k) = k1 { sql.push_str(&format!(", k1={}", k)); }
123 if let Some(b_val) = b { sql.push_str(&format!(", b={}", b_val)); }
124 sql.push(')');
125
126 if let Some(w) = where_clause {
127 sql.push_str(&format!(" WHERE {}", w));
128 }
129
130 client.execute(&sql, &[]).await?;
131 Ok(json!({ "success": true, "index": idx_name, "sql": sql }))
132}
133
134pub async fn drop_bm25_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
135 let index_name = params.as_ref().and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
136 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'index_name'".into()))?;
137 let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
138 let if_exists = params.as_ref().and_then(|p| p.get("if_exists").and_then(|v| v.as_bool())).unwrap_or(false);
139 let concurrently = params.as_ref().and_then(|p| p.get("concurrently").and_then(|v| v.as_bool())).unwrap_or(false);
140
141 let mut sql = "DROP INDEX".to_string();
142 if concurrently { sql.push_str(" CONCURRENTLY"); }
143 if if_exists { sql.push_str(" IF EXISTS"); }
144 sql.push_str(&format!(" {}.{}", crate::validation::quote_ident(schema), crate::validation::quote_ident(index_name)));
145
146 client.execute(&sql, &[]).await?;
147 Ok(json!({ "success": true, "sql": sql }))
148}
149
150pub async fn bm25_force_merge(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
151 let index_name = params.as_ref().and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
152 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'index_name'".into()))?;
153
154 let _rows = client.query("SELECT bm25_force_merge($1)", &[&index_name]).await?;
155
156 Ok(json!({
157 "success": true,
158 "index": index_name,
159 "message": "All segments merged into one",
160 }))
161}
162
163pub async fn bm25_index_stats(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
164 let index_name = params.as_ref().and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
165 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'index_name'".into()))?;
166
167 let rows = client.query("SELECT bm25_summarize_index($1)", &[&index_name]).await?;
168 let stats: String = rows[0].get(0);
169
170 Ok(json!({
171 "index": index_name,
172 "statistics": stats,
173 }))
174}