forge_orchestration/controlplane/
mod.rs

1//! Control Plane for Forge Orchestration
2//!
3//! A complete control plane comparable to Kubernetes API server:
4//! - RESTful API for workload management
5//! - Watch/subscribe for real-time updates
6//! - RBAC and authentication
7//! - Admission controllers
8//! - Resource versioning and optimistic concurrency
9
10pub mod api;
11pub mod admission;
12pub mod watch;
13
14use std::collections::HashMap;
15use std::sync::Arc;
16use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18use tracing::{info, warn};
19
20pub use api::{ApiServer, ApiServerConfig};
21pub use admission::{AdmissionController, AdmissionResult, ValidationWebhook, MutatingWebhook};
22pub use watch::{WatchEvent, WatchStream, ResourceVersion};
23
24/// Resource kind
25#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
26pub enum ResourceKind {
27    /// Workload resource
28    Workload,
29    /// Node resource
30    Node,
31    /// Service resource
32    Service,
33    /// ConfigMap resource
34    ConfigMap,
35    /// Secret resource
36    Secret,
37    /// PriorityClass resource
38    PriorityClass,
39    /// Custom resource
40    Custom(String),
41}
42
43impl std::fmt::Display for ResourceKind {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        match self {
46            ResourceKind::Workload => write!(f, "workloads"),
47            ResourceKind::Node => write!(f, "nodes"),
48            ResourceKind::Service => write!(f, "services"),
49            ResourceKind::ConfigMap => write!(f, "configmaps"),
50            ResourceKind::Secret => write!(f, "secrets"),
51            ResourceKind::PriorityClass => write!(f, "priorityclasses"),
52            ResourceKind::Custom(name) => write!(f, "{}", name),
53        }
54    }
55}
56
57/// Object metadata
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct ObjectMeta {
60    /// Resource name
61    pub name: String,
62    /// Namespace
63    pub namespace: String,
64    /// Unique identifier
65    pub uid: String,
66    /// Resource version for optimistic concurrency
67    pub resource_version: u64,
68    /// Generation (incremented on spec changes)
69    pub generation: u64,
70    /// Creation timestamp
71    pub creation_timestamp: chrono::DateTime<chrono::Utc>,
72    /// Deletion timestamp (if being deleted)
73    pub deletion_timestamp: Option<chrono::DateTime<chrono::Utc>>,
74    /// Labels
75    pub labels: HashMap<String, String>,
76    /// Annotations
77    pub annotations: HashMap<String, String>,
78    /// Owner references
79    pub owner_references: Vec<OwnerReference>,
80    /// Finalizers
81    pub finalizers: Vec<String>,
82}
83
84impl ObjectMeta {
85    /// Create new metadata
86    pub fn new(name: impl Into<String>, namespace: impl Into<String>) -> Self {
87        Self {
88            name: name.into(),
89            namespace: namespace.into(),
90            uid: uuid::Uuid::new_v4().to_string(),
91            resource_version: 1,
92            generation: 1,
93            creation_timestamp: chrono::Utc::now(),
94            deletion_timestamp: None,
95            labels: HashMap::new(),
96            annotations: HashMap::new(),
97            owner_references: Vec::new(),
98            finalizers: Vec::new(),
99        }
100    }
101
102    /// Add label
103    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
104        self.labels.insert(key.into(), value.into());
105        self
106    }
107
108    /// Add annotation
109    pub fn with_annotation(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
110        self.annotations.insert(key.into(), value.into());
111        self
112    }
113
114    /// Add finalizer
115    pub fn with_finalizer(mut self, finalizer: impl Into<String>) -> Self {
116        self.finalizers.push(finalizer.into());
117        self
118    }
119
120    /// Increment resource version
121    pub fn bump_version(&mut self) {
122        self.resource_version += 1;
123    }
124
125    /// Increment generation
126    pub fn bump_generation(&mut self) {
127        self.generation += 1;
128        self.bump_version();
129    }
130}
131
132/// Owner reference for garbage collection
133#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct OwnerReference {
135    /// API version
136    pub api_version: String,
137    /// Resource kind
138    pub kind: String,
139    /// Owner name
140    pub name: String,
141    /// Owner UID
142    pub uid: String,
143    /// Controller flag
144    pub controller: bool,
145    /// Block owner deletion
146    pub block_owner_deletion: bool,
147}
148
149/// Generic resource wrapper
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct Resource<T> {
152    /// API version
153    pub api_version: String,
154    /// Resource kind
155    pub kind: ResourceKind,
156    /// Metadata
157    pub metadata: ObjectMeta,
158    /// Spec
159    pub spec: T,
160    /// Status (optional)
161    pub status: Option<serde_json::Value>,
162}
163
164impl<T> Resource<T> {
165    /// Create new resource
166    pub fn new(kind: ResourceKind, name: impl Into<String>, spec: T) -> Self {
167        Self {
168            api_version: "forge.io/v1".to_string(),
169            kind,
170            metadata: ObjectMeta::new(name, "default"),
171            spec,
172            status: None,
173        }
174    }
175
176    /// Set namespace
177    pub fn in_namespace(mut self, namespace: impl Into<String>) -> Self {
178        self.metadata.namespace = namespace.into();
179        self
180    }
181
182    /// Add label
183    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
184        self.metadata = self.metadata.with_label(key, value);
185        self
186    }
187}
188
189/// Resource store for the control plane
190pub struct ResourceStore {
191    /// Resources by kind and namespace/name
192    resources: RwLock<HashMap<ResourceKind, HashMap<String, serde_json::Value>>>,
193    /// Global resource version counter
194    version_counter: RwLock<u64>,
195    /// Watch subscribers
196    watchers: Arc<watch::WatchRegistry>,
197}
198
199impl ResourceStore {
200    /// Create new resource store
201    pub fn new() -> Self {
202        Self {
203            resources: RwLock::new(HashMap::new()),
204            version_counter: RwLock::new(0),
205            watchers: Arc::new(watch::WatchRegistry::new()),
206        }
207    }
208
209    /// Get next resource version
210    fn next_version(&self) -> u64 {
211        let mut counter = self.version_counter.write();
212        *counter += 1;
213        *counter
214    }
215
216    /// Create resource
217    pub fn create(&self, kind: ResourceKind, key: &str, value: serde_json::Value) -> Result<u64, StoreError> {
218        let mut resources = self.resources.write();
219        let kind_map = resources.entry(kind.clone()).or_insert_with(HashMap::new);
220
221        if kind_map.contains_key(key) {
222            return Err(StoreError::AlreadyExists(key.to_string()));
223        }
224
225        let version = self.next_version();
226        kind_map.insert(key.to_string(), value.clone());
227
228        // Notify watchers
229        self.watchers.notify(WatchEvent::Added {
230            kind: kind.clone(),
231            key: key.to_string(),
232            value,
233            version,
234        });
235
236        info!(kind = %kind, key = key, version = version, "Resource created");
237        Ok(version)
238    }
239
240    /// Get resource
241    pub fn get(&self, kind: &ResourceKind, key: &str) -> Option<serde_json::Value> {
242        self.resources.read()
243            .get(kind)
244            .and_then(|m| m.get(key).cloned())
245    }
246
247    /// Update resource
248    pub fn update(&self, kind: ResourceKind, key: &str, value: serde_json::Value, expected_version: Option<u64>) -> Result<u64, StoreError> {
249        let mut resources = self.resources.write();
250        let kind_map = resources.entry(kind.clone()).or_insert_with(HashMap::new);
251
252        if !kind_map.contains_key(key) {
253            return Err(StoreError::NotFound(key.to_string()));
254        }
255
256        // Check version for optimistic concurrency
257        if let Some(expected) = expected_version {
258            // In a real implementation, we'd check the stored version
259            let current = *self.version_counter.read();
260            if current != expected {
261                return Err(StoreError::Conflict(expected, current));
262            }
263        }
264
265        let version = self.next_version();
266        kind_map.insert(key.to_string(), value.clone());
267
268        // Notify watchers
269        self.watchers.notify(WatchEvent::Modified {
270            kind: kind.clone(),
271            key: key.to_string(),
272            value,
273            version,
274        });
275
276        Ok(version)
277    }
278
279    /// Delete resource
280    pub fn delete(&self, kind: &ResourceKind, key: &str) -> Result<(), StoreError> {
281        let mut resources = self.resources.write();
282        
283        if let Some(kind_map) = resources.get_mut(kind) {
284            if let Some(value) = kind_map.remove(key) {
285                let version = self.next_version();
286                
287                // Notify watchers
288                self.watchers.notify(WatchEvent::Deleted {
289                    kind: kind.clone(),
290                    key: key.to_string(),
291                    value,
292                    version,
293                });
294
295                info!(kind = %kind, key = key, "Resource deleted");
296                return Ok(());
297            }
298        }
299
300        Err(StoreError::NotFound(key.to_string()))
301    }
302
303    /// List resources of a kind
304    pub fn list(&self, kind: &ResourceKind, namespace: Option<&str>) -> Vec<serde_json::Value> {
305        self.resources.read()
306            .get(kind)
307            .map(|m| {
308                m.iter()
309                    .filter(|(k, _)| {
310                        namespace.map(|ns| k.starts_with(&format!("{}/", ns))).unwrap_or(true)
311                    })
312                    .map(|(_, v)| v.clone())
313                    .collect()
314            })
315            .unwrap_or_default()
316    }
317
318    /// Subscribe to watch events
319    pub fn watch(&self, kind: ResourceKind) -> watch::WatchStream {
320        self.watchers.subscribe(kind)
321    }
322
323    /// Get current resource version
324    pub fn current_version(&self) -> u64 {
325        *self.version_counter.read()
326    }
327}
328
329impl Default for ResourceStore {
330    fn default() -> Self {
331        Self::new()
332    }
333}
334
335/// Store error
336#[derive(Debug, thiserror::Error)]
337pub enum StoreError {
338    /// Resource already exists
339    #[error("Resource already exists: {0}")]
340    AlreadyExists(String),
341    /// Resource not found
342    #[error("Resource not found: {0}")]
343    NotFound(String),
344    /// Version conflict
345    #[error("Version conflict: expected {0}, got {1}")]
346    Conflict(u64, u64),
347    /// Serialization error
348    #[error("Serialization error: {0}")]
349    Serialization(String),
350}
351
352#[cfg(test)]
353mod tests {
354    use super::*;
355
356    #[test]
357    fn test_resource_store_crud() {
358        let store = ResourceStore::new();
359        let value = serde_json::json!({"name": "test"});
360
361        // Create
362        let version = store.create(ResourceKind::Workload, "default/test", value.clone()).unwrap();
363        assert!(version > 0);
364
365        // Get
366        let retrieved = store.get(&ResourceKind::Workload, "default/test").unwrap();
367        assert_eq!(retrieved, value);
368
369        // Update
370        let new_value = serde_json::json!({"name": "updated"});
371        store.update(ResourceKind::Workload, "default/test", new_value.clone(), None).unwrap();
372        
373        let retrieved = store.get(&ResourceKind::Workload, "default/test").unwrap();
374        assert_eq!(retrieved, new_value);
375
376        // Delete
377        store.delete(&ResourceKind::Workload, "default/test").unwrap();
378        assert!(store.get(&ResourceKind::Workload, "default/test").is_none());
379    }
380
381    #[test]
382    fn test_resource_store_conflict() {
383        let store = ResourceStore::new();
384        let value = serde_json::json!({"name": "test"});
385
386        store.create(ResourceKind::Workload, "default/test", value.clone()).unwrap();
387
388        // Try to create again
389        let result = store.create(ResourceKind::Workload, "default/test", value);
390        assert!(matches!(result, Err(StoreError::AlreadyExists(_))));
391    }
392}