use llmosafe::{
CognitivePipeline, EscalationPolicy, EscalationReason, MemoryStats, PidState, PipelineConfig,
PipelineResult, ResourceGuard, SafetyContext, SafetyDecision, Synapse,
};
use std::fs;
pub use llmosafe::DesignAssuranceLevel;
use std::path::PathBuf;
use std::sync::Mutex;
use std::time::{Duration, Instant};
/// Sliding-window log of resource-pressure samples combined with a check cooldown.
///
/// Samples are appended by [`ResourceHistory::record`], which also drops samples that have
/// fallen out of the window. The time of the last full check can optionally be persisted to a
/// file (as Unix seconds) so that a cooldown survives a process restart.
struct ResourceHistory {
    /// `(sample time, pressure)` pairs in insertion order.
    measurements: Vec<(Instant, u8)>,
    /// Samples older than this many seconds are discarded by `record`.
    window_secs: u64,
    /// Length of the cooldown that starts at `last_check`, in seconds.
    cooldown_secs: u64,
    /// Time of the last check, or `None` if there was none (or the persisted one expired).
    last_check: Option<Instant>,
    /// File holding the Unix timestamp of the last check; `None` disables persistence.
    persist_path: Option<PathBuf>,
}

impl ResourceHistory {
    /// Creates an empty history and restores a still-active cooldown from `persist_path`.
    fn new(window_secs: u64, cooldown_secs: u64, persist_path: Option<PathBuf>) -> Self {
        let mut history = Self {
            measurements: Vec::with_capacity(60),
            window_secs,
            cooldown_secs,
            last_check: None,
            persist_path,
        };
        history.restore_last_check();
        history
    }

