1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::Result as MCPResult;
4
5pub async fn create_hypertable(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
6 let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
7 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
8 let time_column = params.as_ref().and_then(|p| p.get("time_column").and_then(|v| v.as_str()))
9 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'time_column'".into()))?;
10 let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
11 let chunk_time = params.as_ref().and_then(|p| p.get("chunk_time_interval").and_then(|v| v.as_str()));
12
13 let mut sql = format!("SELECT create_hypertable('{}.{}', '{}'",
14 crate::validation::quote_ident(schema), crate::validation::quote_ident(table), time_column);
15 if let Some(ct) = chunk_time {
16 sql.push_str(&format!(", chunk_time_interval => INTERVAL '{}'", ct));
17 }
18 sql.push(')');
19
20 let rows = client.query(&sql, &[]).await?;
21 let created: bool = rows[0].get(0);
22
23 Ok(json!({ "success": created, "hypertable": format!("{}.{}", schema, table), "sql": sql }))
24}
25
26pub async fn show_hypertable_details(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
27 let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
28 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
29 let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
30
31 let rows = client
32 .query(
33 "SELECT hypertable_name, hypertable_schema, owner,
34 num_dimensions, chunk_target_size,
35 compression_state, tablespaces
36 FROM timescaledb_information.hypertables
37 WHERE hypertable_name = $1 AND hypertable_schema = $2",
38 &[&table, &schema],
39 )
40 .await?;
41
42 if rows.is_empty() {
43 return Ok(json!({ "table": format!("{}.{}", schema, table), "is_hypertable": false }));
44 }
45
46 let row = &rows[0];
47 Ok(json!({
48 "table": row.get::<_, String>(0),
49 "schema": row.get::<_, String>(1),
50 "owner": row.get::<_, Option<String>>(2),
51 "dimensions": row.get::<_, Option<i32>>(3),
52 "chunk_target_size": row.get::<_, Option<String>>(4),
53 "compression": row.get::<_, Option<String>>(5),
54 "tablespaces": row.get::<_, Option<String>>(6),
55 "is_hypertable": true,
56 }))
57}
58
59pub async fn show_chunks(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
60 let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
61 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
62 let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
63
64 let rows = client
65 .query(
66 "SELECT chunk_name, chunk_schema, table_name, table_schema,
67 range_start::text, range_end::text, is_compressed::text,
68 disk_size::text
69 FROM timescaledb_information.chunks
70 WHERE table_name = $1 AND table_schema = $2
71 ORDER BY range_start",
72 &[&table, &schema],
73 )
74 .await?;
75
76 let chunks: Vec<Value> = rows.iter().map(|row| {
77 json!({
78 "chunk_name": row.get::<_, String>(0),
79 "chunk_schema": row.get::<_, String>(1),
80 "range_start": row.get::<_, String>(3),
81 "range_end": row.get::<_, String>(4),
82 "compressed": row.get::<_, String>(5),
83 "disk_size": row.get::<_, Option<String>>(6),
84 })
85 }).collect();
86
87 Ok(json!({ "chunks": chunks, "count": chunks.len() }))
88}
89
90pub async fn add_retention_policy(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
91 let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
92 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
93 let drop_after = params.as_ref().and_then(|p| p.get("drop_after").and_then(|v| v.as_str()))
94 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'drop_after' (e.g. '90 days')".into()))?;
95 let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
96
97 let sql = format!(
98 "SELECT add_retention_policy('{}.{}', INTERVAL '{}')",
99 crate::validation::quote_ident(schema),
100 crate::validation::quote_ident(table),
101 drop_after
102 );
103
104 let rows = client.query(&sql, &[]).await?;
105 let job_id: i32 = rows[0].get(0);
106
107 Ok(json!({ "success": true, "job_id": job_id, "sql": sql }))
108}
109
110pub async fn add_compression_policy(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
111 let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
112 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
113 let compress_after = params.as_ref().and_then(|p| p.get("compress_after").and_then(|v| v.as_str()))
114 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'compress_after' (e.g. '7 days')".into()))?;
115 let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
116
117 let sql = format!(
118 "SELECT add_compression_policy('{}.{}', INTERVAL '{}')",
119 crate::validation::quote_ident(schema),
120 crate::validation::quote_ident(table),
121 compress_after
122 );
123
124 let rows = client.query(&sql, &[]).await?;
125 let job_id: i32 = rows[0].get(0);
126
127 Ok(json!({ "success": true, "job_id": job_id, "sql": sql }))
128}
129
130pub async fn compress_chunk(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
131 let chunk_name = params.as_ref().and_then(|p| p.get("chunk_name").and_then(|v| v.as_str()))
132 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'chunk_name'".into()))?;
133 let chunk_schema = params.as_ref().and_then(|p| p.get("chunk_schema").and_then(|v| v.as_str())).unwrap_or("_hyper");
134
135 let sql = format!("SELECT compress_chunk('{}.{}')", crate::validation::quote_ident(chunk_schema), crate::validation::quote_ident(chunk_name));
136 let rows = client.query(&sql, &[]).await?;
137 let result: String = rows[0].get(0);
138
139 Ok(json!({ "success": true, "chunk": format!("{}.{}", chunk_schema, chunk_name), "result": result }))
140}
141
142pub async fn add_continuous_aggregate(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
143 let name = params.as_ref().and_then(|p| p.get("name").and_then(|v| v.as_str()))
144 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'name'".into()))?;
145 let query = params.as_ref().and_then(|p| p.get("query").and_then(|v| v.as_str()))
146 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'query'".into()))?;
147 let refresh_interval = params.as_ref().and_then(|p| p.get("refresh_interval").and_then(|v| v.as_str()));
148
149 let q_name = crate::validation::quote_ident(name);
150 let mut sql = format!("CREATE MATERIALIZED VIEW {q_name}");
151 sql.push_str("\nWITH (timescaledb.continuous) AS\n");
152 sql.push_str(query);
153 if let Some(ri) = refresh_interval {
154 sql.push_str(&format!("\nWITH DATA;\nSELECT add_continuous_aggregate_policy('{q_name}', INTERVAL '{ri}', INTERVAL '{ri}')"));
155 }
156
157 client.execute(&sql, &[]).await?;
158 Ok(json!({ "success": true, "name": name, "sql": sql }))
159}