Skip to main content

mcp_postgres/actions/
timescaledb.rs

1use crate::errors::Result as MCPResult;
2use serde_json::{Value, json};
3use tokio_postgres::Client;
4
5pub async fn create_hypertable(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
6    let table = params
7        .as_ref()
8        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
9        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
10    let time_column = params
11        .as_ref()
12        .and_then(|p| p.get("time_column").and_then(|v| v.as_str()))
13        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'time_column'".into()))?;
14    let schema = params
15        .as_ref()
16        .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
17        .unwrap_or("public");
18    let chunk_time = params
19        .as_ref()
20        .and_then(|p| p.get("chunk_time_interval").and_then(|v| v.as_str()));
21
22    let mut sql = format!(
23        "SELECT create_hypertable('{}.{}', '{}'",
24        crate::validation::quote_ident(schema),
25        crate::validation::quote_ident(table),
26        time_column
27    );
28    if let Some(ct) = chunk_time {
29        sql.push_str(&format!(", chunk_time_interval => INTERVAL '{}'", ct));
30    }
31    sql.push(')');
32
33    let rows = client.query(&sql, &[]).await?;
34    let created: bool = rows[0].get(0);
35
36    Ok(json!({ "success": created, "hypertable": format!("{}.{}", schema, table), "sql": sql }))
37}
38
39pub async fn show_hypertable_details(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
40    let table = params
41        .as_ref()
42        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
43        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
44    let schema = params
45        .as_ref()
46        .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
47        .unwrap_or("public");
48
49    let rows = client
50        .query(
51            "SELECT hypertable_name, hypertable_schema, owner,
52                    num_dimensions, chunk_target_size,
53                    compression_state, tablespaces
54             FROM timescaledb_information.hypertables
55             WHERE hypertable_name = $1 AND hypertable_schema = $2",
56            &[&table, &schema],
57        )
58        .await?;
59
60    if rows.is_empty() {
61        return Ok(json!({ "table": format!("{}.{}", schema, table), "is_hypertable": false }));
62    }
63
64    let row = &rows[0];
65    Ok(json!({
66        "table": row.get::<_, String>(0),
67        "schema": row.get::<_, String>(1),
68        "owner": row.get::<_, Option<String>>(2),
69        "dimensions": row.get::<_, Option<i32>>(3),
70        "chunk_target_size": row.get::<_, Option<String>>(4),
71        "compression": row.get::<_, Option<String>>(5),
72        "tablespaces": row.get::<_, Option<String>>(6),
73        "is_hypertable": true,
74    }))
75}
76
77pub async fn show_chunks(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
78    let table = params
79        .as_ref()
80        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
81        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
82    let schema = params
83        .as_ref()
84        .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
85        .unwrap_or("public");
86
87    let rows = client
88        .query(
89            "SELECT chunk_name, chunk_schema, table_name, table_schema,
90                    range_start::text, range_end::text, is_compressed::text,
91                    disk_size::text
92             FROM timescaledb_information.chunks
93             WHERE table_name = $1 AND table_schema = $2
94             ORDER BY range_start",
95            &[&table, &schema],
96        )
97        .await?;
98
99    let chunks: Vec<Value> = rows
100        .iter()
101        .map(|row| {
102            json!({
103                "chunk_name": row.get::<_, String>(0),
104                "chunk_schema": row.get::<_, String>(1),
105                "range_start": row.get::<_, String>(3),
106                "range_end": row.get::<_, String>(4),
107                "compressed": row.get::<_, String>(5),
108                "disk_size": row.get::<_, Option<String>>(6),
109            })
110        })
111        .collect();
112
113    Ok(json!({ "chunks": chunks, "count": chunks.len() }))
114}
115
116pub async fn add_retention_policy(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
117    let table = params
118        .as_ref()
119        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
120        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
121    let drop_after = params
122        .as_ref()
123        .and_then(|p| p.get("drop_after").and_then(|v| v.as_str()))
124        .ok_or_else(|| {
125            crate::errors::MCPError::InvalidParams("Missing 'drop_after' (e.g. '90 days')".into())
126        })?;
127    let schema = params
128        .as_ref()
129        .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
130        .unwrap_or("public");
131
132    let sql = format!(
133        "SELECT add_retention_policy('{}.{}', INTERVAL '{}')",
134        crate::validation::quote_ident(schema),
135        crate::validation::quote_ident(table),
136        drop_after
137    );
138
139    let rows = client.query(&sql, &[]).await?;
140    let job_id: i32 = rows[0].get(0);
141
142    Ok(json!({ "success": true, "job_id": job_id, "sql": sql }))
143}
144
145pub async fn add_compression_policy(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
146    let table = params
147        .as_ref()
148        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
149        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
150    let compress_after = params
151        .as_ref()
152        .and_then(|p| p.get("compress_after").and_then(|v| v.as_str()))
153        .ok_or_else(|| {
154            crate::errors::MCPError::InvalidParams(
155                "Missing 'compress_after' (e.g. '7 days')".into(),
156            )
157        })?;
158    let schema = params
159        .as_ref()
160        .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
161        .unwrap_or("public");
162
163    let sql = format!(
164        "SELECT add_compression_policy('{}.{}', INTERVAL '{}')",
165        crate::validation::quote_ident(schema),
166        crate::validation::quote_ident(table),
167        compress_after
168    );
169
170    let rows = client.query(&sql, &[]).await?;
171    let job_id: i32 = rows[0].get(0);
172
173    Ok(json!({ "success": true, "job_id": job_id, "sql": sql }))
174}
175
176pub async fn compress_chunk(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
177    let chunk_name = params
178        .as_ref()
179        .and_then(|p| p.get("chunk_name").and_then(|v| v.as_str()))
180        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'chunk_name'".into()))?;
181    let chunk_schema = params
182        .as_ref()
183        .and_then(|p| p.get("chunk_schema").and_then(|v| v.as_str()))
184        .unwrap_or("_hyper");
185
186    let sql = format!(
187        "SELECT compress_chunk('{}.{}')",
188        crate::validation::quote_ident(chunk_schema),
189        crate::validation::quote_ident(chunk_name)
190    );
191    let rows = client.query(&sql, &[]).await?;
192    let result: String = rows[0].get(0);
193
194    Ok(
195        json!({ "success": true, "chunk": format!("{}.{}", chunk_schema, chunk_name), "result": result }),
196    )
197}
198
199pub async fn add_continuous_aggregate(
200    client: &Client,
201    params: &Option<&Value>,
202) -> MCPResult<Value> {
203    let name = params
204        .as_ref()
205        .and_then(|p| p.get("name").and_then(|v| v.as_str()))
206        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'name'".into()))?;
207    let query = params
208        .as_ref()
209        .and_then(|p| p.get("query").and_then(|v| v.as_str()))
210        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'query'".into()))?;
211    let refresh_interval = params
212        .as_ref()
213        .and_then(|p| p.get("refresh_interval").and_then(|v| v.as_str()));
214
215    let q_name = crate::validation::quote_ident(name);
216    let mut sql = format!("CREATE MATERIALIZED VIEW {q_name}");
217    sql.push_str("\nWITH (timescaledb.continuous) AS\n");
218    sql.push_str(query);
219    if let Some(ri) = refresh_interval {
220        sql.push_str(&format!("\nWITH DATA;\nSELECT add_continuous_aggregate_policy('{q_name}', INTERVAL '{ri}', INTERVAL '{ri}')"));
221    }
222
223    client.execute(&sql, &[]).await?;
224    Ok(json!({ "success": true, "name": name, "sql": sql }))
225}