1use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, RwLock};
10
11use async_trait::async_trait;
12use chrono::{DateTime, Duration, Utc};
13use dashmap::DashMap;
14use serde::{Deserialize, Serialize};
15use tracing::info;
16
17use crate::error::KernelError;
18use crate::health::HealthStatus;
19use crate::process::Pid;
20use crate::service::{ServiceType, SystemService};
21
22#[non_exhaustive]
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
29pub enum ConfigValue {
30 Text(String),
32 Integer(i64),
34 Float(f64),
36 Boolean(bool),
38 Json(serde_json::Value),
40}
41
42impl ConfigValue {
43 pub fn to_json(&self) -> serde_json::Value {
45 match self {
46 ConfigValue::Text(s) => serde_json::Value::String(s.clone()),
47 ConfigValue::Integer(n) => serde_json::json!(n),
48 ConfigValue::Float(f) => serde_json::json!(f),
49 ConfigValue::Boolean(b) => serde_json::json!(b),
50 ConfigValue::Json(v) => v.clone(),
51 }
52 }
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct ConfigEntry {
62 pub key: String,
64 pub namespace: String,
66 pub value: ConfigValue,
68 pub updated_at: DateTime<Utc>,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct ConfigChange {
79 pub namespace: String,
81 pub key: String,
83 pub old_value: Option<serde_json::Value>,
85 pub new_value: Option<serde_json::Value>,
87 pub changed_by: Pid,
89 pub timestamp: DateTime<Utc>,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct SecretRef {
100 pub namespace: String,
102 pub key: String,
104 pub expires_at: DateTime<Utc>,
106 pub scoped_to: Vec<Pid>,
108}
109
110pub struct ConfigService {
120 configs: DashMap<String, serde_json::Value>,
122 entries: DashMap<String, ConfigEntry>,
124 secrets: DashMap<String, Vec<u8>>,
126 secret_refs: DashMap<String, SecretRef>,
128 subscribers: DashMap<String, Vec<Arc<RwLock<Vec<ConfigChange>>>>>,
130 encryption_key: [u8; 32],
132 change_log: RwLock<Vec<ConfigChange>>,
134 set_count: AtomicU64,
136}
137
138impl ConfigService {
139 pub fn new(encryption_key: [u8; 32]) -> Self {
141 Self {
142 configs: DashMap::new(),
143 entries: DashMap::new(),
144 secrets: DashMap::new(),
145 secret_refs: DashMap::new(),
146 subscribers: DashMap::new(),
147 encryption_key,
148 change_log: RwLock::new(Vec::new()),
149 set_count: AtomicU64::new(0),
150 }
151 }
152
153 pub fn new_default() -> Self {
155 Self::new([0u8; 32])
156 }
157
158 pub fn set(
162 &self,
163 namespace: &str,
164 key: &str,
165 value: serde_json::Value,
166 changed_by: Pid,
167 ) -> Result<(), KernelError> {
168 let config_key = format!("{namespace}/{key}");
169 let old_value = self.configs.get(&config_key).map(|v| v.value().clone());
170 self.configs.insert(config_key, value.clone());
171
172 let change = ConfigChange {
173 namespace: namespace.to_string(),
174 key: key.to_string(),
175 old_value,
176 new_value: Some(value),
177 changed_by,
178 timestamp: Utc::now(),
179 };
180
181 self.notify_subscribers(namespace, &change);
183
184 if let Ok(mut log) = self.change_log.write() {
186 log.push(change);
187 }
188 self.set_count.fetch_add(1, Ordering::Relaxed);
189
190 Ok(())
191 }
192
193 pub fn get(&self, namespace: &str, key: &str) -> Option<serde_json::Value> {
195 let config_key = format!("{namespace}/{key}");
196 self.configs.get(&config_key).map(|v| v.value().clone())
197 }
198
199 pub fn delete(
201 &self,
202 namespace: &str,
203 key: &str,
204 changed_by: Pid,
205 ) -> Result<(), KernelError> {
206 let config_key = format!("{namespace}/{key}");
207 let old_value = self.configs.remove(&config_key).map(|(_, v)| v);
208
209 let change = ConfigChange {
210 namespace: namespace.to_string(),
211 key: key.to_string(),
212 old_value,
213 new_value: None,
214 changed_by,
215 timestamp: Utc::now(),
216 };
217 self.notify_subscribers(namespace, &change);
218 Ok(())
219 }
220
221 pub fn list_keys(&self, namespace: &str) -> Vec<String> {
223 let prefix = format!("{namespace}/");
224 self.configs
225 .iter()
226 .filter(|e| e.key().starts_with(&prefix))
227 .map(|e| e.key()[prefix.len()..].to_string())
228 .collect()
229 }
230
231 pub fn set_typed(
235 &self,
236 namespace: &str,
237 key: &str,
238 value: ConfigValue,
239 changed_by: Pid,
240 ) -> Result<(), KernelError> {
241 let config_key = format!("{namespace}/{key}");
242 let json_value = value.to_json();
243
244 let old_value = self.configs.get(&config_key).map(|v| v.value().clone());
246 self.configs.insert(config_key.clone(), json_value.clone());
247
248 let entry = ConfigEntry {
249 key: key.to_string(),
250 namespace: namespace.to_string(),
251 value,
252 updated_at: Utc::now(),
253 };
254 self.entries.insert(config_key, entry);
255
256 let change = ConfigChange {
257 namespace: namespace.to_string(),
258 key: key.to_string(),
259 old_value,
260 new_value: Some(json_value),
261 changed_by,
262 timestamp: Utc::now(),
263 };
264 self.notify_subscribers(namespace, &change);
265
266 if let Ok(mut log) = self.change_log.write() {
267 log.push(change);
268 }
269 self.set_count.fetch_add(1, Ordering::Relaxed);
270 Ok(())
271 }
272
273 pub fn get_typed(&self, namespace: &str, key: &str) -> Option<ConfigEntry> {
275 let config_key = format!("{namespace}/{key}");
276 self.entries.get(&config_key).map(|e| e.value().clone())
277 }
278
279 pub fn list(&self, namespace: &str) -> Vec<ConfigEntry> {
281 let prefix = format!("{namespace}/");
282 self.entries
283 .iter()
284 .filter(|e| e.key().starts_with(&prefix))
285 .map(|e| e.value().clone())
286 .collect()
287 }
288
289 pub fn delete_typed(
291 &self,
292 namespace: &str,
293 key: &str,
294 changed_by: Pid,
295 ) -> bool {
296 let config_key = format!("{namespace}/{key}");
297 let removed = self.entries.remove(&config_key).is_some();
298 let old_value = self.configs.remove(&config_key).map(|(_, v)| v);
300
301 let change = ConfigChange {
302 namespace: namespace.to_string(),
303 key: key.to_string(),
304 old_value,
305 new_value: None,
306 changed_by,
307 timestamp: Utc::now(),
308 };
309 self.notify_subscribers(namespace, &change);
310 removed
311 }
312
313 pub fn subscribe(&self, namespace: &str) -> Arc<RwLock<Vec<ConfigChange>>> {
320 let queue = Arc::new(RwLock::new(Vec::new()));
321 self.subscribers
322 .entry(namespace.to_string())
323 .or_default()
324 .push(queue.clone());
325 queue
326 }
327
328 fn notify_subscribers(&self, namespace: &str, change: &ConfigChange) {
330 if let Some(mut subs) = self.subscribers.get_mut(namespace) {
331 subs.retain(|queue| {
332 if let Ok(mut q) = queue.write() {
333 q.push(change.clone());
334 true
335 } else {
336 false }
338 });
339 }
340 }
341
342 pub fn set_secret(
346 &self,
347 namespace: &str,
348 key: &str,
349 value: &[u8],
350 scoped_to: Vec<Pid>,
351 ) -> Result<(), KernelError> {
352 let secret_key = format!("{namespace}/{key}");
353
354 let encrypted = self.xor_encrypt(value);
356 self.secrets.insert(secret_key.clone(), encrypted);
357
358 let secret_ref = SecretRef {
359 namespace: namespace.to_string(),
360 key: key.to_string(),
361 expires_at: Utc::now() + Duration::hours(24),
362 scoped_to,
363 };
364 self.secret_refs.insert(secret_key, secret_ref);
365 Ok(())
366 }
367
368 pub fn get_secret(
370 &self,
371 namespace: &str,
372 key: &str,
373 requester_pid: Pid,
374 ) -> Result<Vec<u8>, KernelError> {
375 let secret_key = format!("{namespace}/{key}");
376
377 let secret_ref = self
378 .secret_refs
379 .get(&secret_key)
380 .ok_or_else(|| KernelError::Service("secret not found".into()))?;
381
382 if !secret_ref.scoped_to.is_empty()
384 && !secret_ref.scoped_to.contains(&requester_pid)
385 {
386 return Err(KernelError::CapabilityDenied {
387 pid: requester_pid,
388 action: "read_secret".into(),
389 reason: format!("PID {} not authorized for secret {secret_key}", requester_pid),
390 });
391 }
392
393 if Utc::now() > secret_ref.expires_at {
395 return Err(KernelError::Service("secret expired".into()));
396 }
397
398 let encrypted = self
399 .secrets
400 .get(&secret_key)
401 .ok_or_else(|| KernelError::Service("secret data missing".into()))?;
402
403 Ok(self.xor_decrypt(&encrypted))
404 }
405
406 fn xor_encrypt(&self, data: &[u8]) -> Vec<u8> {
408 data.iter()
409 .enumerate()
410 .map(|(i, b)| b ^ self.encryption_key[i % 32])
411 .collect()
412 }
413
414 fn xor_decrypt(&self, data: &[u8]) -> Vec<u8> {
416 self.xor_encrypt(data)
418 }
419
420 pub fn change_log(&self) -> Vec<ConfigChange> {
422 self.change_log.read().map(|l| l.clone()).unwrap_or_default()
423 }
424
425 pub fn set_count(&self) -> u64 {
427 self.set_count.load(Ordering::Relaxed)
428 }
429}
430
431#[async_trait]
432impl SystemService for ConfigService {
433 fn name(&self) -> &str {
434 "config-service"
435 }
436
437 fn service_type(&self) -> ServiceType {
438 ServiceType::Core
439 }
440
441 async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
442 info!("config service started");
443 Ok(())
444 }
445
446 async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
447 info!(
448 configs = self.configs.len(),
449 secrets = self.secrets.len(),
450 "config service stopped"
451 );
452 Ok(())
453 }
454
455 async fn health_check(&self) -> HealthStatus {
456 HealthStatus::Healthy
457 }
458}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Pid` from a bare integer so call sites read uniformly.
    fn pid(n: u64) -> Pid {
        n
    }

    // ── Untyped configuration ────────────────────────────────────────────

    #[test]
    fn set_and_get_config() {
        let service = ConfigService::new_default();
        service
            .set("app", "timeout", serde_json::json!(30), pid(1))
            .unwrap();
        let stored = service.get("app", "timeout").unwrap();
        assert_eq!(stored, serde_json::json!(30));
    }

    #[test]
    fn get_nonexistent_returns_none() {
        let service = ConfigService::new_default();
        let missing = service.get("app", "missing");
        assert!(missing.is_none());
    }

    #[test]
    fn delete_config() {
        let service = ConfigService::new_default();
        service
            .set("app", "key", serde_json::json!("val"), pid(1))
            .unwrap();
        service.delete("app", "key", pid(1)).unwrap();
        assert!(service.get("app", "key").is_none());
    }

    #[test]
    fn list_keys_in_namespace() {
        let service = ConfigService::new_default();
        let fixtures = [("ns", "a", 1), ("ns", "b", 2), ("other", "c", 3)];
        for (namespace, key, number) in fixtures {
            service
                .set(namespace, key, serde_json::json!(number), pid(1))
                .unwrap();
        }
        let mut listed = service.list_keys("ns");
        listed.sort();
        assert_eq!(listed, vec!["a", "b"]);
    }

    // ── Subscriptions ────────────────────────────────────────────────────

    #[test]
    fn config_change_notification() {
        let service = ConfigService::new_default();
        let queue = service.subscribe("watch");
        service
            .set("watch", "flag", serde_json::json!(true), pid(1))
            .unwrap();
        let received = queue.read().unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0].key, "flag");
        assert_eq!(received[0].new_value, Some(serde_json::json!(true)));
    }

    #[test]
    fn config_change_includes_old_value() {
        let service = ConfigService::new_default();
        let queue = service.subscribe("ver");
        service.set("ver", "v", serde_json::json!(1), pid(1)).unwrap();
        service.set("ver", "v", serde_json::json!(2), pid(1)).unwrap();
        let received = queue.read().unwrap();
        assert_eq!(received.len(), 2);
        assert_eq!(received[1].old_value, Some(serde_json::json!(1)));
        assert_eq!(received[1].new_value, Some(serde_json::json!(2)));
    }

    // ── Secrets ──────────────────────────────────────────────────────────

    #[test]
    fn secret_set_and_get() {
        let service = ConfigService::new([0xAB; 32]);
        service
            .set_secret("creds", "api_key", b"secret123", vec![pid(1)])
            .unwrap();
        let revealed = service.get_secret("creds", "api_key", pid(1)).unwrap();
        assert_eq!(revealed, b"secret123");
    }

    #[test]
    fn secret_encrypted_at_rest() {
        let service = ConfigService::new([0xAB; 32]);
        service
            .set_secret("creds", "pass", b"plaintext", vec![pid(1)])
            .unwrap();
        // Peek at the raw map: the stored bytes must differ from the input.
        let at_rest = service.secrets.get("creds/pass").unwrap();
        assert_ne!(at_rest.as_slice(), b"plaintext");
    }

    #[test]
    fn unauthorized_pid_cannot_read_secret() {
        let service = ConfigService::new_default();
        service
            .set_secret("creds", "key", b"val", vec![pid(1)])
            .unwrap();
        let outcome = service.get_secret("creds", "key", pid(99));
        assert!(outcome.is_err());
        let err = outcome.unwrap_err().to_string();
        assert!(err.contains("denied") || err.contains("authorized"), "got: {err}");
    }

    #[test]
    fn empty_scope_allows_any_pid() {
        let service = ConfigService::new_default();
        service.set_secret("open", "key", b"val", vec![]).unwrap();
        let revealed = service.get_secret("open", "key", pid(42)).unwrap();
        assert_eq!(revealed, b"val");
    }

    // ── Audit log and service trait ──────────────────────────────────────

    #[test]
    fn change_log_recorded() {
        let service = ConfigService::new_default();
        service.set("ns", "k", serde_json::json!("v"), pid(1)).unwrap();
        let history = service.change_log();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].namespace, "ns");
    }

    #[tokio::test]
    async fn system_service_impl() {
        let service = ConfigService::new_default();
        assert_eq!(service.name(), "config-service");
        assert_eq!(service.service_type(), ServiceType::Core);
        service.start().await.unwrap();
        assert_eq!(service.health_check().await, HealthStatus::Healthy);
        service.stop().await.unwrap();
    }

    // ── Typed configuration ──────────────────────────────────────────────

    #[test]
    fn typed_set_get_roundtrip() {
        let service = ConfigService::new_default();
        service
            .set_typed("app", "name", ConfigValue::Text("myapp".into()), pid(1))
            .unwrap();
        let found = service.get_typed("app", "name").unwrap();
        assert_eq!(found.value, ConfigValue::Text("myapp".into()));
        assert_eq!(found.namespace, "app");
        assert_eq!(found.key, "name");
    }

    #[test]
    fn typed_get_nonexistent_returns_none() {
        let service = ConfigService::new_default();
        let missing = service.get_typed("app", "missing");
        assert!(missing.is_none());
    }

    #[test]
    fn typed_list_returns_all_in_namespace() {
        let service = ConfigService::new_default();
        service
            .set_typed("db", "host", ConfigValue::Text("localhost".into()), pid(1))
            .unwrap();
        service
            .set_typed("db", "port", ConfigValue::Integer(5432), pid(1))
            .unwrap();
        service
            .set_typed("cache", "ttl", ConfigValue::Integer(60), pid(1))
            .unwrap();

        let mut listed = service.list("db");
        listed.sort_by(|left, right| left.key.cmp(&right.key));
        assert_eq!(listed.len(), 2);
        assert_eq!(listed[0].key, "host");
        assert_eq!(listed[1].key, "port");
    }

    #[test]
    fn typed_delete_removes_entry() {
        let service = ConfigService::new_default();
        service
            .set_typed("ns", "k", ConfigValue::Boolean(true), pid(1))
            .unwrap();
        assert!(service.delete_typed("ns", "k", pid(1)));
        assert!(service.get_typed("ns", "k").is_none());
        // A second delete finds nothing to remove.
        assert!(!service.delete_typed("ns", "k", pid(1)));
    }

    #[test]
    fn typed_subscribe_receives_change_notification() {
        let service = ConfigService::new_default();
        let queue = service.subscribe("typed-ns");
        service
            .set_typed("typed-ns", "flag", ConfigValue::Boolean(true), pid(1))
            .unwrap();
        let received = queue.read().unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0].key, "flag");
        assert_eq!(received[0].new_value, Some(serde_json::json!(true)));
    }

    #[test]
    fn typed_set_updates_existing_value() {
        let service = ConfigService::new_default();
        for level in [1, 2] {
            service
                .set_typed("app", "level", ConfigValue::Integer(level), pid(1))
                .unwrap();
        }
        let found = service.get_typed("app", "level").unwrap();
        assert_eq!(found.value, ConfigValue::Integer(2));
    }

    #[test]
    fn typed_namespace_isolation() {
        let service = ConfigService::new_default();
        service
            .set_typed("alpha", "key", ConfigValue::Text("a".into()), pid(1))
            .unwrap();
        service
            .set_typed("beta", "key", ConfigValue::Text("b".into()), pid(1))
            .unwrap();

        let from_alpha = service.get_typed("alpha", "key").unwrap();
        let from_beta = service.get_typed("beta", "key").unwrap();
        assert_eq!(from_alpha.value, ConfigValue::Text("a".into()));
        assert_eq!(from_beta.value, ConfigValue::Text("b".into()));

        assert_eq!(service.list("alpha").len(), 1);
        assert_eq!(service.list("beta").len(), 1);
        assert_eq!(service.list("gamma").len(), 0);
    }

    #[test]
    fn typed_all_value_variants() {
        let service = ConfigService::new_default();

        service
            .set_typed("t", "text", ConfigValue::Text("hello".into()), pid(1))
            .unwrap();
        service
            .set_typed("t", "int", ConfigValue::Integer(42), pid(1))
            .unwrap();
        service
            .set_typed("t", "float", ConfigValue::Float(3.14), pid(1))
            .unwrap();
        service
            .set_typed("t", "bool", ConfigValue::Boolean(false), pid(1))
            .unwrap();
        service
            .set_typed("t", "json", ConfigValue::Json(serde_json::json!({"a": 1})), pid(1))
            .unwrap();

        assert_eq!(
            service.get_typed("t", "text").unwrap().value,
            ConfigValue::Text("hello".into())
        );
        assert_eq!(
            service.get_typed("t", "int").unwrap().value,
            ConfigValue::Integer(42)
        );
        assert_eq!(
            service.get_typed("t", "float").unwrap().value,
            ConfigValue::Float(3.14)
        );
        assert_eq!(
            service.get_typed("t", "bool").unwrap().value,
            ConfigValue::Boolean(false)
        );
        assert_eq!(
            service.get_typed("t", "json").unwrap().value,
            ConfigValue::Json(serde_json::json!({"a": 1}))
        );
    }
}