1use std::sync::Arc;
10use axum::{
11 Router,
12 routing::{get, post, put, delete},
13 extract::{Path, Query, State, Json},
14 http::StatusCode,
15 response::IntoResponse,
16};
17use serde::{Deserialize, Serialize};
18use tracing::info;
19
20use super::{ResourceKind, ResourceStore, StoreError};
21
/// Startup configuration for [`ApiServer`].
///
/// NOTE(review): nothing in this file performs TLS termination,
/// authentication or admission control based on the flags below — confirm
/// where, if anywhere, they are enforced.
#[derive(Clone, Debug)]
pub struct ApiServerConfig {
    /// Socket address the server binds to, e.g. `"0.0.0.0:6443"`.
    pub listen_addr: String,
    /// Whether TLS is requested for the listener.
    pub tls_enabled: bool,
    /// TLS certificate; presumably a file path — TODO confirm against callers.
    pub tls_cert: Option<String>,
    /// TLS private key; presumably a file path — TODO confirm against callers.
    pub tls_key: Option<String>,
    /// Whether request authentication is requested.
    pub auth_enabled: bool,
    /// Whether admission control is requested.
    pub admission_enabled: bool,
}

impl Default for ApiServerConfig {
    /// Returns a configuration that listens on `0.0.0.0:6443` with TLS and
    /// auth switched off, no certificate or key, and admission switched on.
    fn default() -> Self {
        ApiServerConfig {
            listen_addr: String::from("0.0.0.0:6443"),
            tls_enabled: false,
            tls_cert: None,
            tls_key: None,
            auth_enabled: false,
            admission_enabled: true,
        }
    }
}
51
52#[derive(Clone)]
54pub struct ApiServerState {
55 pub store: Arc<ResourceStore>,
57}
58
59pub struct ApiServer {
61 config: ApiServerConfig,
62 state: ApiServerState,
63}
64
65impl ApiServer {
66 pub fn new(config: ApiServerConfig, store: Arc<ResourceStore>) -> Self {
68 Self {
69 config,
70 state: ApiServerState { store },
71 }
72 }
73
74 pub fn router(&self) -> Router {
76 Router::new()
77 .route("/healthz", get(health))
79 .route("/readyz", get(ready))
80 .route("/livez", get(live))
81
82 .route("/api", get(api_versions))
84 .route("/apis", get(api_groups))
85
86 .route("/api/v1/namespaces/:namespace/workloads",
88 get(list_workloads).post(create_workload))
89 .route("/api/v1/namespaces/:namespace/workloads/:name",
90 get(get_workload).put(update_workload).delete(delete_workload))
91 .route("/api/v1/namespaces/:namespace/workloads/:name/status",
92 get(get_workload_status).put(update_workload_status))
93
94 .route("/api/v1/nodes", get(list_nodes).post(create_node))
96 .route("/api/v1/nodes/:name", get(get_node).put(update_node).delete(delete_node))
97
98 .route("/api/v1/watch/namespaces/:namespace/workloads", get(watch_workloads))
100 .route("/api/v1/watch/nodes", get(watch_nodes))
101
102 .with_state(self.state.clone())
103 }
104
105 pub async fn serve(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
107 let router = self.router();
108 let listener = tokio::net::TcpListener::bind(&self.config.listen_addr).await?;
109
110 info!(addr = %self.config.listen_addr, "API server starting");
111 axum::serve(listener, router).await?;
112
113 Ok(())
114 }
115}
116
117async fn health() -> impl IntoResponse {
119 StatusCode::OK
120}
121
122async fn ready() -> impl IntoResponse {
123 StatusCode::OK
124}
125
126async fn live() -> impl IntoResponse {
127 StatusCode::OK
128}
129
130async fn api_versions() -> impl IntoResponse {
132 Json(serde_json::json!({
133 "kind": "APIVersions",
134 "versions": ["v1"],
135 "serverAddressByClientCIDRs": []
136 }))
137}
138
139async fn api_groups() -> impl IntoResponse {
140 Json(serde_json::json!({
141 "kind": "APIGroupList",
142 "apiVersion": "v1",
143 "groups": [
144 {
145 "name": "forge.io",
146 "versions": [
147 {"groupVersion": "forge.io/v1", "version": "v1"}
148 ],
149 "preferredVersion": {"groupVersion": "forge.io/v1", "version": "v1"}
150 }
151 ]
152 }))
153}
154
155#[derive(Debug, Deserialize)]
157pub struct ListParams {
158 #[serde(rename = "labelSelector")]
160 pub label_selector: Option<String>,
161 #[serde(rename = "fieldSelector")]
163 pub field_selector: Option<String>,
164 pub limit: Option<u32>,
166 #[serde(rename = "continue")]
168 pub continue_token: Option<String>,
169 #[serde(rename = "resourceVersion")]
171 pub resource_version: Option<u64>,
172}
173
174#[derive(Debug, Serialize)]
176pub struct ApiResponse<T> {
177 #[serde(rename = "apiVersion")]
178 pub api_version: String,
179 pub kind: String,
180 pub metadata: ListMeta,
181 pub items: Vec<T>,
182}
183
184#[derive(Debug, Serialize)]
186pub struct ListMeta {
187 #[serde(rename = "resourceVersion")]
188 pub resource_version: String,
189 #[serde(rename = "continue", skip_serializing_if = "Option::is_none")]
190 pub continue_token: Option<String>,
191}
192
193#[derive(Debug, Serialize)]
195pub struct ApiError {
196 #[serde(rename = "apiVersion")]
197 pub api_version: String,
198 pub kind: String,
199 pub status: String,
200 pub message: String,
201 pub reason: String,
202 pub code: u16,
203}
204
205impl ApiError {
206 fn not_found(resource: &str, name: &str) -> Self {
207 Self {
208 api_version: "v1".to_string(),
209 kind: "Status".to_string(),
210 status: "Failure".to_string(),
211 message: format!("{} \"{}\" not found", resource, name),
212 reason: "NotFound".to_string(),
213 code: 404,
214 }
215 }
216
217 fn already_exists(resource: &str, name: &str) -> Self {
218 Self {
219 api_version: "v1".to_string(),
220 kind: "Status".to_string(),
221 status: "Failure".to_string(),
222 message: format!("{} \"{}\" already exists", resource, name),
223 reason: "AlreadyExists".to_string(),
224 code: 409,
225 }
226 }
227
228 fn conflict(message: &str) -> Self {
229 Self {
230 api_version: "v1".to_string(),
231 kind: "Status".to_string(),
232 status: "Failure".to_string(),
233 message: message.to_string(),
234 reason: "Conflict".to_string(),
235 code: 409,
236 }
237 }
238}
239
240impl IntoResponse for ApiError {
241 fn into_response(self) -> axum::response::Response {
242 let status = StatusCode::from_u16(self.code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
243 (status, Json(self)).into_response()
244 }
245}
246
247async fn list_workloads(
249 State(state): State<ApiServerState>,
250 Path(namespace): Path<String>,
251 Query(_params): Query<ListParams>,
252) -> impl IntoResponse {
253 let items = state.store.list(&ResourceKind::Workload, Some(&namespace));
254
255 Json(ApiResponse {
256 api_version: "forge.io/v1".to_string(),
257 kind: "WorkloadList".to_string(),
258 metadata: ListMeta {
259 resource_version: state.store.current_version().to_string(),
260 continue_token: None,
261 },
262 items,
263 })
264}
265
266async fn create_workload(
267 State(state): State<ApiServerState>,
268 Path(namespace): Path<String>,
269 Json(body): Json<serde_json::Value>,
270) -> Result<impl IntoResponse, ApiError> {
271 let name = body.get("metadata")
272 .and_then(|m| m.get("name"))
273 .and_then(|n| n.as_str())
274 .ok_or_else(|| ApiError {
275 api_version: "v1".to_string(),
276 kind: "Status".to_string(),
277 status: "Failure".to_string(),
278 message: "metadata.name is required".to_string(),
279 reason: "Invalid".to_string(),
280 code: 400,
281 })?;
282
283 let key = format!("{}/{}", namespace, name);
284
285 state.store.create(ResourceKind::Workload, &key, body.clone())
286 .map_err(|e| match e {
287 StoreError::AlreadyExists(_) => ApiError::already_exists("workload", name),
288 _ => ApiError {
289 api_version: "v1".to_string(),
290 kind: "Status".to_string(),
291 status: "Failure".to_string(),
292 message: e.to_string(),
293 reason: "InternalError".to_string(),
294 code: 500,
295 },
296 })?;
297
298 Ok((StatusCode::CREATED, Json(body)))
299}
300
301async fn get_workload(
302 State(state): State<ApiServerState>,
303 Path((namespace, name)): Path<(String, String)>,
304) -> Result<impl IntoResponse, ApiError> {
305 let key = format!("{}/{}", namespace, name);
306
307 state.store.get(&ResourceKind::Workload, &key)
308 .map(Json)
309 .ok_or_else(|| ApiError::not_found("workload", &name))
310}
311
312async fn update_workload(
313 State(state): State<ApiServerState>,
314 Path((namespace, name)): Path<(String, String)>,
315 Json(body): Json<serde_json::Value>,
316) -> Result<impl IntoResponse, ApiError> {
317 let key = format!("{}/{}", namespace, name);
318
319 let resource_version = body.get("metadata")
321 .and_then(|m| m.get("resourceVersion"))
322 .and_then(|v| v.as_str())
323 .and_then(|v| v.parse::<u64>().ok());
324
325 state.store.update(ResourceKind::Workload, &key, body.clone(), resource_version)
326 .map_err(|e| match e {
327 StoreError::NotFound(_) => ApiError::not_found("workload", &name),
328 StoreError::Conflict(expected, actual) => {
329 ApiError::conflict(&format!("resource version mismatch: expected {}, got {}", expected, actual))
330 }
331 _ => ApiError {
332 api_version: "v1".to_string(),
333 kind: "Status".to_string(),
334 status: "Failure".to_string(),
335 message: e.to_string(),
336 reason: "InternalError".to_string(),
337 code: 500,
338 },
339 })?;
340
341 Ok(Json(body))
342}
343
344async fn delete_workload(
345 State(state): State<ApiServerState>,
346 Path((namespace, name)): Path<(String, String)>,
347) -> Result<impl IntoResponse, ApiError> {
348 let key = format!("{}/{}", namespace, name);
349
350 state.store.delete(&ResourceKind::Workload, &key)
351 .map_err(|e| match e {
352 StoreError::NotFound(_) => ApiError::not_found("workload", &name),
353 _ => ApiError {
354 api_version: "v1".to_string(),
355 kind: "Status".to_string(),
356 status: "Failure".to_string(),
357 message: e.to_string(),
358 reason: "InternalError".to_string(),
359 code: 500,
360 },
361 })?;
362
363 Ok(StatusCode::OK)
364}
365
366async fn get_workload_status(
367 State(state): State<ApiServerState>,
368 Path((namespace, name)): Path<(String, String)>,
369) -> Result<impl IntoResponse, ApiError> {
370 let key = format!("{}/{}", namespace, name);
371
372 let workload = state.store.get(&ResourceKind::Workload, &key)
373 .ok_or_else(|| ApiError::not_found("workload", &name))?;
374
375 let status = workload.get("status").cloned().unwrap_or(serde_json::json!({}));
377 Ok(Json(status))
378}
379
380async fn update_workload_status(
381 State(state): State<ApiServerState>,
382 Path((namespace, name)): Path<(String, String)>,
383 Json(status): Json<serde_json::Value>,
384) -> Result<impl IntoResponse, ApiError> {
385 let key = format!("{}/{}", namespace, name);
386
387 let mut workload = state.store.get(&ResourceKind::Workload, &key)
388 .ok_or_else(|| ApiError::not_found("workload", &name))?;
389
390 if let Some(obj) = workload.as_object_mut() {
392 obj.insert("status".to_string(), status.clone());
393 }
394
395 state.store.update(ResourceKind::Workload, &key, workload.clone(), None)
396 .map_err(|e| ApiError {
397 api_version: "v1".to_string(),
398 kind: "Status".to_string(),
399 status: "Failure".to_string(),
400 message: e.to_string(),
401 reason: "InternalError".to_string(),
402 code: 500,
403 })?;
404
405 Ok(Json(status))
406}
407
408async fn list_nodes(
410 State(state): State<ApiServerState>,
411 Query(_params): Query<ListParams>,
412) -> impl IntoResponse {
413 let items = state.store.list(&ResourceKind::Node, None);
414
415 Json(ApiResponse {
416 api_version: "v1".to_string(),
417 kind: "NodeList".to_string(),
418 metadata: ListMeta {
419 resource_version: state.store.current_version().to_string(),
420 continue_token: None,
421 },
422 items,
423 })
424}
425
426async fn create_node(
427 State(state): State<ApiServerState>,
428 Json(body): Json<serde_json::Value>,
429) -> Result<impl IntoResponse, ApiError> {
430 let name = body.get("metadata")
431 .and_then(|m| m.get("name"))
432 .and_then(|n| n.as_str())
433 .ok_or_else(|| ApiError {
434 api_version: "v1".to_string(),
435 kind: "Status".to_string(),
436 status: "Failure".to_string(),
437 message: "metadata.name is required".to_string(),
438 reason: "Invalid".to_string(),
439 code: 400,
440 })?;
441
442 state.store.create(ResourceKind::Node, name, body.clone())
443 .map_err(|e| match e {
444 StoreError::AlreadyExists(_) => ApiError::already_exists("node", name),
445 _ => ApiError {
446 api_version: "v1".to_string(),
447 kind: "Status".to_string(),
448 status: "Failure".to_string(),
449 message: e.to_string(),
450 reason: "InternalError".to_string(),
451 code: 500,
452 },
453 })?;
454
455 Ok((StatusCode::CREATED, Json(body)))
456}
457
458async fn get_node(
459 State(state): State<ApiServerState>,
460 Path(name): Path<String>,
461) -> Result<impl IntoResponse, ApiError> {
462 state.store.get(&ResourceKind::Node, &name)
463 .map(Json)
464 .ok_or_else(|| ApiError::not_found("node", &name))
465}
466
467async fn update_node(
468 State(state): State<ApiServerState>,
469 Path(name): Path<String>,
470 Json(body): Json<serde_json::Value>,
471) -> Result<impl IntoResponse, ApiError> {
472 state.store.update(ResourceKind::Node, &name, body.clone(), None)
473 .map_err(|e| match e {
474 StoreError::NotFound(_) => ApiError::not_found("node", &name),
475 _ => ApiError {
476 api_version: "v1".to_string(),
477 kind: "Status".to_string(),
478 status: "Failure".to_string(),
479 message: e.to_string(),
480 reason: "InternalError".to_string(),
481 code: 500,
482 },
483 })?;
484
485 Ok(Json(body))
486}
487
488async fn delete_node(
489 State(state): State<ApiServerState>,
490 Path(name): Path<String>,
491) -> Result<impl IntoResponse, ApiError> {
492 state.store.delete(&ResourceKind::Node, &name)
493 .map_err(|e| match e {
494 StoreError::NotFound(_) => ApiError::not_found("node", &name),
495 _ => ApiError {
496 api_version: "v1".to_string(),
497 kind: "Status".to_string(),
498 status: "Failure".to_string(),
499 message: e.to_string(),
500 reason: "InternalError".to_string(),
501 code: 500,
502 },
503 })?;
504
505 Ok(StatusCode::OK)
506}
507
508async fn watch_workloads(
510 State(_state): State<ApiServerState>,
511 Path(_namespace): Path<String>,
512 Query(_params): Query<ListParams>,
513) -> impl IntoResponse {
514 Json(serde_json::json!({
516 "type": "ADDED",
517 "object": {}
518 }))
519}
520
521async fn watch_nodes(
522 State(_state): State<ApiServerState>,
523 Query(_params): Query<ListParams>,
524) -> impl IntoResponse {
525 Json(serde_json::json!({
526 "type": "ADDED",
527 "object": {}
528 }))
529}