1use crate::errors::Result as MCPResult;
2use serde_json::{Value, json};
3use tokio_postgres::Client;
4
5pub async fn list_bm25_indexes(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
6 let rows = client
7 .query(
8 "SELECT schemaname, tablename, indexname, indexdef,
9 idx_scan, idx_tup_read, idx_tup_fetch
10 FROM pg_stat_user_indexes
11 WHERE indexdef LIKE '%USING bm25%'
12 ORDER BY schemaname, tablename, indexname",
13 &[],
14 )
15 .await?;
16
17 let indexes: Vec<Value> = rows
18 .iter()
19 .map(|row| {
20 json!({
21 "schema": row.get::<_, String>(0),
22 "table": row.get::<_, String>(1),
23 "index": row.get::<_, String>(2),
24 "definition": row.get::<_, String>(3),
25 "scans": row.get::<_, i64>(4),
26 "tuples_read": row.get::<_, i64>(5),
27 "tuples_fetched": row.get::<_, i64>(6),
28 })
29 })
30 .collect();
31
32 Ok(json!({ "bm25_indexes": indexes }))
33}
34
35pub async fn search_bm25(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
36 let table = params
37 .as_ref()
38 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
39 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
40 let query = params
41 .as_ref()
42 .and_then(|p| p.get("query").and_then(|v| v.as_str()))
43 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'query'".into()))?;
44 let schema = params
45 .as_ref()
46 .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
47 .unwrap_or("public");
48 let index_name = params
49 .as_ref()
50 .and_then(|p| p.get("index_name").and_then(|v| v.as_str()));
51 let limit = params
52 .as_ref()
53 .and_then(|p| p.get("limit").and_then(|v| v.as_i64()))
54 .unwrap_or(10);
55 let select_cols = params
56 .as_ref()
57 .and_then(|p| p.get("select").and_then(|v| v.as_str()))
58 .unwrap_or("*");
59 let text_column = params
60 .as_ref()
61 .and_then(|p| p.get("text_column").and_then(|v| v.as_str()))
62 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'text_column'".into()))?;
63
64 let qualified = format!(
65 "{}.{}",
66 crate::validation::quote_ident(schema),
67 crate::validation::quote_ident(table)
68 );
69 let limit = limit.min(1000);
70
71 let sql = if let Some(idx) = index_name {
72 format!(
73 "SELECT {}, \"{}\" <@> to_bm25query('{}', '{}') AS bm25_score
74 FROM {}
75 ORDER BY bm25_score
76 LIMIT {}",
77 select_cols, text_column, query, idx, qualified, limit
78 )
79 } else {
80 format!(
81 "SELECT {}, \"{}\" <@> '{}' AS bm25_score
82 FROM {}
83 ORDER BY bm25_score
84 LIMIT {}",
85 select_cols, text_column, query, qualified, limit
86 )
87 };
88
89 let rows = client.query(&sql, &[]).await?;
90
91 let mut results = Vec::new();
92 for row in &rows {
93 let mut obj = serde_json::Map::new();
94 for (i, col) in row.columns().iter().enumerate() {
95 let name = col.name();
96 if name == "bm25_score" {
97 if let Ok(v) = row.try_get::<_, f64>(i) {
98 obj.insert(name.to_string(), json!(-v));
99 }
100 } else if let Ok(v) = row.try_get::<_, Value>(i) {
101 obj.insert(name.to_string(), v);
102 } else if let Ok(v) = row.try_get::<_, String>(i) {
103 obj.insert(name.to_string(), Value::String(v));
104 } else if let Ok(v) = row.try_get::<_, i64>(i) {
105 obj.insert(name.to_string(), json!(v));
106 } else if let Ok(v) = row.try_get::<_, f64>(i) {
107 obj.insert(name.to_string(), json!(v));
108 } else if let Ok(v) = row.try_get::<_, bool>(i) {
109 obj.insert(name.to_string(), json!(v));
110 } else if let Ok(v) = row.try_get::<_, Option<String>>(i) {
111 obj.insert(
112 name.to_string(),
113 v.map(Value::String).unwrap_or(Value::Null),
114 );
115 }
116 }
117 results.push(Value::Object(obj));
118 }
119
120 Ok(json!({
121 "results": results,
122 "count": results.len(),
123 "query": query,
124 }))
125}
126
127pub async fn create_bm25_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
128 let table = params
129 .as_ref()
130 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
131 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
132 let column = params
133 .as_ref()
134 .and_then(|p| p.get("column").and_then(|v| v.as_str()))
135 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'column'".into()))?;
136 let text_config = params
137 .as_ref()
138 .and_then(|p| p.get("text_config").and_then(|v| v.as_str()))
139 .unwrap_or("english");
140 let schema = params
141 .as_ref()
142 .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
143 .unwrap_or("public");
144 let index_name = params
145 .as_ref()
146 .and_then(|p| p.get("index_name").and_then(|v| v.as_str()));
147 let k1 = params
148 .as_ref()
149 .and_then(|p| p.get("k1").and_then(|v| v.as_f64()));
150 let b = params
151 .as_ref()
152 .and_then(|p| p.get("b").and_then(|v| v.as_f64()));
153 let where_clause = params
154 .as_ref()
155 .and_then(|p| p.get("where").and_then(|v| v.as_str()));
156 let concurrently = params
157 .as_ref()
158 .and_then(|p| p.get("concurrently").and_then(|v| v.as_bool()))
159 .unwrap_or(false);
160
161 let idx_name = match index_name {
162 Some(name) => name.to_string(),
163 None => format!("idx_{}_{}_bm25", table, column),
164 };
165
166 let mut sql = "CREATE INDEX".to_string();
167 if concurrently {
168 sql.push_str(" CONCURRENTLY");
169 }
170 sql.push_str(&format!(
171 " {} ON {}.{}",
172 crate::validation::quote_ident(&idx_name),
173 crate::validation::quote_ident(schema),
174 crate::validation::quote_ident(table)
175 ));
176 sql.push_str(&format!(
177 " USING bm25({}) WITH (text_config='{}'",
178 crate::validation::quote_ident(column),
179 text_config
180 ));
181 if let Some(k) = k1 {
182 sql.push_str(&format!(", k1={}", k));
183 }
184 if let Some(b_val) = b {
185 sql.push_str(&format!(", b={}", b_val));
186 }
187 sql.push(')');
188
189 if let Some(w) = where_clause {
190 sql.push_str(&format!(" WHERE {}", w));
191 }
192
193 client.execute(&sql, &[]).await?;
194 Ok(json!({ "success": true, "index": idx_name, "sql": sql }))
195}
196
197pub async fn drop_bm25_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
198 let index_name = params
199 .as_ref()
200 .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
201 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'index_name'".into()))?;
202 let schema = params
203 .as_ref()
204 .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
205 .unwrap_or("public");
206 let if_exists = params
207 .as_ref()
208 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
209 .unwrap_or(false);
210 let concurrently = params
211 .as_ref()
212 .and_then(|p| p.get("concurrently").and_then(|v| v.as_bool()))
213 .unwrap_or(false);
214
215 let mut sql = "DROP INDEX".to_string();
216 if concurrently {
217 sql.push_str(" CONCURRENTLY");
218 }
219 if if_exists {
220 sql.push_str(" IF EXISTS");
221 }
222 sql.push_str(&format!(
223 " {}.{}",
224 crate::validation::quote_ident(schema),
225 crate::validation::quote_ident(index_name)
226 ));
227
228 client.execute(&sql, &[]).await?;
229 Ok(json!({ "success": true, "sql": sql }))
230}
231
232pub async fn bm25_force_merge(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
233 let index_name = params
234 .as_ref()
235 .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
236 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'index_name'".into()))?;
237
238 let _rows = client
239 .query("SELECT bm25_force_merge($1)", &[&index_name])
240 .await?;
241
242 Ok(json!({
243 "success": true,
244 "index": index_name,
245 "message": "All segments merged into one",
246 }))
247}
248
249pub async fn bm25_index_stats(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
250 let index_name = params
251 .as_ref()
252 .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
253 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'index_name'".into()))?;
254
255 let rows = client
256 .query("SELECT bm25_summarize_index($1)", &[&index_name])
257 .await?;
258 let stats: String = rows[0].get(0);
259
260 Ok(json!({
261 "index": index_name,
262 "statistics": stats,
263 }))
264}