1pub mod api;
11pub mod admission;
12pub mod watch;
13
14use std::collections::HashMap;
15use std::sync::Arc;
16use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18use tracing::{info, warn};
19
20pub use api::{ApiServer, ApiServerConfig};
21pub use admission::{AdmissionController, AdmissionResult, ValidationWebhook, MutatingWebhook};
22pub use watch::{WatchEvent, WatchStream, ResourceVersion};
23
24#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
26pub enum ResourceKind {
27 Workload,
29 Node,
31 Service,
33 ConfigMap,
35 Secret,
37 PriorityClass,
39 Custom(String),
41}
42
43impl std::fmt::Display for ResourceKind {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 match self {
46 ResourceKind::Workload => write!(f, "workloads"),
47 ResourceKind::Node => write!(f, "nodes"),
48 ResourceKind::Service => write!(f, "services"),
49 ResourceKind::ConfigMap => write!(f, "configmaps"),
50 ResourceKind::Secret => write!(f, "secrets"),
51 ResourceKind::PriorityClass => write!(f, "priorityclasses"),
52 ResourceKind::Custom(name) => write!(f, "{}", name),
53 }
54 }
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct ObjectMeta {
60 pub name: String,
62 pub namespace: String,
64 pub uid: String,
66 pub resource_version: u64,
68 pub generation: u64,
70 pub creation_timestamp: chrono::DateTime<chrono::Utc>,
72 pub deletion_timestamp: Option<chrono::DateTime<chrono::Utc>>,
74 pub labels: HashMap<String, String>,
76 pub annotations: HashMap<String, String>,
78 pub owner_references: Vec<OwnerReference>,
80 pub finalizers: Vec<String>,
82}
83
84impl ObjectMeta {
85 pub fn new(name: impl Into<String>, namespace: impl Into<String>) -> Self {
87 Self {
88 name: name.into(),
89 namespace: namespace.into(),
90 uid: uuid::Uuid::new_v4().to_string(),
91 resource_version: 1,
92 generation: 1,
93 creation_timestamp: chrono::Utc::now(),
94 deletion_timestamp: None,
95 labels: HashMap::new(),
96 annotations: HashMap::new(),
97 owner_references: Vec::new(),
98 finalizers: Vec::new(),
99 }
100 }
101
102 pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
104 self.labels.insert(key.into(), value.into());
105 self
106 }
107
108 pub fn with_annotation(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
110 self.annotations.insert(key.into(), value.into());
111 self
112 }
113
114 pub fn with_finalizer(mut self, finalizer: impl Into<String>) -> Self {
116 self.finalizers.push(finalizer.into());
117 self
118 }
119
120 pub fn bump_version(&mut self) {
122 self.resource_version += 1;
123 }
124
125 pub fn bump_generation(&mut self) {
127 self.generation += 1;
128 self.bump_version();
129 }
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct OwnerReference {
135 pub api_version: String,
137 pub kind: String,
139 pub name: String,
141 pub uid: String,
143 pub controller: bool,
145 pub block_owner_deletion: bool,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct Resource<T> {
152 pub api_version: String,
154 pub kind: ResourceKind,
156 pub metadata: ObjectMeta,
158 pub spec: T,
160 pub status: Option<serde_json::Value>,
162}
163
164impl<T> Resource<T> {
165 pub fn new(kind: ResourceKind, name: impl Into<String>, spec: T) -> Self {
167 Self {
168 api_version: "forge.io/v1".to_string(),
169 kind,
170 metadata: ObjectMeta::new(name, "default"),
171 spec,
172 status: None,
173 }
174 }
175
176 pub fn in_namespace(mut self, namespace: impl Into<String>) -> Self {
178 self.metadata.namespace = namespace.into();
179 self
180 }
181
182 pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
184 self.metadata = self.metadata.with_label(key, value);
185 self
186 }
187}
188
189pub struct ResourceStore {
191 resources: RwLock<HashMap<ResourceKind, HashMap<String, serde_json::Value>>>,
193 version_counter: RwLock<u64>,
195 watchers: Arc<watch::WatchRegistry>,
197}
198
199impl ResourceStore {
200 pub fn new() -> Self {
202 Self {
203 resources: RwLock::new(HashMap::new()),
204 version_counter: RwLock::new(0),
205 watchers: Arc::new(watch::WatchRegistry::new()),
206 }
207 }
208
209 fn next_version(&self) -> u64 {
211 let mut counter = self.version_counter.write();
212 *counter += 1;
213 *counter
214 }
215
216 pub fn create(&self, kind: ResourceKind, key: &str, value: serde_json::Value) -> Result<u64, StoreError> {
218 let mut resources = self.resources.write();
219 let kind_map = resources.entry(kind.clone()).or_insert_with(HashMap::new);
220
221 if kind_map.contains_key(key) {
222 return Err(StoreError::AlreadyExists(key.to_string()));
223 }
224
225 let version = self.next_version();
226 kind_map.insert(key.to_string(), value.clone());
227
228 self.watchers.notify(WatchEvent::Added {
230 kind: kind.clone(),
231 key: key.to_string(),
232 value,
233 version,
234 });
235
236 info!(kind = %kind, key = key, version = version, "Resource created");
237 Ok(version)
238 }
239
240 pub fn get(&self, kind: &ResourceKind, key: &str) -> Option<serde_json::Value> {
242 self.resources.read()
243 .get(kind)
244 .and_then(|m| m.get(key).cloned())
245 }
246
247 pub fn update(&self, kind: ResourceKind, key: &str, value: serde_json::Value, expected_version: Option<u64>) -> Result<u64, StoreError> {
249 let mut resources = self.resources.write();
250 let kind_map = resources.entry(kind.clone()).or_insert_with(HashMap::new);
251
252 if !kind_map.contains_key(key) {
253 return Err(StoreError::NotFound(key.to_string()));
254 }
255
256 if let Some(expected) = expected_version {
258 let current = *self.version_counter.read();
260 if current != expected {
261 return Err(StoreError::Conflict(expected, current));
262 }
263 }
264
265 let version = self.next_version();
266 kind_map.insert(key.to_string(), value.clone());
267
268 self.watchers.notify(WatchEvent::Modified {
270 kind: kind.clone(),
271 key: key.to_string(),
272 value,
273 version,
274 });
275
276 Ok(version)
277 }
278
279 pub fn delete(&self, kind: &ResourceKind, key: &str) -> Result<(), StoreError> {
281 let mut resources = self.resources.write();
282
283 if let Some(kind_map) = resources.get_mut(kind) {
284 if let Some(value) = kind_map.remove(key) {
285 let version = self.next_version();
286
287 self.watchers.notify(WatchEvent::Deleted {
289 kind: kind.clone(),
290 key: key.to_string(),
291 value,
292 version,
293 });
294
295 info!(kind = %kind, key = key, "Resource deleted");
296 return Ok(());
297 }
298 }
299
300 Err(StoreError::NotFound(key.to_string()))
301 }
302
303 pub fn list(&self, kind: &ResourceKind, namespace: Option<&str>) -> Vec<serde_json::Value> {
305 self.resources.read()
306 .get(kind)
307 .map(|m| {
308 m.iter()
309 .filter(|(k, _)| {
310 namespace.map(|ns| k.starts_with(&format!("{}/", ns))).unwrap_or(true)
311 })
312 .map(|(_, v)| v.clone())
313 .collect()
314 })
315 .unwrap_or_default()
316 }
317
318 pub fn watch(&self, kind: ResourceKind) -> watch::WatchStream {
320 self.watchers.subscribe(kind)
321 }
322
323 pub fn current_version(&self) -> u64 {
325 *self.version_counter.read()
326 }
327}
328
329impl Default for ResourceStore {
330 fn default() -> Self {
331 Self::new()
332 }
333}
334
335#[derive(Debug, thiserror::Error)]
337pub enum StoreError {
338 #[error("Resource already exists: {0}")]
340 AlreadyExists(String),
341 #[error("Resource not found: {0}")]
343 NotFound(String),
344 #[error("Version conflict: expected {0}, got {1}")]
346 Conflict(u64, u64),
347 #[error("Serialization error: {0}")]
349 Serialization(String),
350}
351
#[cfg(test)]
mod tests {
    use super::*;

