1use adk_mcp_sdk::{HealthCheck, HealthStatus};
2use std::collections::HashMap;
3use std::sync::Arc;
4use chrono::Utc;
5use rmcp::{handler::server::wrapper::Parameters, schemars, tool, tool_router};
6use serde::{Deserialize, Serialize};
7use tokio::sync::RwLock;
8use uuid::Uuid;
9
10use crate::types::*;
11
12#[derive(Clone)]
13pub struct EnvironmentServer {
14 envs: Arc<RwLock<HashMap<String, Environment>>>,
15 deploys: Arc<RwLock<Vec<Deployment>>>,
16}
17
18impl EnvironmentServer {
19 pub fn new() -> Self {
20 let mut envs = HashMap::new();
21 let now = Utc::now();
22 for (id, name, env_type, region) in [
23 ("env_dev", "Development", EnvironmentType::Development, "local"),
24 ("env_staging", "Staging", EnvironmentType::Staging, "us-east-1"),
25 ("env_prod", "Production", EnvironmentType::Production, "us-east-1"),
26 ("env_eu_prod", "EU Production", EnvironmentType::Regional, "eu-west-1"),
27 ] {
28 envs.insert(id.into(), Environment {
29 environment_id: id.into(), name: name.into(), env_type, region: region.into(),
30 status: EnvironmentStatus::Healthy, runtime_version: "adk-rust-0.12.4".into(),
31 active_release_id: Some("rel_2026_0518_1842".into()), config_version: 1,
32 config: serde_json::json!({"model_route": "gemini-3.1-flash", "memory_backend": "redis", "payment_mode": "production"}),
33 worker_pools: vec![
34 WorkerPool { pool_id: format!("{}_model", id), pool_type: PoolType::Model, min_capacity: 2, desired_capacity: 4, max_capacity: 20, current_capacity: 4, status: "healthy".into() },
35 WorkerPool { pool_id: format!("{}_graph", id), pool_type: PoolType::Graph, min_capacity: 1, desired_capacity: 2, max_capacity: 10, current_capacity: 2, status: "healthy".into() },
36 WorkerPool { pool_id: format!("{}_browser", id), pool_type: PoolType::Browser, min_capacity: 2, desired_capacity: 5, max_capacity: 50, current_capacity: 5, status: "healthy".into() },
37 WorkerPool { pool_id: format!("{}_payment", id), pool_type: PoolType::Payment, min_capacity: 1, desired_capacity: 2, max_capacity: 8, current_capacity: 2, status: "healthy".into() },
38 ],
39 credential_refs: vec!["credref_anthropic_prod".into(), "credref_payments_signing_key".into()],
40 protocol_bindings: serde_json::json!({"a2a": "enabled", "mcp": "enabled", "acp": "enabled"}),
41 updated_at: now,
42 });
43 }
44 Self { envs: Arc::new(RwLock::new(envs)), deploys: Arc::new(RwLock::new(Vec::new())) }
45 }
46}
47
48#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
50pub struct ListEnvironmentsInput { #[serde(default)] pub status_filter: Option<EnvironmentStatus> }
51
52#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
53pub struct GetEnvironmentInput { pub environment_id: String }
54
55#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
56pub struct SyncConfigInput { pub environment_id: String, #[serde(default)] pub config: Option<serde_json::Value>, #[serde(default)] pub mode: Option<String> }
57
58#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
59pub struct ValidateEnvironmentInput { pub environment_id: String, #[serde(default)] pub scope: Option<String> }
60
61#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
62pub struct ScaleWorkerPoolInput { pub environment_id: String, pub pool_type: PoolType, pub desired_capacity: u32, #[serde(default)] pub reason: Option<String> }
63
64#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
65pub struct PromoteBuildInput { pub release_bundle_id: String, pub target_environment_id: String, pub promoted_by: String, #[serde(default)] pub strategy: Option<String> }
66
67#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
68pub struct RollbackDeployInput { pub environment_id: String, pub reason: String, #[serde(default)] pub reason_code: Option<String> }
69
70#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
71pub struct GetDeployHistoryInput { pub environment_id: String, #[serde(default)] pub limit: Option<usize> }
72
73#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
74pub struct DetectConfigDriftInput { pub environment_id: String }
75
76#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
77pub struct GetWorkerPoolStatusInput { pub environment_id: String }
78
79#[tool_router(server_handler)]
80impl EnvironmentServer {
81 #[tool(description = "Show dev, staging, production, EU production, tenant envs")]
82 async fn list_environments(&self, Parameters(i): Parameters<ListEnvironmentsInput>) -> String {
83 let envs = self.envs.read().await;
84 let summary: Vec<_> = envs.values()
85 .filter(|e| i.status_filter.as_ref().is_none_or(|s| &e.status == s))
86 .map(|e| {
87 let workers: String = e.worker_pools.iter().map(|w| format!("{:?}:{}/{}", w.pool_type, w.current_capacity, w.desired_capacity)).collect::<Vec<_>>().join(", ");
88 serde_json::json!({
89 "environment_id": e.environment_id, "name": e.name, "type": e.env_type,
90 "region": e.region, "status": e.status, "runtime_version": e.runtime_version,
91 "active_release_id": e.active_release_id, "workers": workers,
92 })
93 }).collect();
94 serde_json::to_string_pretty(&summary).unwrap()
95 }
96
97 #[tool(description = "Inspect selected environment configuration")]
98 async fn get_environment(&self, Parameters(i): Parameters<GetEnvironmentInput>) -> String {
99 match self.envs.read().await.get(&i.environment_id) {
100 Some(env) => serde_json::to_string_pretty(env).unwrap(),
101 None => format!("Environment not found: {}", i.environment_id),
102 }
103 }
104
105 #[tool(description = "Sync secrets, policies, providers, memory, MCP bindings (plan/apply)")]
106 async fn sync_environment_config(&self, Parameters(i): Parameters<SyncConfigInput>) -> String {
107 let mode = i.mode.unwrap_or_else(|| "plan".into());
108 let mut envs = self.envs.write().await;
109 match envs.get_mut(&i.environment_id) {
110 Some(env) => {
111 if mode == "apply" {
112 if let Some(config) = i.config {
113 env.config = config;
114 env.config_version += 1;
115 env.updated_at = Utc::now();
116 }
117 serde_json::to_string_pretty(&serde_json::json!({"environment_id": i.environment_id, "mode": "apply", "config_version": env.config_version, "status": "applied"})).unwrap()
118 } else {
119 serde_json::to_string_pretty(&serde_json::json!({"environment_id": i.environment_id, "mode": "plan", "current_config_version": env.config_version, "changes_pending": i.config.is_some(), "requires_approval": env.env_type == EnvironmentType::Production})).unwrap()
120 }
121 }
122 None => format!("Environment not found: {}", i.environment_id),
123 }
124 }
125
126 #[tool(description = "Run provider, memory, protocol, payment, and policy checks")]
127 async fn validate_environment(&self, Parameters(i): Parameters<ValidateEnvironmentInput>) -> String {
128 let envs = self.envs.read().await;
129 match envs.get(&i.environment_id) {
130 Some(env) => {
131 let workers_healthy = env.worker_pools.iter().all(|w| w.current_capacity >= w.min_capacity);
132 let checks = vec![
133 Check { name: "provider_connectivity".into(), status: "passing".into(), message: "Model routes reachable".into() },
134 Check { name: "memory_backend".into(), status: "passing".into(), message: "Redis/Postgres connected".into() },
135 Check { name: "worker_health".into(), status: if workers_healthy { "passing" } else { "failing" }.into(), message: format!("{} pools", env.worker_pools.len()) },
136 Check { name: "protocol_bindings".into(), status: "passing".into(), message: "A2A/MCP/ACP enabled".into() },
137 Check { name: "credential_bindings".into(), status: "passing".into(), message: format!("{} refs valid", env.credential_refs.len()) },
138 Check { name: "config_schema".into(), status: "passing".into(), message: "Configuration valid".into() },
139 Check { name: "rollback_readiness".into(), status: "passing".into(), message: "Previous release available".into() },
140 ];
141 let passing = checks.iter().filter(|c| c.status == "passing").count() as u32;
142 let failing = checks.iter().filter(|c| c.status == "failing").count() as u32;
143 let review = checks.iter().filter(|c| c.status == "review").count() as u32;
144 let result = ValidationResult {
145 validation_id: format!("val_{}", Uuid::new_v4().simple()),
146 environment_id: i.environment_id, scope: i.scope.unwrap_or_else(|| "full".into()),
147 checks, passing, failing, review,
148 promotion_allowed: failing == 0, validated_at: Utc::now(),
149 };
150 serde_json::to_string_pretty(&result).unwrap()
151 }
152 None => format!("Environment not found: {}", i.environment_id),
153 }
154 }
155
156 #[tool(description = "Scale model, graph, browser, code, realtime, payment workers")]
157 async fn scale_worker_pool(&self, Parameters(i): Parameters<ScaleWorkerPoolInput>) -> String {
158 let mut envs = self.envs.write().await;
159 match envs.get_mut(&i.environment_id) {
160 Some(env) => {
161 if let Some(pool) = env.worker_pools.iter_mut().find(|p| p.pool_type == i.pool_type) {
162 let prev = pool.desired_capacity;
163 pool.desired_capacity = i.desired_capacity.clamp(pool.min_capacity, pool.max_capacity);
164 pool.current_capacity = pool.desired_capacity;
165 env.updated_at = Utc::now();
166 serde_json::to_string_pretty(&serde_json::json!({
167 "environment_id": i.environment_id, "pool_type": i.pool_type,
168 "previous_capacity": prev, "target_capacity": pool.desired_capacity,
169 "reason": i.reason.unwrap_or_default(), "status": "scaled"
170 })).unwrap()
171 } else {
172 format!("Pool type {:?} not found", i.pool_type)
173 }
174 }
175 None => format!("Environment not found: {}", i.environment_id),
176 }
177 }
178
179 #[tool(description = "Promote immutable release bundle to target environment")]
180 async fn promote_build(&self, Parameters(i): Parameters<PromoteBuildInput>) -> String {
181 let mut envs = self.envs.write().await;
182 match envs.get_mut(&i.target_environment_id) {
183 Some(env) => {
184 let prev = env.active_release_id.clone();
185 env.active_release_id = Some(i.release_bundle_id.clone());
186 env.updated_at = Utc::now();
187 let deploy = Deployment {
188 deployment_id: format!("deploy_{}", Uuid::new_v4().simple()),
189 environment_id: i.target_environment_id.clone(),
190 release_bundle_id: i.release_bundle_id, previous_release_id: prev,
191 strategy: i.strategy.unwrap_or_else(|| "immediate".into()),
192 status: DeployStatus::Succeeded, promoted_by: i.promoted_by,
193 validation_id: None, approval_id: None,
194 created_at: Utc::now(), completed_at: Some(Utc::now()),
195 };
196 drop(envs);
197 self.deploys.write().await.push(deploy.clone());
198 serde_json::to_string_pretty(&deploy).unwrap()
199 }
200 None => format!("Environment not found: {}", i.target_environment_id),
201 }
202 }
203
204 #[tool(description = "Roll back to previous release bundle")]
205 async fn rollback_deploy(&self, Parameters(i): Parameters<RollbackDeployInput>) -> String {
206 let mut envs = self.envs.write().await;
207 match envs.get_mut(&i.environment_id) {
208 Some(env) => {
209 let deploys = self.deploys.read().await;
210 let prev = deploys.iter().rev()
211 .filter(|d| d.environment_id == i.environment_id && d.status == DeployStatus::Succeeded)
212 .nth(1);
213 match prev {
214 Some(prev_deploy) => {
215 let from = env.active_release_id.clone();
216 env.active_release_id = Some(prev_deploy.release_bundle_id.clone());
217 env.updated_at = Utc::now();
218 serde_json::to_string_pretty(&serde_json::json!({
219 "rollback_id": format!("rb_{}", Uuid::new_v4().simple()),
220 "environment_id": i.environment_id,
221 "from_release": from, "to_release": prev_deploy.release_bundle_id,
222 "reason": i.reason, "reason_code": i.reason_code,
223 "status": "rolled_back"
224 })).unwrap()
225 }
226 None => "No previous deployment to roll back to".into(),
227 }
228 }
229 None => format!("Environment not found: {}", i.environment_id),
230 }
231 }
232
233 #[tool(description = "Inspect release events and promotion evidence")]
234 async fn get_deploy_history(&self, Parameters(i): Parameters<GetDeployHistoryInput>) -> String {
235 let deploys = self.deploys.read().await;
236 let history: Vec<_> = deploys.iter().rev()
237 .filter(|d| d.environment_id == i.environment_id)
238 .take(i.limit.unwrap_or(10)).cloned().collect();
239 serde_json::to_string_pretty(&history).unwrap()
240 }
241
242 #[tool(description = "Detect configuration drift from expected state")]
243 async fn detect_config_drift(&self, Parameters(i): Parameters<DetectConfigDriftInput>) -> String {
244 let envs = self.envs.read().await;
245 match envs.get(&i.environment_id) {
246 Some(env) => {
247 let mut items = Vec::new();
249 if env.config_version > 1 {
250 items.push(DriftItem { target: "config_version".into(), expected: "1".into(), actual: env.config_version.to_string(), severity: "low".into() });
251 }
252 let report = DriftReport {
253 drift_id: format!("drift_{}", Uuid::new_v4().simple()),
254 environment_id: i.environment_id,
255 total_drifted: items.len() as u32,
256 drifted_items: items,
257 detected_at: Utc::now(),
258 };
259 serde_json::to_string_pretty(&report).unwrap()
260 }
261 None => format!("Environment not found: {}", i.environment_id),
262 }
263 }
264
265 #[tool(description = "Get detailed worker pool status for an environment")]
266 async fn get_worker_pool_status(&self, Parameters(i): Parameters<GetWorkerPoolStatusInput>) -> String {
267 let envs = self.envs.read().await;
268 match envs.get(&i.environment_id) {
269 Some(env) => {
270 let pools: Vec<_> = env.worker_pools.iter().map(|p| serde_json::json!({
271 "pool_id": p.pool_id, "pool_type": p.pool_type,
272 "min": p.min_capacity, "desired": p.desired_capacity,
273 "max": p.max_capacity, "current": p.current_capacity,
274 "status": p.status,
275 "utilization": format!("{}%", (p.current_capacity as f64 / p.desired_capacity.max(1) as f64 * 100.0) as u32),
276 })).collect();
277 serde_json::to_string_pretty(&serde_json::json!({
278 "environment_id": i.environment_id, "pools": pools, "total_pools": pools.len()
279 })).unwrap()
280 }
281 None => format!("Environment not found: {}", i.environment_id),
282 }
283 }
284}
285
286#[async_trait::async_trait]
287impl HealthCheck for EnvironmentServer {
288 async fn check_health(&self) -> HealthStatus {
289 HealthStatus {
290 healthy: true,
291 message: Some("operational".into()),
292 latency_ms: Some(1),
293 }
294 }
295}