1use crate::errors::Result as MCPResult;
2use serde_json::{Value, json};
3use tokio_postgres::Client;
4
5pub async fn create_hypertable(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
6 let table = params
7 .as_ref()
8 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
9 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
10 let time_column = params
11 .as_ref()
12 .and_then(|p| p.get("time_column").and_then(|v| v.as_str()))
13 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'time_column'".into()))?;
14 let schema = params
15 .as_ref()
16 .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
17 .unwrap_or("public");
18 let chunk_time = params
19 .as_ref()
20 .and_then(|p| p.get("chunk_time_interval").and_then(|v| v.as_str()));
21
22 let mut sql = format!(
23 "SELECT create_hypertable('{}.{}', '{}'",
24 crate::validation::quote_ident(schema),
25 crate::validation::quote_ident(table),
26 time_column
27 );
28 if let Some(ct) = chunk_time {
29 sql.push_str(&format!(", chunk_time_interval => INTERVAL '{}'", ct));
30 }
31 sql.push(')');
32
33 let rows = client.query(&sql, &[]).await?;
34 let created: bool = rows[0].get(0);
35
36 Ok(json!({ "success": created, "hypertable": format!("{}.{}", schema, table), "sql": sql }))
37}
38
39pub async fn show_hypertable_details(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
40 let table = params
41 .as_ref()
42 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
43 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
44 let schema = params
45 .as_ref()
46 .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
47 .unwrap_or("public");
48
49 let rows = client
50 .query(
51 "SELECT hypertable_name, hypertable_schema, owner,
52 num_dimensions, chunk_target_size,
53 compression_state, tablespaces
54 FROM timescaledb_information.hypertables
55 WHERE hypertable_name = $1 AND hypertable_schema = $2",
56 &[&table, &schema],
57 )
58 .await?;
59
60 if rows.is_empty() {
61 return Ok(json!({ "table": format!("{}.{}", schema, table), "is_hypertable": false }));
62 }
63
64 let row = &rows[0];
65 Ok(json!({
66 "table": row.get::<_, String>(0),
67 "schema": row.get::<_, String>(1),
68 "owner": row.get::<_, Option<String>>(2),
69 "dimensions": row.get::<_, Option<i32>>(3),
70 "chunk_target_size": row.get::<_, Option<String>>(4),
71 "compression": row.get::<_, Option<String>>(5),
72 "tablespaces": row.get::<_, Option<String>>(6),
73 "is_hypertable": true,
74 }))
75}
76
77pub async fn show_chunks(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
78 let table = params
79 .as_ref()
80 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
81 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
82 let schema = params
83 .as_ref()
84 .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
85 .unwrap_or("public");
86
87 let rows = client
88 .query(
89 "SELECT chunk_name, chunk_schema, table_name, table_schema,
90 range_start::text, range_end::text, is_compressed::text,
91 disk_size::text
92 FROM timescaledb_information.chunks
93 WHERE table_name = $1 AND table_schema = $2
94 ORDER BY range_start",
95 &[&table, &schema],
96 )
97 .await?;
98
99 let chunks: Vec<Value> = rows
100 .iter()
101 .map(|row| {
102 json!({
103 "chunk_name": row.get::<_, String>(0),
104 "chunk_schema": row.get::<_, String>(1),
105 "range_start": row.get::<_, String>(3),
106 "range_end": row.get::<_, String>(4),
107 "compressed": row.get::<_, String>(5),
108 "disk_size": row.get::<_, Option<String>>(6),
109 })
110 })
111 .collect();
112
113 Ok(json!({ "chunks": chunks, "count": chunks.len() }))
114}
115
116pub async fn add_retention_policy(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
117 let table = params
118 .as_ref()
119 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
120 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
121 let drop_after = params
122 .as_ref()
123 .and_then(|p| p.get("drop_after").and_then(|v| v.as_str()))
124 .ok_or_else(|| {
125 crate::errors::MCPError::InvalidParams("Missing 'drop_after' (e.g. '90 days')".into())
126 })?;
127 let schema = params
128 .as_ref()
129 .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
130 .unwrap_or("public");
131
132 let sql = format!(
133 "SELECT add_retention_policy('{}.{}', INTERVAL '{}')",
134 crate::validation::quote_ident(schema),
135 crate::validation::quote_ident(table),
136 drop_after
137 );
138
139 let rows = client.query(&sql, &[]).await?;
140 let job_id: i32 = rows[0].get(0);
141
142 Ok(json!({ "success": true, "job_id": job_id, "sql": sql }))
143}
144
145pub async fn add_compression_policy(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
146 let table = params
147 .as_ref()
148 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
149 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
150 let compress_after = params
151 .as_ref()
152 .and_then(|p| p.get("compress_after").and_then(|v| v.as_str()))
153 .ok_or_else(|| {
154 crate::errors::MCPError::InvalidParams(
155 "Missing 'compress_after' (e.g. '7 days')".into(),
156 )
157 })?;
158 let schema = params
159 .as_ref()
160 .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
161 .unwrap_or("public");
162
163 let sql = format!(
164 "SELECT add_compression_policy('{}.{}', INTERVAL '{}')",
165 crate::validation::quote_ident(schema),
166 crate::validation::quote_ident(table),
167 compress_after
168 );
169
170 let rows = client.query(&sql, &[]).await?;
171 let job_id: i32 = rows[0].get(0);
172
173 Ok(json!({ "success": true, "job_id": job_id, "sql": sql }))
174}
175
176pub async fn compress_chunk(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
177 let chunk_name = params
178 .as_ref()
179 .and_then(|p| p.get("chunk_name").and_then(|v| v.as_str()))
180 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'chunk_name'".into()))?;
181 let chunk_schema = params
182 .as_ref()
183 .and_then(|p| p.get("chunk_schema").and_then(|v| v.as_str()))
184 .unwrap_or("_hyper");
185
186 let sql = format!(
187 "SELECT compress_chunk('{}.{}')",
188 crate::validation::quote_ident(chunk_schema),
189 crate::validation::quote_ident(chunk_name)
190 );
191 let rows = client.query(&sql, &[]).await?;
192 let result: String = rows[0].get(0);
193
194 Ok(
195 json!({ "success": true, "chunk": format!("{}.{}", chunk_schema, chunk_name), "result": result }),
196 )
197}
198
199pub async fn add_continuous_aggregate(
200 client: &Client,
201 params: &Option<&Value>,
202) -> MCPResult<Value> {
203 let name = params
204 .as_ref()
205 .and_then(|p| p.get("name").and_then(|v| v.as_str()))
206 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'name'".into()))?;
207 let query = params
208 .as_ref()
209 .and_then(|p| p.get("query").and_then(|v| v.as_str()))
210 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'query'".into()))?;
211 let refresh_interval = params
212 .as_ref()
213 .and_then(|p| p.get("refresh_interval").and_then(|v| v.as_str()));
214
215 let q_name = crate::validation::quote_ident(name);
216 let mut sql = format!("CREATE MATERIALIZED VIEW {q_name}");
217 sql.push_str("\nWITH (timescaledb.continuous) AS\n");
218 sql.push_str(query);
219 if let Some(ri) = refresh_interval {
220 sql.push_str(&format!("\nWITH DATA;\nSELECT add_continuous_aggregate_policy('{q_name}', INTERVAL '{ri}', INTERVAL '{ri}')"));
221 }
222
223 client.execute(&sql, &[]).await?;
224 Ok(json!({ "success": true, "name": name, "sql": sql }))
225}