use crate::error::DoDResult;
use crate::observation::Observation;
use crate::timing::TimingMeasurement;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::time::Instant;
use uuid::Uuid;
/// Identifier of a [`KernelAction`], implemented as a newtype over a UUID.
///
/// Values created through [`KernelActionId::new`] wrap a random (version 4) UUID,
/// so two independently created identifiers are not expected to compare equal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct KernelActionId(Uuid);
impl KernelActionId {
pub fn new() -> Self {
Self(Uuid::new_v4())
}
}
impl Default for KernelActionId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for KernelActionId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
/// Category of work a [`KernelAction`] stands for.
///
/// The kernel's built-in derivation in this file only emits `StateUpdate`,
/// `AutonomicAction`, `SchemaEvolution` and `Custom("unknown")`; the remaining
/// variants are presumably produced by other parts of the crate (not visible here).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ActionType {
/// Emitted by the kernel for SLO-breach observations.
SchemaEvolution,
/// Not emitted by the kernel's derivation in this file.
ProjectionUpdate,
/// Not emitted by the kernel's derivation in this file.
InvariantAdjustment,
/// Not emitted by the kernel's derivation in this file.
MarketplaceAction,
/// Emitted by the kernel for anomaly observations.
AutonomicAction,
/// Emitted by the kernel for metric observations.
StateUpdate,
/// Free-form action kind identified by a string; the kernel uses `"unknown"`
/// for observation types it has no dedicated mapping for.
Custom(String),
}
/// Declares whether a [`KernelAction`] may be applied more than once.
///
/// New actions start as `NonIdempotent` (see [`KernelAction::new`]).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum IdempotenceMode {
/// The action is declared idempotent.
Idempotent,
/// The action is declared non-idempotent; this is the mode of a newly built action.
NonIdempotent,
/// Idempotent only under some condition; the string presumably describes that
/// condition (its format is not defined in this file — confirm with callers).
ConditionallyIdempotent(String),
}
/// A single action emitted by the kernel, scoped to one tenant.
///
/// Fields are private; read them through the accessor methods and adjust the
/// optional ones through the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KernelAction {
// Random identifier assigned when the action is constructed.
id: KernelActionId,
// Category of the action.
action_type: ActionType,
// Arbitrary JSON payload supplied by whoever built the action.
payload: serde_json::Value,
// Idempotence declaration; `NonIdempotent` unless changed via a builder.
idempotence: IdempotenceMode,
// Tenant the action belongs to.
tenant_id: String,
// Ids of the observations recorded as having triggered this action, in insertion order.
triggering_observations: Vec<crate::observation::ObservationId>,
}
impl KernelAction {
pub fn new(
action_type: ActionType, payload: serde_json::Value, tenant_id: impl Into<String>,
) -> Self {
Self {
id: KernelActionId::new(),
action_type,
payload,
idempotence: IdempotenceMode::NonIdempotent,
tenant_id: tenant_id.into(),
triggering_observations: Vec::new(),
}
}
pub fn id(&self) -> KernelActionId {
self.id
}
pub fn action_type(&self) -> &ActionType {
&self.action_type
}
pub fn payload(&self) -> &serde_json::Value {
&self.payload
}
pub fn with_idempotence(mut self, mode: IdempotenceMode) -> Self {
self.idempotence = mode;
self
}
pub fn idempotent(self) -> Self {
self.with_idempotence(IdempotenceMode::Idempotent)
}
pub fn with_triggering_observation(
mut self, obs_id: crate::observation::ObservationId,
) -> Self {
self.triggering_observations.push(obs_id);
self
}
pub fn idempotence(&self) -> IdempotenceMode {
self.idempotence.clone()
}
pub fn tenant_id(&self) -> &str {
&self.tenant_id
}
pub fn triggering_observations(&self) -> &[crate::observation::ObservationId] {
&self.triggering_observations
}
}
/// Outcome of one kernel decision: the observations considered, the actions
/// produced for them, and bookkeeping about the run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KernelDecision {
// Random UUID rendered as a string; the kernel keys its execution history by it.
decision_id: String,
// Observations the decision was made from, in insertion order.
observations: Vec<Observation>,
// Actions attached to the decision, in insertion order.
actions: Vec<KernelAction>,
// Timing measurement associated with the decision.
timing: TimingMeasurement,
// `true` once the decision has been marked through `as_replay`.
is_replay: bool,
// Hex digest used to compare a decision with its replay; `None` until one is attached.
determinism_hash: Option<String>,
}
impl KernelDecision {
pub fn new() -> Self {
Self {
decision_id: Uuid::new_v4().to_string(),
observations: Vec::new(),
actions: Vec::new(),
timing: TimingMeasurement::new(),
is_replay: false,
determinism_hash: None,
}
}
pub fn with_observation(mut self, obs: Observation) -> Self {
self.observations.push(obs);
self
}
pub fn with_action(mut self, action: KernelAction) -> Self {
self.actions.push(action);
self
}
pub fn decision_id(&self) -> &str {
&self.decision_id
}
pub fn observations(&self) -> &[Observation] {
&self.observations
}
pub fn actions(&self) -> &[KernelAction] {
&self.actions
}
pub fn timing(&self) -> &TimingMeasurement {
&self.timing
}
pub fn is_replay(&self) -> bool {
self.is_replay
}
pub fn as_replay(mut self) -> Self {
self.is_replay = true;
self
}
pub fn with_determinism_hash(mut self, hash: String) -> Self {
self.determinism_hash = Some(hash);
self
}
pub fn determinism_hash(&self) -> Option<&str> {
self.determinism_hash.as_deref()
}
}
impl Default for KernelDecision {
fn default() -> Self {
Self::new()
}
}
/// Decision kernel: holds registered schemas and invariants and keeps a record
/// of every decision it has produced.
///
/// All three maps are `BTreeMap`s, so iteration over them is ordered by key.
pub struct Kernel {
// Schemas keyed by name, registered through `update_schema`.
contracts: BTreeMap<String, serde_json::Value>,
// Constraint strings keyed by invariant name, registered through `add_invariant`.
invariants: BTreeMap<String, String>,
// Decisions produced by `decide`, keyed by decision id.
execution_history: BTreeMap<String, KernelDecision>,
}
impl Kernel {
pub fn new() -> Self {
Self {
contracts: BTreeMap::new(),
invariants: BTreeMap::new(),
execution_history: BTreeMap::new(),
}
}
pub fn update_schema(
&mut self, name: impl Into<String>, schema: serde_json::Value,
) -> DoDResult<()> {
self.contracts.insert(name.into(), schema);
Ok(())
}
pub fn add_invariant(
&mut self, name: impl Into<String>, constraint: impl Into<String>,
) -> DoDResult<()> {
self.invariants.insert(name.into(), constraint.into());
Ok(())
}
pub fn decide(
&mut self, observations: Vec<Observation>, tenant_id: &str,
) -> DoDResult<KernelDecision> {
let start = Instant::now();
let _decision_start = TimingMeasurement::new();
if observations.is_empty() {
return Err(crate::error::DoDError::KernelDecision(
"no observations provided".to_string(),
));
}
let tenant = observations[0].tenant_id();
if !observations.iter().all(|o| o.tenant_id() == tenant) {
return Err(crate::error::DoDError::TenantIsolation(
"observations from different tenants".to_string(),
));
}
if tenant != tenant_id {
return Err(crate::error::DoDError::TenantIsolation(
"tenant mismatch".to_string(),
));
}
let mut decision = KernelDecision::new();
for obs in observations {
decision = decision.with_observation(obs);
}
let actions = self.derive_actions(&decision)?;
for action in actions {
decision = decision.with_action(action);
}
let elapsed = start.elapsed().as_millis() as u64;
let _timing = TimingMeasurement::new().finished(elapsed);
if elapsed > crate::constants::KERNEL_MAX_TIME_MS {
return Err(crate::error::DoDError::TimingViolation {
expected: crate::constants::KERNEL_MAX_TIME_MS,
actual: elapsed,
});
}
let hash = self.compute_determinism_hash(&decision);
decision = decision.with_determinism_hash(hash);
self.execution_history
.insert(decision.decision_id().to_string(), decision.clone());
Ok(decision)
}
fn derive_actions(&self, decision: &KernelDecision) -> DoDResult<Vec<KernelAction>> {
let mut actions = Vec::new();
for obs in decision.observations() {
let action = match obs.obs_type() {
crate::observation::ObservationType::Metric(_) => KernelAction::new(
ActionType::StateUpdate,
serde_json::json!({"type": "metric_update"}),
obs.tenant_id(),
),
crate::observation::ObservationType::Anomaly(_) => KernelAction::new(
ActionType::AutonomicAction,
serde_json::json!({"type": "anomaly_response"}),
obs.tenant_id(),
)
.idempotent(),
crate::observation::ObservationType::SLOBreach(_) => KernelAction::new(
ActionType::SchemaEvolution,
serde_json::json!({"type": "slo_response"}),
obs.tenant_id(),
),
_ => KernelAction::new(
ActionType::Custom("unknown".to_string()),
serde_json::json!({"observation": obs.data()}),
obs.tenant_id(),
),
}
.with_triggering_observation(obs.id());
actions.push(action);
}
Ok(actions)
}
fn compute_determinism_hash(&self, decision: &KernelDecision) -> String {
use sha2::Digest;
let mut hasher = sha2::Sha256::new();
for obs in decision.observations() {
hasher.update(obs.id().to_string());
hasher.update(serde_json::to_string(&obs.data()).unwrap_or_default());
}
for (name, schema) in &self.contracts {
hasher.update(name);
hasher.update(schema.to_string());
}
for (name, constraint) in &self.invariants {
hasher.update(name);
hasher.update(constraint);
}
for action in decision.actions() {
hasher.update(action.id().to_string());
hasher.update(action.payload().to_string());
}
hex::encode(hasher.finalize())
}
pub fn verify_determinism(
&self, original_decision: &KernelDecision, replay_decision: &KernelDecision,
) -> DoDResult<()> {
let original_hash = original_decision
.determinism_hash()
.ok_or_else(|| crate::error::DoDError::DeterminismViolation)?;
let replay_hash = replay_decision
.determinism_hash()
.ok_or_else(|| crate::error::DoDError::DeterminismViolation)?;
if original_hash != replay_hash {
return Err(crate::error::DoDError::DeterminismViolation);
}
Ok(())
}
pub fn execution_history(&self) -> &BTreeMap<String, KernelDecision> {
&self.execution_history
}
}
impl Default for Kernel {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observation::{MetricType, Observation, ObservationType};

