Skip to main content

mcp_environment/
server.rs

1use adk_mcp_sdk::{HealthCheck, HealthStatus};
2use std::collections::HashMap;
3use std::sync::Arc;
4use chrono::Utc;
5use rmcp::{handler::server::wrapper::Parameters, schemars, tool, tool_router};
6use serde::{Deserialize, Serialize};
7use tokio::sync::RwLock;
8use uuid::Uuid;
9
10use crate::types::*;
11
12#[derive(Clone)]
13pub struct EnvironmentServer {
14    envs: Arc<RwLock<HashMap<String, Environment>>>,
15    deploys: Arc<RwLock<Vec<Deployment>>>,
16}
17
18impl EnvironmentServer {
19    pub fn new() -> Self {
20        let mut envs = HashMap::new();
21        let now = Utc::now();
22        for (id, name, env_type, region) in [
23            ("env_dev", "Development", EnvironmentType::Development, "local"),
24            ("env_staging", "Staging", EnvironmentType::Staging, "us-east-1"),
25            ("env_prod", "Production", EnvironmentType::Production, "us-east-1"),
26            ("env_eu_prod", "EU Production", EnvironmentType::Regional, "eu-west-1"),
27        ] {
28            envs.insert(id.into(), Environment {
29                environment_id: id.into(), name: name.into(), env_type, region: region.into(),
30                status: EnvironmentStatus::Healthy, runtime_version: "adk-rust-0.12.4".into(),
31                active_release_id: Some("rel_2026_0518_1842".into()), config_version: 1,
32                config: serde_json::json!({"model_route": "gemini-3.1-flash", "memory_backend": "redis", "payment_mode": "production"}),
33                worker_pools: vec![
34                    WorkerPool { pool_id: format!("{}_model", id), pool_type: PoolType::Model, min_capacity: 2, desired_capacity: 4, max_capacity: 20, current_capacity: 4, status: "healthy".into() },
35                    WorkerPool { pool_id: format!("{}_graph", id), pool_type: PoolType::Graph, min_capacity: 1, desired_capacity: 2, max_capacity: 10, current_capacity: 2, status: "healthy".into() },
36                    WorkerPool { pool_id: format!("{}_browser", id), pool_type: PoolType::Browser, min_capacity: 2, desired_capacity: 5, max_capacity: 50, current_capacity: 5, status: "healthy".into() },
37                    WorkerPool { pool_id: format!("{}_payment", id), pool_type: PoolType::Payment, min_capacity: 1, desired_capacity: 2, max_capacity: 8, current_capacity: 2, status: "healthy".into() },
38                ],
39                credential_refs: vec!["credref_anthropic_prod".into(), "credref_payments_signing_key".into()],
40                protocol_bindings: serde_json::json!({"a2a": "enabled", "mcp": "enabled", "acp": "enabled"}),
41                updated_at: now,
42            });
43        }
44        Self { envs: Arc::new(RwLock::new(envs)), deploys: Arc::new(RwLock::new(Vec::new())) }
45    }
46}
47
48// --- Inputs ---
49#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
50pub struct ListEnvironmentsInput { #[serde(default)] pub status_filter: Option<EnvironmentStatus> }
51
52#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
53pub struct GetEnvironmentInput { pub environment_id: String }
54
55#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
56pub struct SyncConfigInput { pub environment_id: String, #[serde(default)] pub config: Option<serde_json::Value>, #[serde(default)] pub mode: Option<String> }
57
58#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
59pub struct ValidateEnvironmentInput { pub environment_id: String, #[serde(default)] pub scope: Option<String> }
60
61#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
62pub struct ScaleWorkerPoolInput { pub environment_id: String, pub pool_type: PoolType, pub desired_capacity: u32, #[serde(default)] pub reason: Option<String> }
63
64#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
65pub struct PromoteBuildInput { pub release_bundle_id: String, pub target_environment_id: String, pub promoted_by: String, #[serde(default)] pub strategy: Option<String> }
66
67#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
68pub struct RollbackDeployInput { pub environment_id: String, pub reason: String, #[serde(default)] pub reason_code: Option<String> }
69
70#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
71pub struct GetDeployHistoryInput { pub environment_id: String, #[serde(default)] pub limit: Option<usize> }
72
73#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
74pub struct DetectConfigDriftInput { pub environment_id: String }
75
76#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
77pub struct GetWorkerPoolStatusInput { pub environment_id: String }
78
79#[tool_router(server_handler)]
80impl EnvironmentServer {
81    #[tool(description = "Show dev, staging, production, EU production, tenant envs")]
82    async fn list_environments(&self, Parameters(i): Parameters<ListEnvironmentsInput>) -> String {
83        let envs = self.envs.read().await;
84        let summary: Vec<_> = envs.values()
85            .filter(|e| i.status_filter.as_ref().is_none_or(|s| &e.status == s))
86            .map(|e| {
87                let workers: String = e.worker_pools.iter().map(|w| format!("{:?}:{}/{}", w.pool_type, w.current_capacity, w.desired_capacity)).collect::<Vec<_>>().join(", ");
88                serde_json::json!({
89                    "environment_id": e.environment_id, "name": e.name, "type": e.env_type,
90                    "region": e.region, "status": e.status, "runtime_version": e.runtime_version,
91                    "active_release_id": e.active_release_id, "workers": workers,
92                })
93            }).collect();
94        serde_json::to_string_pretty(&summary).unwrap()
95    }
96
97    #[tool(description = "Inspect selected environment configuration")]
98    async fn get_environment(&self, Parameters(i): Parameters<GetEnvironmentInput>) -> String {
99        match self.envs.read().await.get(&i.environment_id) {
100            Some(env) => serde_json::to_string_pretty(env).unwrap(),
101            None => format!("Environment not found: {}", i.environment_id),
102        }
103    }
104
105    #[tool(description = "Sync secrets, policies, providers, memory, MCP bindings (plan/apply)")]
106    async fn sync_environment_config(&self, Parameters(i): Parameters<SyncConfigInput>) -> String {
107        let mode = i.mode.unwrap_or_else(|| "plan".into());
108        let mut envs = self.envs.write().await;
109        match envs.get_mut(&i.environment_id) {
110            Some(env) => {
111                if mode == "apply" {
112                    if let Some(config) = i.config {
113                        env.config = config;
114                        env.config_version += 1;
115                        env.updated_at = Utc::now();
116                    }
117                    serde_json::to_string_pretty(&serde_json::json!({"environment_id": i.environment_id, "mode": "apply", "config_version": env.config_version, "status": "applied"})).unwrap()
118                } else {
119                    serde_json::to_string_pretty(&serde_json::json!({"environment_id": i.environment_id, "mode": "plan", "current_config_version": env.config_version, "changes_pending": i.config.is_some(), "requires_approval": env.env_type == EnvironmentType::Production})).unwrap()
120                }
121            }
122            None => format!("Environment not found: {}", i.environment_id),
123        }
124    }
125
126    #[tool(description = "Run provider, memory, protocol, payment, and policy checks")]
127    async fn validate_environment(&self, Parameters(i): Parameters<ValidateEnvironmentInput>) -> String {
128        let envs = self.envs.read().await;
129        match envs.get(&i.environment_id) {
130            Some(env) => {
131                let workers_healthy = env.worker_pools.iter().all(|w| w.current_capacity >= w.min_capacity);
132                let checks = vec![
133                    Check { name: "provider_connectivity".into(), status: "passing".into(), message: "Model routes reachable".into() },
134                    Check { name: "memory_backend".into(), status: "passing".into(), message: "Redis/Postgres connected".into() },
135                    Check { name: "worker_health".into(), status: if workers_healthy { "passing" } else { "failing" }.into(), message: format!("{} pools", env.worker_pools.len()) },
136                    Check { name: "protocol_bindings".into(), status: "passing".into(), message: "A2A/MCP/ACP enabled".into() },
137                    Check { name: "credential_bindings".into(), status: "passing".into(), message: format!("{} refs valid", env.credential_refs.len()) },
138                    Check { name: "config_schema".into(), status: "passing".into(), message: "Configuration valid".into() },
139                    Check { name: "rollback_readiness".into(), status: "passing".into(), message: "Previous release available".into() },
140                ];
141                let passing = checks.iter().filter(|c| c.status == "passing").count() as u32;
142                let failing = checks.iter().filter(|c| c.status == "failing").count() as u32;
143                let review = checks.iter().filter(|c| c.status == "review").count() as u32;
144                let result = ValidationResult {
145                    validation_id: format!("val_{}", Uuid::new_v4().simple()),
146                    environment_id: i.environment_id, scope: i.scope.unwrap_or_else(|| "full".into()),
147                    checks, passing, failing, review,
148                    promotion_allowed: failing == 0, validated_at: Utc::now(),
149                };
150                serde_json::to_string_pretty(&result).unwrap()
151            }
152            None => format!("Environment not found: {}", i.environment_id),
153        }
154    }
155
156    #[tool(description = "Scale model, graph, browser, code, realtime, payment workers")]
157    async fn scale_worker_pool(&self, Parameters(i): Parameters<ScaleWorkerPoolInput>) -> String {
158        let mut envs = self.envs.write().await;
159        match envs.get_mut(&i.environment_id) {
160            Some(env) => {
161                if let Some(pool) = env.worker_pools.iter_mut().find(|p| p.pool_type == i.pool_type) {
162                    let prev = pool.desired_capacity;
163                    pool.desired_capacity = i.desired_capacity.clamp(pool.min_capacity, pool.max_capacity);
164                    pool.current_capacity = pool.desired_capacity;
165                    env.updated_at = Utc::now();
166                    serde_json::to_string_pretty(&serde_json::json!({
167                        "environment_id": i.environment_id, "pool_type": i.pool_type,
168                        "previous_capacity": prev, "target_capacity": pool.desired_capacity,
169                        "reason": i.reason.unwrap_or_default(), "status": "scaled"
170                    })).unwrap()
171                } else {
172                    format!("Pool type {:?} not found", i.pool_type)
173                }
174            }
175            None => format!("Environment not found: {}", i.environment_id),
176        }
177    }
178
179    #[tool(description = "Promote immutable release bundle to target environment")]
180    async fn promote_build(&self, Parameters(i): Parameters<PromoteBuildInput>) -> String {
181        let mut envs = self.envs.write().await;
182        match envs.get_mut(&i.target_environment_id) {
183            Some(env) => {
184                let prev = env.active_release_id.clone();
185                env.active_release_id = Some(i.release_bundle_id.clone());
186                env.updated_at = Utc::now();
187                let deploy = Deployment {
188                    deployment_id: format!("deploy_{}", Uuid::new_v4().simple()),
189                    environment_id: i.target_environment_id.clone(),
190                    release_bundle_id: i.release_bundle_id, previous_release_id: prev,
191                    strategy: i.strategy.unwrap_or_else(|| "immediate".into()),
192                    status: DeployStatus::Succeeded, promoted_by: i.promoted_by,
193                    validation_id: None, approval_id: None,
194                    created_at: Utc::now(), completed_at: Some(Utc::now()),
195                };
196                drop(envs);
197                self.deploys.write().await.push(deploy.clone());
198                serde_json::to_string_pretty(&deploy).unwrap()
199            }
200            None => format!("Environment not found: {}", i.target_environment_id),
201        }
202    }
203
204    #[tool(description = "Roll back to previous release bundle")]
205    async fn rollback_deploy(&self, Parameters(i): Parameters<RollbackDeployInput>) -> String {
206        let mut envs = self.envs.write().await;
207        match envs.get_mut(&i.environment_id) {
208            Some(env) => {
209                let deploys = self.deploys.read().await;
210                let prev = deploys.iter().rev()
211                    .filter(|d| d.environment_id == i.environment_id && d.status == DeployStatus::Succeeded)
212                    .nth(1);
213                match prev {
214                    Some(prev_deploy) => {
215                        let from = env.active_release_id.clone();
216                        env.active_release_id = Some(prev_deploy.release_bundle_id.clone());
217                        env.updated_at = Utc::now();
218                        serde_json::to_string_pretty(&serde_json::json!({
219                            "rollback_id": format!("rb_{}", Uuid::new_v4().simple()),
220                            "environment_id": i.environment_id,
221                            "from_release": from, "to_release": prev_deploy.release_bundle_id,
222                            "reason": i.reason, "reason_code": i.reason_code,
223                            "status": "rolled_back"
224                        })).unwrap()
225                    }
226                    None => "No previous deployment to roll back to".into(),
227                }
228            }
229            None => format!("Environment not found: {}", i.environment_id),
230        }
231    }
232
233    #[tool(description = "Inspect release events and promotion evidence")]
234    async fn get_deploy_history(&self, Parameters(i): Parameters<GetDeployHistoryInput>) -> String {
235        let deploys = self.deploys.read().await;
236        let history: Vec<_> = deploys.iter().rev()
237            .filter(|d| d.environment_id == i.environment_id)
238            .take(i.limit.unwrap_or(10)).cloned().collect();
239        serde_json::to_string_pretty(&history).unwrap()
240    }
241
242    #[tool(description = "Detect configuration drift from expected state")]
243    async fn detect_config_drift(&self, Parameters(i): Parameters<DetectConfigDriftInput>) -> String {
244        let envs = self.envs.read().await;
245        match envs.get(&i.environment_id) {
246            Some(env) => {
247                // Simulate drift detection
248                let mut items = Vec::new();
249                if env.config_version > 1 {
250                    items.push(DriftItem { target: "config_version".into(), expected: "1".into(), actual: env.config_version.to_string(), severity: "low".into() });
251                }
252                let report = DriftReport {
253                    drift_id: format!("drift_{}", Uuid::new_v4().simple()),
254                    environment_id: i.environment_id,
255                    total_drifted: items.len() as u32,
256                    drifted_items: items,
257                    detected_at: Utc::now(),
258                };
259                serde_json::to_string_pretty(&report).unwrap()
260            }
261            None => format!("Environment not found: {}", i.environment_id),
262        }
263    }
264
265    #[tool(description = "Get detailed worker pool status for an environment")]
266    async fn get_worker_pool_status(&self, Parameters(i): Parameters<GetWorkerPoolStatusInput>) -> String {
267        let envs = self.envs.read().await;
268        match envs.get(&i.environment_id) {
269            Some(env) => {
270                let pools: Vec<_> = env.worker_pools.iter().map(|p| serde_json::json!({
271                    "pool_id": p.pool_id, "pool_type": p.pool_type,
272                    "min": p.min_capacity, "desired": p.desired_capacity,
273                    "max": p.max_capacity, "current": p.current_capacity,
274                    "status": p.status,
275                    "utilization": format!("{}%", (p.current_capacity as f64 / p.desired_capacity.max(1) as f64 * 100.0) as u32),
276                })).collect();
277                serde_json::to_string_pretty(&serde_json::json!({
278                    "environment_id": i.environment_id, "pools": pools, "total_pools": pools.len()
279                })).unwrap()
280            }
281            None => format!("Environment not found: {}", i.environment_id),
282        }
283    }
284}
285
286#[async_trait::async_trait]
287impl HealthCheck for EnvironmentServer {
288    async fn check_health(&self) -> HealthStatus {
289        HealthStatus {
290            healthy: true,
291            message: Some("operational".into()),
292            latency_ms: Some(1),
293        }
294    }
295}