    /// Create, read, update and delete a single resource.
    #[test]
    fn test_resource_store_crud() {
        let store = ResourceStore::new();
        let value = serde_json::json!({"name": "test"});

        let version = store
            .create(ResourceKind::Workload, "default/test", value.clone())
            .unwrap();
        assert!(version > 0);

        let retrieved = store.get(&ResourceKind::Workload, "default/test").unwrap();
        assert_eq!(retrieved, value);

        let new_value = serde_json::json!({"name": "updated"});
        store
            .update(ResourceKind::Workload, "default/test", new_value.clone(), None)
            .unwrap();

        let retrieved = store.get(&ResourceKind::Workload, "default/test").unwrap();
        assert_eq!(retrieved, new_value);

        store.delete(&ResourceKind::Workload, "default/test").unwrap();
        assert!(store.get(&ResourceKind::Workload, "default/test").is_none());
    }

    /// Creating the same key twice is rejected with `AlreadyExists`.
    #[test]
    fn test_resource_store_conflict() {
        let store = ResourceStore::new();
        let value = serde_json::json!({"name": "test"});

        store
            .create(ResourceKind::Workload, "default/test", value.clone())
            .unwrap();

        let result = store.create(ResourceKind::Workload, "default/test", value);
        assert!(matches!(result, Err(StoreError::AlreadyExists(_))));
    }