    /// Builds a latency metric observation for `tenant-1` whose payload is
    /// `{"value": <value>}`.
    fn latency_observation(value: i64) -> DoDResult<Observation> {
        let obs = Observation::new(
            ObservationType::Metric(MetricType::Latency),
            serde_json::json!({ "value": value }),
            "test",
            "1.0",
            "tenant-1",
        )?;
        Ok(obs)
    }

    /// A new kernel starts without schemas or invariants.
    #[test]
    fn test_kernel_creation() {
        let kernel = Kernel::new();
        assert!(kernel.contracts.is_empty());
        assert!(kernel.invariants.is_empty());
    }

    /// Deciding on a single metric observation yields at least one action.
    #[test]
    fn test_kernel_decision() -> DoDResult<()> {
        let mut kernel = Kernel::new();
        let batch = vec![latency_observation(42)?];
        let decision = kernel.decide(batch, "tenant-1")?;
        assert!(!decision.actions().is_empty());
        Ok(())
    }

    /// The timing reported on a decision stays below the kernel time budget.
    #[test]
    fn test_kernel_timing() -> DoDResult<()> {
        let mut kernel = Kernel::new();
        let batch = vec![latency_observation(1)?];
        let decision = kernel.decide(batch, "tenant-1")?;
        let budget_ms = crate::constants::KERNEL_MAX_TIME_MS;
        assert!(decision.timing().elapsed_ms() < budget_ms);
        Ok(())
    }
}