Skip to main content

mcp_postgres/actions/
timescaledb.rs

1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::Result as MCPResult;
4
5pub async fn create_hypertable(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
6    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
7        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
8    let time_column = params.as_ref().and_then(|p| p.get("time_column").and_then(|v| v.as_str()))
9        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'time_column'".into()))?;
10    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
11    let chunk_time = params.as_ref().and_then(|p| p.get("chunk_time_interval").and_then(|v| v.as_str()));
12
13    let mut sql = format!("SELECT create_hypertable('{}.{}', '{}'",
14        crate::validation::quote_ident(schema), crate::validation::quote_ident(table), time_column);
15    if let Some(ct) = chunk_time {
16        sql.push_str(&format!(", chunk_time_interval => INTERVAL '{}'", ct));
17    }
18    sql.push(')');
19
20    let rows = client.query(&sql, &[]).await?;
21    let created: bool = rows[0].get(0);
22
23    Ok(json!({ "success": created, "hypertable": format!("{}.{}", schema, table), "sql": sql }))
24}
25
26pub async fn show_hypertable_details(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
27    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
28        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
29    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
30
31    let rows = client
32        .query(
33            "SELECT hypertable_name, hypertable_schema, owner,
34                    num_dimensions, chunk_target_size,
35                    compression_state, tablespaces
36             FROM timescaledb_information.hypertables
37             WHERE hypertable_name = $1 AND hypertable_schema = $2",
38            &[&table, &schema],
39        )
40        .await?;
41
42    if rows.is_empty() {
43        return Ok(json!({ "table": format!("{}.{}", schema, table), "is_hypertable": false }));
44    }
45
46    let row = &rows[0];
47    Ok(json!({
48        "table": row.get::<_, String>(0),
49        "schema": row.get::<_, String>(1),
50        "owner": row.get::<_, Option<String>>(2),
51        "dimensions": row.get::<_, Option<i32>>(3),
52        "chunk_target_size": row.get::<_, Option<String>>(4),
53        "compression": row.get::<_, Option<String>>(5),
54        "tablespaces": row.get::<_, Option<String>>(6),
55        "is_hypertable": true,
56    }))
57}
58
59pub async fn show_chunks(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
60    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
61        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
62    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
63
64    let rows = client
65        .query(
66            "SELECT chunk_name, chunk_schema, table_name, table_schema,
67                    range_start::text, range_end::text, is_compressed::text,
68                    disk_size::text
69             FROM timescaledb_information.chunks
70             WHERE table_name = $1 AND table_schema = $2
71             ORDER BY range_start",
72            &[&table, &schema],
73        )
74        .await?;
75
76    let chunks: Vec<Value> = rows.iter().map(|row| {
77        json!({
78            "chunk_name": row.get::<_, String>(0),
79            "chunk_schema": row.get::<_, String>(1),
80            "range_start": row.get::<_, String>(3),
81            "range_end": row.get::<_, String>(4),
82            "compressed": row.get::<_, String>(5),
83            "disk_size": row.get::<_, Option<String>>(6),
84        })
85    }).collect();
86
87    Ok(json!({ "chunks": chunks, "count": chunks.len() }))
88}
89
90pub async fn add_retention_policy(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
91    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
92        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
93    let drop_after = params.as_ref().and_then(|p| p.get("drop_after").and_then(|v| v.as_str()))
94        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'drop_after' (e.g. '90 days')".into()))?;
95    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
96
97    let sql = format!(
98        "SELECT add_retention_policy('{}.{}', INTERVAL '{}')",
99        crate::validation::quote_ident(schema),
100        crate::validation::quote_ident(table),
101        drop_after
102    );
103
104    let rows = client.query(&sql, &[]).await?;
105    let job_id: i32 = rows[0].get(0);
106
107    Ok(json!({ "success": true, "job_id": job_id, "sql": sql }))
108}
109
110pub async fn add_compression_policy(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
111    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
112        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
113    let compress_after = params.as_ref().and_then(|p| p.get("compress_after").and_then(|v| v.as_str()))
114        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'compress_after' (e.g. '7 days')".into()))?;
115    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
116
117    let sql = format!(
118        "SELECT add_compression_policy('{}.{}', INTERVAL '{}')",
119        crate::validation::quote_ident(schema),
120        crate::validation::quote_ident(table),
121        compress_after
122    );
123
124    let rows = client.query(&sql, &[]).await?;
125    let job_id: i32 = rows[0].get(0);
126
127    Ok(json!({ "success": true, "job_id": job_id, "sql": sql }))
128}
129
130pub async fn compress_chunk(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
131    let chunk_name = params.as_ref().and_then(|p| p.get("chunk_name").and_then(|v| v.as_str()))
132        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'chunk_name'".into()))?;
133    let chunk_schema = params.as_ref().and_then(|p| p.get("chunk_schema").and_then(|v| v.as_str())).unwrap_or("_hyper");
134
135    let sql = format!("SELECT compress_chunk('{}.{}')", crate::validation::quote_ident(chunk_schema), crate::validation::quote_ident(chunk_name));
136    let rows = client.query(&sql, &[]).await?;
137    let result: String = rows[0].get(0);
138
139    Ok(json!({ "success": true, "chunk": format!("{}.{}", chunk_schema, chunk_name), "result": result }))
140}
141
142pub async fn add_continuous_aggregate(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
143    let name = params.as_ref().and_then(|p| p.get("name").and_then(|v| v.as_str()))
144        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'name'".into()))?;
145    let query = params.as_ref().and_then(|p| p.get("query").and_then(|v| v.as_str()))
146        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'query'".into()))?;
147    let refresh_interval = params.as_ref().and_then(|p| p.get("refresh_interval").and_then(|v| v.as_str()));
148
149    let q_name = crate::validation::quote_ident(name);
150    let mut sql = format!("CREATE MATERIALIZED VIEW {q_name}");
151    sql.push_str("\nWITH (timescaledb.continuous) AS\n");
152    sql.push_str(query);
153    if let Some(ri) = refresh_interval {
154        sql.push_str(&format!("\nWITH DATA;\nSELECT add_continuous_aggregate_policy('{q_name}', INTERVAL '{ri}', INTERVAL '{ri}')"));
155    }
156
157    client.execute(&sql, &[]).await?;
158    Ok(json!({ "success": true, "name": name, "sql": sql }))
159}