    /// Seconds since the Unix epoch, or `0` when the system clock reads earlier than the epoch.
    fn epoch_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |since_epoch| since_epoch.as_secs())
    }

    /// Loads the persisted check time and, if its cooldown has not yet elapsed, re-creates the
    /// matching `last_check` instant.
    ///
    /// A missing, unreadable or unparsable file leaves `last_check` untouched.
    fn restore_last_check(&mut self) {
        let persisted_secs = self
            .persist_path
            .as_ref()
            .and_then(|path| fs::read_to_string(path).ok())
            .and_then(|content| content.trim().parse::<u64>().ok());

        if let Some(secs) = persisted_secs {
            // A timestamp from the future saturates to "just checked".
            let elapsed_secs = Self::epoch_secs().saturating_sub(secs);
            if elapsed_secs < self.cooldown_secs {
                self.last_check = Some(
                    Instant::now()
                        .checked_sub(Duration::from_secs(elapsed_secs))
                        // Cannot represent the earlier instant: restart the cooldown from now.
                        .unwrap_or_else(Instant::now),
                );
            }
        }
    }

    /// Writes the current Unix time to `persist_path` (best effort; I/O errors are ignored).
    fn persist_last_check(&self) {
        if let Some(path) = self.persist_path.as_deref() {
            // The state directory may not exist yet; without it the write below always fails
            // and the cooldown would never be persisted.
            if let Some(parent) = path.parent() {
                let _ = fs::create_dir_all(parent);
            }
            let _ = fs::write(path, Self::epoch_secs().to_string());
        }
    }

    /// Appends a sample, drops samples that left the window, and returns the new average.
    fn record(&mut self, pressure: u8) -> f64 {
        let now = Instant::now();
        // When `now - window` cannot be represented, nothing can be older than the window, so
        // every stored sample is kept instead of being discarded.
        if let Some(cutoff) = now.checked_sub(Duration::from_secs(self.window_secs)) {
            self.measurements.retain(|(taken_at, _)| *taken_at > cutoff);
        }
        self.measurements.push((now, pressure));
        // The history is never empty here; the fallback only exists to avoid a panic path.
        self.rolling_average()
            .unwrap_or_else(|| f64::from(pressure))
    }

    /// Mean of all stored samples, or `None` when nothing has been recorded.
    // Converting the sample count to `f64` is the cast the lint flags.
    #[allow(clippy::cast_precision_loss)]
    fn rolling_average(&self) -> Option<f64> {
        if self.measurements.is_empty() {
            return None;
        }
        let total: f64 = self
            .measurements
            .iter()
            .map(|&(_, sample)| f64::from(sample))
            .sum();
        Some(total / self.measurements.len() as f64)
    }

    /// Returns `true` while fewer than `cooldown_secs` seconds have passed since `last_check`.
    fn is_in_cooldown(&self) -> bool {
        match self.last_check {
            Some(last) => last.elapsed() < Duration::from_secs(self.cooldown_secs),
            None => false,
        }
    }

    /// Starts a new cooldown now and persists the timestamp.
    fn mark_checked(&mut self) {
        self.last_check = Some(Instant::now());
        self.persist_last_check();
    }
}
/// Process-wide pressure history shared by every `LlmoSafeGuard`.
///
/// Starts as `None`; `LlmoSafeGuard::check` fills it in on first use while holding the lock.
static RESOURCE_HISTORY: Mutex<Option<ResourceHistory>> = Mutex::new(None);
/// Location of the file that persists the time of the last resource check.
///
/// The base directory is `$RUNTIMO_STATE_DIR` when set, otherwise `$HOME`; the file is
/// `<base>/.runtimo/resource_history.state`. Empty values are treated as unset so that an
/// exported-but-empty variable cannot redirect the state file into the current working
/// directory, and non-UTF-8 directories are accepted.
///
/// Returns `None` when neither variable provides a usable directory.
fn resource_history_path() -> Option<PathBuf> {
    ["RUNTIMO_STATE_DIR", "HOME"]
        .iter()
        // Lazy: `HOME` is only consulted when `RUNTIMO_STATE_DIR` is unusable.
        .filter_map(|key| std::env::var_os(key))
        .find(|value| !value.is_empty())
        .map(|base| {
            PathBuf::from(base)
                .join(".runtimo")
                .join("resource_history.state")
        })
}
/// Safety guard that pairs an `llmosafe` resource guard with an escalation policy.
///
/// The resource side backs `check`/`execute` and the pressure/entropy accessors; the policy
/// side carries the design assurance level used by the cognitive pipeline checks.
pub struct LlmoSafeGuard {
/// Resource guard queried for pressure and entropy and handed to the cognitive pipeline.
guard: ResourceGuard,
/// Escalation policy; its `dal` field is the configured design assurance level.
policy: EscalationPolicy,
}
/// Softens a pipeline decision according to the design assurance level.
///
/// * `A` — the decision is returned unchanged.
/// * `B` — `Halt` becomes `Escalate` (entropy `0`, same cooldown); everything else is unchanged.
/// * `C` — `Halt` and `Escalate` become `Warn`; everything else is unchanged.
/// * `D` — `Escalate`, `Halt` and `Exit` become `Warn`; `Proceed` and `Warn` are unchanged.
/// * `E` — always `Proceed`.
fn apply_dal_to_decision(dal: DesignAssuranceLevel, decision: SafetyDecision) -> SafetyDecision {
    match (dal, decision) {
        (DesignAssuranceLevel::E, _) => SafetyDecision::Proceed,
        (DesignAssuranceLevel::B, SafetyDecision::Halt(_, cooldown_ms)) => {
            SafetyDecision::Escalate {
                entropy: 0,
                reason: EscalationReason::Custom("DAL B: Halt downgraded"),
                cooldown_ms,
            }
        }
        (DesignAssuranceLevel::C, SafetyDecision::Halt(..) | SafetyDecision::Escalate { .. }) => {
            SafetyDecision::Warn("DAL C: Escalation downgraded")
        }
        (
            DesignAssuranceLevel::D,
            SafetyDecision::Escalate { .. } | SafetyDecision::Halt(..) | SafetyDecision::Exit(_),
        ) => SafetyDecision::Warn("DAL D: Capped at Warn"),
        // Level A, and every level/decision pair not softened above, passes through untouched.
        (_, unchanged) => unchanged,
    }
}
impl LlmoSafeGuard {
#[must_use]
pub fn new() -> Self {
let guard = ResourceGuard::auto(0.8);
let dal = match std::env::var("RUNTIMO_DAL")
.map(|s| s.to_uppercase())
.as_deref()
{
Ok("B") => DesignAssuranceLevel::B,
Ok("C") => DesignAssuranceLevel::C,
Ok("D") => DesignAssuranceLevel::D,
Ok("E") => DesignAssuranceLevel::E,
_ => DesignAssuranceLevel::A,
};
Self {
guard,
policy: EscalationPolicy::default().with_dal(dal),
}
}
#[must_use]
pub fn with_memory_ceiling_bytes(memory_ceiling_bytes: usize) -> Self {
let dal = match std::env::var("RUNTIMO_DAL")
.map(|s| s.to_uppercase())
.as_deref()
{
Ok("B") => DesignAssuranceLevel::B,
Ok("C") => DesignAssuranceLevel::C,
Ok("D") => DesignAssuranceLevel::D,
Ok("E") => DesignAssuranceLevel::E,
_ => DesignAssuranceLevel::A,
};
Self {
guard: ResourceGuard::new(memory_ceiling_bytes),
policy: EscalationPolicy::default().with_dal(dal),
}
}
pub fn check(&self) -> Result<(), String> {
let mut history = RESOURCE_HISTORY.lock().unwrap_or_else(|e| e.into_inner());
if history.is_none() {
*history = Some(ResourceHistory::new(30, 1, resource_history_path()));
}
#[allow(clippy::expect_used)]
let hist = history
.as_mut()
.expect("history always Some after initialization above");
if hist.is_in_cooldown() {
if let Some(avg) = hist.rolling_average() {
if avg > 80.0 {
return Err(format!(
"Resource pressure averaging {:.1}% over last 30s (cooldown active)",
avg
));
}
}
return Ok(());
}
let pressure = self.guard.pressure();
let avg = hist.record(pressure);
hist.mark_checked();
if pressure > 80 {
return Err(format!("Resource pressure at {}% (ceiling: 80%)", pressure));
}
if avg > 80.0 {
return Err(format!(
"Rolling average resource pressure at {:.1}% (ceiling: 80%)",
avg
));
}
self.guard
.check()
.map(|_| ())
.map_err(|e| format!("Resource guard check failed: {}", e))
}
pub fn execute<F, T>(&self, f: F) -> Result<T, String>
where
F: FnOnce() -> Result<T, String>,
{
self.check()?;
f()
}
#[must_use]
pub fn current_rss_bytes(&self) -> usize {
ResourceGuard::current_rss_bytes()
}
#[must_use]
pub fn system_memory_bytes(&self) -> usize {
ResourceGuard::system_memory_bytes()
}
#[must_use]
pub fn system_cpu_load(&self) -> u8 {
ResourceGuard::system_cpu_load()
}
#[must_use]
pub fn raw_entropy(&self) -> u16 {
self.guard.raw_entropy()
}
#[must_use]
pub fn pressure(&self) -> u8 {
self.guard.pressure()
}
#[must_use]
pub fn safety_context(&self) -> SafetyContext {
SafetyContext::new(self.policy.clone())
}
#[must_use]
pub fn with_dal(mut self, dal: DesignAssuranceLevel) -> Self {
self.policy = self.policy.with_dal(dal);
self
}
#[must_use]
pub fn dal(&self) -> DesignAssuranceLevel {
self.policy.dal
}
pub fn check_cognitive_pipeline(
&self,
objective: &str,
observation: &str,
) -> Result<PipelineResult, String> {
let config = PipelineConfig {
policy: self.policy.clone(),
use_detection_gate: true,
..PipelineConfig::default()
};
let mut pipeline = CognitivePipeline::<64, 10>::with_config(objective, config)
.map_err(|e| format!("Failed to configure CognitivePipeline: {}", e))?;
let mut result = pipeline
.process_safe(observation, &self.guard)
.map_err(|e| format!("Cognitive safety pipeline execution failed: {:?}", e))?;
result.decision = apply_dal_to_decision(self.policy.dal, result.decision);
Ok(result)
}
#[must_use]
pub fn combined_risk_bits(&self, synapse: &Synapse) -> u16 {
synapse.combined_risk_bits()
}
#[must_use]
pub fn oov_ratio(&self, synapse: &Synapse) -> u8 {
synapse.oov_ratio()
}
#[must_use]
pub fn detection_flags(&self, synapse: &Synapse) -> u8 {
synapse.detection_flags()
}
#[must_use]
pub fn pipeline_memory_stats<const M: usize, const S: usize>(
&self,
pipeline: &CognitivePipeline<'_, M, S>,
) -> MemoryStats {
pipeline.memory_stats()
}
#[must_use]
pub fn pipeline_pid_state<'a, const M: usize, const S: usize>(
&self,
pipeline: &'a CognitivePipeline<'_, M, S>,
) -> &'a PidState {
pipeline.pid_state()
}
}
impl Default for LlmoSafeGuard {
fn default() -> Self {
Self::new()
}
}
// Unit tests for `ResourceHistory` and `LlmoSafeGuard`.
//
// Several tests talk to the real system (memory, load) and to the process-wide history, so
// they are written to tolerate a machine that is genuinely under pressure.
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
use super::*;
// Reported system memory must be non-zero.
#[test]
fn guard_reports_system_memory() {
let guard = LlmoSafeGuard::new();
let mem = guard.system_memory_bytes();
assert!(mem > 0, "System memory should be > 0");
}
// Reported RSS of the test process must be non-zero.
#[test]
fn guard_reports_rss() {
let rss = LlmoSafeGuard::new().current_rss_bytes();
assert!(rss > 0, "RSS should be > 0 for running process");
}
// Smoke test only: an `Err` is logged, not asserted, so this test cannot fail on a loaded host.
#[test]
fn check_passes_under_normal_load() {
let guard = LlmoSafeGuard::new();
let result = guard.check();
if let Err(e) = result {
eprintln!("System under pressure: {}", e);
}
}
// When `execute` succeeds, the closure's value must be passed through unchanged.
#[test]
fn execute_runs_closure_when_safe() {
let guard = LlmoSafeGuard::new();
let result = guard.execute(|| Ok("passed"));
if let Ok(val) = result {
assert_eq!(val, "passed");
}
}
// With a 1-byte ceiling, `execute` must reject the closure without running it.
// The assertions only run if a follow-up `check()` also fails.
#[test]
fn execution_fails_with_impossible_memory_ceiling() {
let guard = LlmoSafeGuard::with_memory_ceiling_bytes(1);
if guard.current_rss_bytes() > 0 {
let mut executed = false;
let result = guard.execute(|| {
executed = true;
Ok("should_not_run")
});
if guard.check().is_err() {
assert!(
result.is_err(),
"Execution must be rejected when pressure exceeds the 1 byte ceiling"
);
assert!(!executed, "Closure must not be executed on failure");
}
}
}
// Constructing with an explicit ceiling yields a usable guard whose pressure is at most 100.
#[test]
fn with_memory_ceiling_bytes_constructs_successfully() {
let guard = LlmoSafeGuard::with_memory_ceiling_bytes(1024 * 1024);
let _ = guard.safety_context();
let p = guard.pressure();
assert!(p <= 100);
}
// Pressure of the default guard is at most 100.
#[test]
fn pressure_is_bounded() {
let guard = LlmoSafeGuard::new();
let p = guard.pressure();
assert!(p <= 100, "Pressure should be 0-100, got {}", p);
}
// Raw entropy of the default guard is at most 1000.
#[test]
fn entropy_is_bounded() {
let guard = LlmoSafeGuard::new();
let e = guard.raw_entropy();
assert!(e <= 1000, "Entropy should be 0-1000, got {}", e);
}
// Three samples (50, 60, 70) inside the window average to 60.
#[test]
fn test_resource_history_rolling_average() {
let mut hist = ResourceHistory::new(30, 1, None);
hist.record(50);
hist.record(60);
hist.record(70);
let avg = hist.rolling_average().unwrap();
assert!(
(avg - 60.0).abs() < 0.1,
"Rolling avg should be ~60, got {}",
avg
);
}
// `mark_checked` starts the cooldown immediately (1 s cooldown, no persistence).
#[test]
fn test_resource_history_cooldown() {
let mut hist = ResourceHistory::new(30, 1, None);
hist.record(90);
hist.mark_checked();
assert!(
hist.is_in_cooldown(),
"Should be in cooldown immediately after check"
);
}
// `with_dal` overrides whatever level the constructor picked.
#[test]
fn test_dal_config() {
let guard = LlmoSafeGuard::new().with_dal(DesignAssuranceLevel::C);
assert_eq!(guard.dal(), DesignAssuranceLevel::C);
}
// End-to-end pipeline check at two assurance levels, followed by the synapse accessors.
#[test]
fn test_cognitive_pipeline_integration() {
// NOTE(review): `new()` reads RUNTIMO_DAL, so the "strict" expectations below assume the
// variable is unset (or otherwise resolves to a level that does not soften Halt/Escalate).
let guard_strict = LlmoSafeGuard::new();
let res_strict = guard_strict.check_cognitive_pipeline("Hello world", "Hello world");
assert!(res_strict.is_ok());
let result_strict = res_strict.unwrap();
println!("DEBUG STRICT DECISION: {:?}", result_strict.decision);
assert!(matches!(
result_strict.decision,
SafetyDecision::Halt(..) | SafetyDecision::Escalate { .. }
));
assert!(!result_strict.is_safe());
// Level E always maps the decision to `Proceed`.
let guard_permissive = LlmoSafeGuard::new().with_dal(DesignAssuranceLevel::E);
let res_permissive =
guard_permissive.check_cognitive_pipeline("Hello world", "Hello world");
assert!(res_permissive.is_ok());
let result_permissive = res_permissive.unwrap();
println!(
"DEBUG PERMISSIVE DECISION: {:?}",
result_permissive.decision
);
assert!(matches!(
result_permissive.decision,
SafetyDecision::Proceed
));
assert!(result_permissive.is_safe());
// The guard's synapse accessors must agree with the values reported in the result.
let mut synapse = result_permissive.synapse;
synapse.set_detection_flags(result_permissive.detection_flags);
let bits = guard_permissive.combined_risk_bits(&synapse);
assert_eq!(
guard_permissive.oov_ratio(&synapse),
result_permissive.oov_ratio
);
assert_eq!(
guard_permissive.detection_flags(&synapse),
result_permissive.detection_flags
);
// Expected layout: OOV ratio shifted left by 6 bits, OR-ed with the detection flags.
assert_eq!(
bits,
((result_permissive.oov_ratio as u16) << 6)
| (result_permissive.detection_flags as u16)
);
}
}