forge_orchestration/controlplane/
api.rs

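//! REST API server for the control plane.
//!
//! Provides a Kubernetes-style API with:
//! - RESTful CRUD operations for namespaced workloads and cluster-scoped nodes
//! - Watch endpoints (routes are registered, but real-time streaming is not yet implemented)
//! - A status subresource for workloads
//! - Planned, not yet provided by this file: a scale subresource and OpenAPI schema generation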
8
9use std::sync::Arc;
10use axum::{
11    Router,
12    routing::{get, post, put, delete},
13    extract::{Path, Query, State, Json},
14    http::StatusCode,
15    response::IntoResponse,
16};
17use serde::{Deserialize, Serialize};
18use tracing::info;
19
20use super::{ResourceKind, ResourceStore, StoreError};
21
/// Settings that control where the control-plane API server listens and which
/// request-processing features are switched on.
#[derive(Debug, Clone)]
pub struct ApiServerConfig {
    /// Socket address (`host:port`) the server binds to.
    pub listen_addr: String,
    /// Whether TLS is requested for incoming connections.
    pub tls_enabled: bool,
    /// Filesystem path of the TLS certificate, if one is configured.
    pub tls_cert: Option<String>,
    /// Filesystem path of the TLS private key, if one is configured.
    pub tls_key: Option<String>,
    /// Whether request authentication is requested.
    pub auth_enabled: bool,
    /// Whether admission controllers are requested.
    pub admission_enabled: bool,
}
38
39impl Default for ApiServerConfig {
40    fn default() -> Self {
41        Self {
42            listen_addr: "0.0.0.0:6443".to_string(),
43            tls_enabled: false,
44            tls_cert: None,
45            tls_key: None,
46            auth_enabled: false,
47            admission_enabled: true,
48        }
49    }
50}
51
52/// API Server state
53#[derive(Clone)]
54pub struct ApiServerState {
55    /// Resource store
56    pub store: Arc<ResourceStore>,
57}
58
59/// API Server
60pub struct ApiServer {
61    config: ApiServerConfig,
62    state: ApiServerState,
63}
64
65impl ApiServer {
66    /// Create new API server
67    pub fn new(config: ApiServerConfig, store: Arc<ResourceStore>) -> Self {
68        Self {
69            config,
70            state: ApiServerState { store },
71        }
72    }
73
74    /// Build the router
75    pub fn router(&self) -> Router {
76        Router::new()
77            // Health endpoints
78            .route("/healthz", get(health))
79            .route("/readyz", get(ready))
80            .route("/livez", get(live))
81            
82            // API discovery
83            .route("/api", get(api_versions))
84            .route("/apis", get(api_groups))
85            
86            // Core API v1
87            .route("/api/v1/namespaces/:namespace/workloads", 
88                get(list_workloads).post(create_workload))
89            .route("/api/v1/namespaces/:namespace/workloads/:name",
90                get(get_workload).put(update_workload).delete(delete_workload))
91            .route("/api/v1/namespaces/:namespace/workloads/:name/status",
92                get(get_workload_status).put(update_workload_status))
93            
94            // Nodes (cluster-scoped)
95            .route("/api/v1/nodes", get(list_nodes).post(create_node))
96            .route("/api/v1/nodes/:name", get(get_node).put(update_node).delete(delete_node))
97            
98            // Watch endpoints
99            .route("/api/v1/watch/namespaces/:namespace/workloads", get(watch_workloads))
100            .route("/api/v1/watch/nodes", get(watch_nodes))
101            
102            .with_state(self.state.clone())
103    }
104
105    /// Start the API server
106    pub async fn serve(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
107        let router = self.router();
108        let listener = tokio::net::TcpListener::bind(&self.config.listen_addr).await?;
109        
110        info!(addr = %self.config.listen_addr, "API server starting");
111        axum::serve(listener, router).await?;
112        
113        Ok(())
114    }
115}
116
117// Health endpoints
118async fn health() -> impl IntoResponse {
119    StatusCode::OK
120}
121
122async fn ready() -> impl IntoResponse {
123    StatusCode::OK
124}
125
126async fn live() -> impl IntoResponse {
127    StatusCode::OK
128}
129
130// API discovery
131async fn api_versions() -> impl IntoResponse {
132    Json(serde_json::json!({
133        "kind": "APIVersions",
134        "versions": ["v1"],
135        "serverAddressByClientCIDRs": []
136    }))
137}
138
139async fn api_groups() -> impl IntoResponse {
140    Json(serde_json::json!({
141        "kind": "APIGroupList",
142        "apiVersion": "v1",
143        "groups": [
144            {
145                "name": "forge.io",
146                "versions": [
147                    {"groupVersion": "forge.io/v1", "version": "v1"}
148                ],
149                "preferredVersion": {"groupVersion": "forge.io/v1", "version": "v1"}
150            }
151        ]
152    }))
153}
154
155/// Query parameters for list operations
156#[derive(Debug, Deserialize)]
157pub struct ListParams {
158    /// Label selector
159    #[serde(rename = "labelSelector")]
160    pub label_selector: Option<String>,
161    /// Field selector
162    #[serde(rename = "fieldSelector")]
163    pub field_selector: Option<String>,
164    /// Limit results
165    pub limit: Option<u32>,
166    /// Continue token
167    #[serde(rename = "continue")]
168    pub continue_token: Option<String>,
169    /// Resource version for watch
170    #[serde(rename = "resourceVersion")]
171    pub resource_version: Option<u64>,
172}
173
174/// API response wrapper
175#[derive(Debug, Serialize)]
176pub struct ApiResponse<T> {
177    #[serde(rename = "apiVersion")]
178    pub api_version: String,
179    pub kind: String,
180    pub metadata: ListMeta,
181    pub items: Vec<T>,
182}
183
184/// List metadata
185#[derive(Debug, Serialize)]
186pub struct ListMeta {
187    #[serde(rename = "resourceVersion")]
188    pub resource_version: String,
189    #[serde(rename = "continue", skip_serializing_if = "Option::is_none")]
190    pub continue_token: Option<String>,
191}
192
193/// API error response
194#[derive(Debug, Serialize)]
195pub struct ApiError {
196    #[serde(rename = "apiVersion")]
197    pub api_version: String,
198    pub kind: String,
199    pub status: String,
200    pub message: String,
201    pub reason: String,
202    pub code: u16,
203}
204
205impl ApiError {
206    fn not_found(resource: &str, name: &str) -> Self {
207        Self {
208            api_version: "v1".to_string(),
209            kind: "Status".to_string(),
210            status: "Failure".to_string(),
211            message: format!("{} \"{}\" not found", resource, name),
212            reason: "NotFound".to_string(),
213            code: 404,
214        }
215    }
216
217    fn already_exists(resource: &str, name: &str) -> Self {
218        Self {
219            api_version: "v1".to_string(),
220            kind: "Status".to_string(),
221            status: "Failure".to_string(),
222            message: format!("{} \"{}\" already exists", resource, name),
223            reason: "AlreadyExists".to_string(),
224            code: 409,
225        }
226    }
227
228    fn conflict(message: &str) -> Self {
229        Self {
230            api_version: "v1".to_string(),
231            kind: "Status".to_string(),
232            status: "Failure".to_string(),
233            message: message.to_string(),
234            reason: "Conflict".to_string(),
235            code: 409,
236        }
237    }
238}
239
240impl IntoResponse for ApiError {
241    fn into_response(self) -> axum::response::Response {
242        let status = StatusCode::from_u16(self.code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
243        (status, Json(self)).into_response()
244    }
245}
246
247// Workload handlers
248async fn list_workloads(
249    State(state): State<ApiServerState>,
250    Path(namespace): Path<String>,
251    Query(_params): Query<ListParams>,
252) -> impl IntoResponse {
253    let items = state.store.list(&ResourceKind::Workload, Some(&namespace));
254    
255    Json(ApiResponse {
256        api_version: "forge.io/v1".to_string(),
257        kind: "WorkloadList".to_string(),
258        metadata: ListMeta {
259            resource_version: state.store.current_version().to_string(),
260            continue_token: None,
261        },
262        items,
263    })
264}
265
266async fn create_workload(
267    State(state): State<ApiServerState>,
268    Path(namespace): Path<String>,
269    Json(body): Json<serde_json::Value>,
270) -> Result<impl IntoResponse, ApiError> {
271    let name = body.get("metadata")
272        .and_then(|m| m.get("name"))
273        .and_then(|n| n.as_str())
274        .ok_or_else(|| ApiError {
275            api_version: "v1".to_string(),
276            kind: "Status".to_string(),
277            status: "Failure".to_string(),
278            message: "metadata.name is required".to_string(),
279            reason: "Invalid".to_string(),
280            code: 400,
281        })?;
282
283    let key = format!("{}/{}", namespace, name);
284    
285    state.store.create(ResourceKind::Workload, &key, body.clone())
286        .map_err(|e| match e {
287            StoreError::AlreadyExists(_) => ApiError::already_exists("workload", name),
288            _ => ApiError {
289                api_version: "v1".to_string(),
290                kind: "Status".to_string(),
291                status: "Failure".to_string(),
292                message: e.to_string(),
293                reason: "InternalError".to_string(),
294                code: 500,
295            },
296        })?;
297
298    Ok((StatusCode::CREATED, Json(body)))
299}
300
301async fn get_workload(
302    State(state): State<ApiServerState>,
303    Path((namespace, name)): Path<(String, String)>,
304) -> Result<impl IntoResponse, ApiError> {
305    let key = format!("{}/{}", namespace, name);
306    
307    state.store.get(&ResourceKind::Workload, &key)
308        .map(Json)
309        .ok_or_else(|| ApiError::not_found("workload", &name))
310}
311
312async fn update_workload(
313    State(state): State<ApiServerState>,
314    Path((namespace, name)): Path<(String, String)>,
315    Json(body): Json<serde_json::Value>,
316) -> Result<impl IntoResponse, ApiError> {
317    let key = format!("{}/{}", namespace, name);
318    
319    // Extract resource version for optimistic concurrency
320    let resource_version = body.get("metadata")
321        .and_then(|m| m.get("resourceVersion"))
322        .and_then(|v| v.as_str())
323        .and_then(|v| v.parse::<u64>().ok());
324
325    state.store.update(ResourceKind::Workload, &key, body.clone(), resource_version)
326        .map_err(|e| match e {
327            StoreError::NotFound(_) => ApiError::not_found("workload", &name),
328            StoreError::Conflict(expected, actual) => {
329                ApiError::conflict(&format!("resource version mismatch: expected {}, got {}", expected, actual))
330            }
331            _ => ApiError {
332                api_version: "v1".to_string(),
333                kind: "Status".to_string(),
334                status: "Failure".to_string(),
335                message: e.to_string(),
336                reason: "InternalError".to_string(),
337                code: 500,
338            },
339        })?;
340
341    Ok(Json(body))
342}
343
344async fn delete_workload(
345    State(state): State<ApiServerState>,
346    Path((namespace, name)): Path<(String, String)>,
347) -> Result<impl IntoResponse, ApiError> {
348    let key = format!("{}/{}", namespace, name);
349    
350    state.store.delete(&ResourceKind::Workload, &key)
351        .map_err(|e| match e {
352            StoreError::NotFound(_) => ApiError::not_found("workload", &name),
353            _ => ApiError {
354                api_version: "v1".to_string(),
355                kind: "Status".to_string(),
356                status: "Failure".to_string(),
357                message: e.to_string(),
358                reason: "InternalError".to_string(),
359                code: 500,
360            },
361        })?;
362
363    Ok(StatusCode::OK)
364}
365
366async fn get_workload_status(
367    State(state): State<ApiServerState>,
368    Path((namespace, name)): Path<(String, String)>,
369) -> Result<impl IntoResponse, ApiError> {
370    let key = format!("{}/{}", namespace, name);
371    
372    let workload = state.store.get(&ResourceKind::Workload, &key)
373        .ok_or_else(|| ApiError::not_found("workload", &name))?;
374
375    // Return just the status subresource
376    let status = workload.get("status").cloned().unwrap_or(serde_json::json!({}));
377    Ok(Json(status))
378}
379
380async fn update_workload_status(
381    State(state): State<ApiServerState>,
382    Path((namespace, name)): Path<(String, String)>,
383    Json(status): Json<serde_json::Value>,
384) -> Result<impl IntoResponse, ApiError> {
385    let key = format!("{}/{}", namespace, name);
386    
387    let mut workload = state.store.get(&ResourceKind::Workload, &key)
388        .ok_or_else(|| ApiError::not_found("workload", &name))?;
389
390    // Update only the status field
391    if let Some(obj) = workload.as_object_mut() {
392        obj.insert("status".to_string(), status.clone());
393    }
394
395    state.store.update(ResourceKind::Workload, &key, workload.clone(), None)
396        .map_err(|e| ApiError {
397            api_version: "v1".to_string(),
398            kind: "Status".to_string(),
399            status: "Failure".to_string(),
400            message: e.to_string(),
401            reason: "InternalError".to_string(),
402            code: 500,
403        })?;
404
405    Ok(Json(status))
406}
407
408// Node handlers
409async fn list_nodes(
410    State(state): State<ApiServerState>,
411    Query(_params): Query<ListParams>,
412) -> impl IntoResponse {
413    let items = state.store.list(&ResourceKind::Node, None);
414    
415    Json(ApiResponse {
416        api_version: "v1".to_string(),
417        kind: "NodeList".to_string(),
418        metadata: ListMeta {
419            resource_version: state.store.current_version().to_string(),
420            continue_token: None,
421        },
422        items,
423    })
424}
425
426async fn create_node(
427    State(state): State<ApiServerState>,
428    Json(body): Json<serde_json::Value>,
429) -> Result<impl IntoResponse, ApiError> {
430    let name = body.get("metadata")
431        .and_then(|m| m.get("name"))
432        .and_then(|n| n.as_str())
433        .ok_or_else(|| ApiError {
434            api_version: "v1".to_string(),
435            kind: "Status".to_string(),
436            status: "Failure".to_string(),
437            message: "metadata.name is required".to_string(),
438            reason: "Invalid".to_string(),
439            code: 400,
440        })?;
441
442    state.store.create(ResourceKind::Node, name, body.clone())
443        .map_err(|e| match e {
444            StoreError::AlreadyExists(_) => ApiError::already_exists("node", name),
445            _ => ApiError {
446                api_version: "v1".to_string(),
447                kind: "Status".to_string(),
448                status: "Failure".to_string(),
449                message: e.to_string(),
450                reason: "InternalError".to_string(),
451                code: 500,
452            },
453        })?;
454
455    Ok((StatusCode::CREATED, Json(body)))
456}
457
458async fn get_node(
459    State(state): State<ApiServerState>,
460    Path(name): Path<String>,
461) -> Result<impl IntoResponse, ApiError> {
462    state.store.get(&ResourceKind::Node, &name)
463        .map(Json)
464        .ok_or_else(|| ApiError::not_found("node", &name))
465}
466
467async fn update_node(
468    State(state): State<ApiServerState>,
469    Path(name): Path<String>,
470    Json(body): Json<serde_json::Value>,
471) -> Result<impl IntoResponse, ApiError> {
472    state.store.update(ResourceKind::Node, &name, body.clone(), None)
473        .map_err(|e| match e {
474            StoreError::NotFound(_) => ApiError::not_found("node", &name),
475            _ => ApiError {
476                api_version: "v1".to_string(),
477                kind: "Status".to_string(),
478                status: "Failure".to_string(),
479                message: e.to_string(),
480                reason: "InternalError".to_string(),
481                code: 500,
482            },
483        })?;
484
485    Ok(Json(body))
486}
487
488async fn delete_node(
489    State(state): State<ApiServerState>,
490    Path(name): Path<String>,
491) -> Result<impl IntoResponse, ApiError> {
492    state.store.delete(&ResourceKind::Node, &name)
493        .map_err(|e| match e {
494            StoreError::NotFound(_) => ApiError::not_found("node", &name),
495            _ => ApiError {
496                api_version: "v1".to_string(),
497                kind: "Status".to_string(),
498                status: "Failure".to_string(),
499                message: e.to_string(),
500                reason: "InternalError".to_string(),
501                code: 500,
502            },
503        })?;
504
505    Ok(StatusCode::OK)
506}
507
508// Watch handlers (simplified - real implementation would use SSE)
509async fn watch_workloads(
510    State(_state): State<ApiServerState>,
511    Path(_namespace): Path<String>,
512    Query(_params): Query<ListParams>,
513) -> impl IntoResponse {
514    // In a real implementation, this would return an SSE stream
515    Json(serde_json::json!({
516        "type": "ADDED",
517        "object": {}
518    }))
519}
520
521async fn watch_nodes(
522    State(_state): State<ApiServerState>,
523    Query(_params): Query<ListParams>,
524) -> impl IntoResponse {
525    Json(serde_json::json!({
526        "type": "ADDED",
527        "object": {}
528    }))
529}