    /// An update with a stale expected version is rejected and changes nothing;
    /// an update with the current version succeeds and advances the counter.
    #[test]
    fn test_resource_store_version_conflict() {
        let store = ResourceStore::new();
        let original = serde_json::json!({"name": "test"});
        store
            .create(ResourceKind::Workload, "default/test", original.clone())
            .unwrap();

        let current = store.current_version();
        let stale = store.update(
            ResourceKind::Workload,
            "default/test",
            serde_json::json!({"name": "stale"}),
            Some(current + 1),
        );
        assert!(matches!(
            stale,
            Err(StoreError::Conflict(expected, actual))
                if expected == current + 1 && actual == current
        ));
        assert_eq!(store.current_version(), current);
        assert_eq!(
            store.get(&ResourceKind::Workload, "default/test").unwrap(),
            original
        );

        let updated = serde_json::json!({"name": "fresh"});
        let version = store
            .update(
                ResourceKind::Workload,
                "default/test",
                updated.clone(),
                Some(current),
            )
            .unwrap();
        assert_eq!(version, current + 1);
        assert_eq!(
            store.get(&ResourceKind::Workload, "default/test").unwrap(),
            updated
        );
    }

    /// Updating or deleting something that was never created yields `NotFound`
    /// and does not advance the version counter.
    #[test]
    fn test_resource_store_missing_resource() {
        let store = ResourceStore::new();

        let updated = store.update(
            ResourceKind::Node,
            "default/ghost",
            serde_json::json!({}),
            None,
        );
        assert!(matches!(updated, Err(StoreError::NotFound(_))));

        let deleted = store.delete(&ResourceKind::Node, "default/ghost");
        assert!(matches!(deleted, Err(StoreError::NotFound(_))));

        assert_eq!(store.current_version(), 0);
        assert!(store.list(&ResourceKind::Node, None).is_empty());
    }

    /// `list` filters on the `namespace/` key prefix and is scoped to one kind.
    #[test]
    fn test_resource_store_list_by_namespace() {
        let store = ResourceStore::new();
        let prod_value = serde_json::json!({"name": "c"});

        store
            .create(ResourceKind::Workload, "default/a", serde_json::json!({"name": "a"}))
            .unwrap();
        store
            .create(ResourceKind::Workload, "default/b", serde_json::json!({"name": "b"}))
            .unwrap();
        store
            .create(ResourceKind::Workload, "prod/c", prod_value.clone())
            .unwrap();

        assert_eq!(store.list(&ResourceKind::Workload, None).len(), 3);
        assert_eq!(store.list(&ResourceKind::Workload, Some("default")).len(), 2);
        assert_eq!(
            store.list(&ResourceKind::Workload, Some("prod")),
            vec![prod_value]
        );
        // A namespace that is only a prefix of another must not match.
        assert!(store.list(&ResourceKind::Workload, Some("def")).is_empty());
        assert!(store.list(&ResourceKind::Node, None).is_empty());
    }
}