#![allow(dead_code, unused_imports, unused_variables, unused_mut)]
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap, HashMap, VecDeque};
use std::hash::{Hash, Hasher};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering as AtomicOrd};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot, Semaphore};
use tokio::task;
use tokio::time::sleep;
use serde::{Deserialize, Serialize};
use anyhow::{anyhow, Context, Result};
use thiserror::Error;
use log::{debug, error, info, warn};
use chrono::{DateTime, SecondsFormat, Utc};
use parking_lot::{Mutex, RwLock};
use rand::Rng;
use rusqlite::{params, Connection};
use clap::Parser;
use num_cpus;
/// Identifier of a sensor (free-form string).
pub type SensorId = String;
/// UTC timestamp used for readings, assessments, and storage keys.
pub type Timestamp = DateTime<Utc>;
/// Identifier of a single reading; `LeiLuo::new` fills it with a v4 UUID string.
pub type DropletId = String;
/// Windows larger than this use the bucketed `OrderStatTracker`; smaller or equal ones
/// use `DualHeapTracker` (selection happens in `AutoXiaoHun::new`).
const LARGE_WINDOW_THRESHOLD: usize = 65_536;
/// Factor turning a MAD into a standard-deviation estimate (`sigma = mad * MAD_TO_SIGMA`);
/// 1.4826 ≈ 1/Φ⁻¹(0.75), the normal-consistency constant.
const MAD_TO_SIGMA: f64 = 1.4826;
/// Scale of the modified z-score `MODIFIED_Z_K * |x - median| / mad` used by `ShenShang::score`.
const MODIFIED_Z_K: f64 = 0.6745;
/// Huber tuning constant: `estimate_huber_delta` returns `HUBER_EFFICIENCY * sigma`.
const HUBER_EFFICIENCY: f64 = 1.345;
/// Error type for the YanHun pipeline. Each variant carries a detail string
/// (or an `std::io::Error`); the `#[error]` display prefix names the failing stage.
#[derive(Debug, Error)]
pub enum YanHunError {
/// Ingestion of a reading failed (display prefix "泪落摄取失败").
#[error("泪落摄取失败: {0}")]
Ingest(String),
/// Anomaly analysis failed ("销魂分析失败").
#[error("销魂分析失败: {0}")]
Analysis(String),
/// Persistence failed ("幽冥持久化失败"); `rusqlite::Error` converts into this variant.
#[error("幽冥持久化失败: {0}")]
Persistence(String),
/// Channel transmission failed ("信道传输失败"), e.g. when `LeiLuoProcessor::process`
/// cannot deliver to the remediation channel.
#[error("信道传输失败: {0}")]
Channel(String),
/// Invalid configuration ("配置失当").
#[error("配置失当: {0}")]
Config(String),
/// A compare-and-swap race was lost ("CAS 竞争失败").
#[error("CAS 竞争失败: {0}")]
CasConflict(String),
/// A purification (correction) step failed ("净化执行失败").
#[error("净化执行失败: {0}")]
Purification(String),
/// Wrapped I/O error; converts automatically through `#[from]`.
#[error("IO 错误: {0}")]
Io(#[from] std::io::Error),
}
impl From<rusqlite::Error> for YanHunError {
fn from(e: rusqlite::Error) -> Self { YanHunError::Persistence(e.to_string()) }
}
/// Sink for raw and corrected time-series values.
///
/// Implementations must be shareable across threads (`Send + Sync + 'static`).
/// The methods return nothing, so an implementation has to handle its own write failures.
pub trait TsdbSink: Send + Sync + 'static {
/// Records a raw incoming reading.
fn write_raw(&self, reading: &LeiLuo);
/// Records a corrected `value` for `sensor_id` at `ts`, with a free-form `note`.
fn write_corrected(&self, sensor_id: &str, ts: &Timestamp, value: f64, note: &str);
}
/// A [`TsdbSink`] that discards everything written to it.
pub struct NoopTsdb;

impl TsdbSink for NoopTsdb {
    fn write_raw(&self, _reading: &LeiLuo) {}

    fn write_corrected(&self, _sensor_id: &str, _ts: &Timestamp, _value: f64, _note: &str) {}
}
/// Sink for operational metrics: counters, gauges, and millisecond timings.
/// Implementations must be `Send + Sync + 'static`.
pub trait MetricsSink: Send + Sync + 'static {
/// Adds `delta` to counter `name`.
fn incr_counter(&self, name: &str, delta: u64);
/// Sets gauge `name` to `value`.
fn set_gauge(&self, name: &str, value: f64);
/// Records a duration of `ms` milliseconds under `name`.
fn record_ms(&self, name: &str, ms: u64);
}
/// A [`MetricsSink`] that silently drops every counter, gauge, and timing.
pub struct NoopMetrics;

impl MetricsSink for NoopMetrics {
    fn incr_counter(&self, _name: &str, _delta: u64) {}

    fn set_gauge(&self, _name: &str, _value: f64) {}

    fn record_ms(&self, _name: &str, _ms: u64) {}
}
/// How the Huber threshold (delta) is chosen.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum HuberStrategy {
/// Use the given delta verbatim.
Fixed(f64),
/// Derive delta from the data's MAD — presumably via `estimate_huber_delta`;
/// the code that reads this setting is not visible here (TODO confirm).
AutoMAD,
}
/// Storage engine selector. Only the SQLite store (`SqliteYouMing`) is visible in this
/// file; NOTE(review): confirm where the `Sled` backend is implemented.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum PersistenceBackend { Sled, Sqlite }
/// Runtime configuration for the pipeline; loadable from JSON (`load_from_file`)
/// and overridable from environment variables (`apply_env`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct YanHunConfig {
/// Sliding-window length of each sensor's median/MAD tracker (used in `LeiLuoProcessor::new`).
pub window_size: usize,
/// Bucket width for `OrderStatTracker`, which is only used above `LARGE_WINDOW_THRESHOLD`.
pub quantisation_eps: f64,
/// Minimum score for `PoSan::LightGrief` in `ShenShang::assess`.
pub threshold_light: f64,
/// Minimum score for `PoSan::DeepSorrow`.
pub threshold_medium: f64,
/// Minimum score for `PoSan::SoulWithered`.
pub threshold_heavy: f64,
/// NOTE(review): not consumed in this view; presumably the ingestion channel capacity.
pub ingestion_capacity: usize,
/// NOTE(review): presumably the per-sensor queue depth — confirm at the use site.
pub per_sensor_queue: usize,
/// Number of shards; `validate` requires it to be non-zero.
pub shard_count: usize,
/// NOTE(review): presumably batching limits (size / wait in ms); not consumed in this view.
pub batch_max_size: usize,
pub batch_max_wait_ms: u64,
/// Moving-average weight in `[0, 1]`; presumably handed to `QingMi::new` (TODO confirm).
pub qingmi_alpha: f64,
/// Permit count of `HunFei`'s semaphore (clamped to at least 1 in `HunFei::new`).
pub hunfei_concurrency: usize,
/// Toggle read from `YANHUN_VERIFY`; its consumer is not visible here.
pub verification_enabled: bool,
/// NOTE(review): correction tuning knobs (blend weight, clipping multiplier, Huber
/// strategy); their consumers are not visible here.
pub blend_alpha: f64,
pub clip_k: f64,
pub huber_strategy: HuberStrategy,
/// Process / measurement noise, presumably for `Kalman1D::new(q, r)` (TODO confirm).
pub kalman_q: f64,
pub kalman_r: f64,
/// Upper bound on remediation rounds; `validate` requires it to be non-zero.
pub max_rounds: u32,
/// Confidence in `[0, 1]`, presumably required before a correction is promoted.
pub promote_confidence: f64,
/// Convergence tolerance for multi-round processing; `validate` requires it to be > 0.
pub convergence_delta: f64,
/// NOTE(review): shadow-verification delay, retry limit, and minimum confidence;
/// consumers are not visible here.
pub shadow_verify_delay_ms: u64,
pub shadow_max_attempts: u32,
pub shadow_min_confidence: f64,
/// Storage engine plus its location and table ("tree") names.
pub persistence_backend: PersistenceBackend,
pub db_path: String,
pub tree_name: String,
pub shadow_tree_name: String,
/// Flush period in milliseconds; `validate` requires it to be non-zero.
pub flush_interval_ms: u64,
/// NOTE(review): presumably `min_samples = ratio * n` for `dbscan_1d` — confirm.
pub dbscan_min_samples_ratio: f64,
/// Batch size (validated non-zero) and parallelism for multi-round processing.
pub multi_round_batch_size: usize,
pub multi_round_concurrency: usize,
}
impl Default for YanHunConfig {
fn default() -> Self {
Self {
window_size: 1_024,
quantisation_eps: 1e-4,
threshold_light: 3.0,
threshold_medium: 6.0,
threshold_heavy: 10.0,
ingestion_capacity: 16_384,
per_sensor_queue: 1_024,
shard_count: 8,
batch_max_size: 256,
batch_max_wait_ms: 10,
qingmi_alpha: 0.05,
hunfei_concurrency: 16,
verification_enabled: true,
blend_alpha: 0.6,
clip_k: 3.0,
huber_strategy: HuberStrategy::AutoMAD,
kalman_q: 1e-3,
kalman_r: 1e-2,
max_rounds: 6,
promote_confidence: 0.85,
convergence_delta: 1e-3,
shadow_verify_delay_ms: 500,
shadow_max_attempts: 3,
shadow_min_confidence: 0.9,
persistence_backend: PersistenceBackend::Sqlite,
db_path: "./yanhunsystem_db".to_string(),
tree_name: "leiluo".to_string(),
shadow_tree_name: "anying".to_string(),
flush_interval_ms: 500,
dbscan_min_samples_ratio: 0.02,
multi_round_batch_size: 1024,
multi_round_concurrency: num_cpus::get(),
}
}
}
impl YanHunConfig {
    /// Checks that the configuration is internally consistent before it is used.
    ///
    /// Rejects:
    /// - zero sizes or intervals;
    /// - a non-positive or NaN `quantisation_eps` / `convergence_delta` (NaN used to
    ///   slip through a plain `<= 0.0` test);
    /// - probabilities outside `[0, 1]`;
    /// - severity thresholds that are NaN or not ordered `light ≤ medium ≤ heavy`.
    ///   Misordered thresholds would make `ShenShang::assess` silently skip levels.
    ///
    /// # Errors
    /// Returns an error naming the first invalid field.
    pub fn validate(&self) -> Result<()> {
        if self.window_size == 0 { anyhow::bail!("window_size 不能为零"); }
        if self.max_rounds == 0 { anyhow::bail!("max_rounds 不能为零"); }
        if self.quantisation_eps.is_nan() || self.quantisation_eps <= 0.0 {
            anyhow::bail!("quantisation_eps 必须大于零");
        }
        if self.shard_count == 0 { anyhow::bail!("shard_count 不能为零"); }
        // `contains` is false for NaN, so NaN is rejected here too.
        if !(0.0..=1.0).contains(&self.promote_confidence) {
            anyhow::bail!("promote_confidence 必须在 0.0–1.0 之间");
        }
        if self.convergence_delta.is_nan() || self.convergence_delta <= 0.0 {
            anyhow::bail!("convergence_delta 必须大于零");
        }
        if self.flush_interval_ms == 0 { anyhow::bail!("flush_interval_ms 不能为零"); }
        if !(0.0..=1.0).contains(&self.qingmi_alpha) {
            anyhow::bail!("qingmi_alpha 必须在 0.0–1.0 之间");
        }
        if self.multi_round_batch_size == 0 { anyhow::bail!("multi_round_batch_size 不能为零"); }
        // Any comparison involving NaN is false, so this also rejects NaN thresholds.
        let ordered = self.threshold_light <= self.threshold_medium
            && self.threshold_medium <= self.threshold_heavy;
        if !ordered {
            anyhow::bail!("阈值必须满足 threshold_light ≤ threshold_medium ≤ threshold_heavy（且不能为 NaN）");
        }
        Ok(())
    }

    /// Reads a JSON configuration file.
    ///
    /// # Errors
    /// Fails if the file cannot be opened or does not deserialize into `YanHunConfig`.
    pub fn load_from_file(path: &PathBuf) -> Result<Self> {
        let f = std::fs::File::open(path).context("配置文件打开失败")?;
        serde_json::from_reader(f).context("配置文件解析失败")
    }

    /// Overrides fields from `YANHUN_*` environment variables.
    ///
    /// Unset or unparsable variables leave the field unchanged. The variable names match
    /// the ones `AppConfig::apply_env` reads, including `YANHUN_MAX_CONCURRENCY`.
    pub fn apply_env(mut self) -> Self {
        // Overwrites `$field` with `$var` parsed as `$ty`; unset or unparsable values are ignored.
        macro_rules! env_set {
            ($var:expr, $field:expr, $ty:ty) => {
                if let Ok(v) = std::env::var($var) {
                    if let Ok(parsed) = v.parse::<$ty>() {
                        $field = parsed;
                    }
                }
            };
        }
        env_set!("YANHUN_WINDOW_SIZE", self.window_size, usize);
        env_set!("YANHUN_THRESHOLD_LIGHT", self.threshold_light, f64);
        env_set!("YANHUN_THRESHOLD_MEDIUM", self.threshold_medium, f64);
        env_set!("YANHUN_THRESHOLD_HEAVY", self.threshold_heavy, f64);
        env_set!("YANHUN_SHARDS", self.shard_count, usize);
        env_set!("YANHUN_MAX_CONCURRENCY", self.hunfei_concurrency, usize);
        env_set!("YANHUN_QINGMI_ALPHA", self.qingmi_alpha, f64);
        env_set!("YANHUN_VERIFY", self.verification_enabled, bool);
        env_set!("YANHUN_MAX_ROUNDS", self.max_rounds, u32);
        env_set!("YANHUN_PROMOTE_CONF", self.promote_confidence, f64);
        self
    }
}
/// Partial configuration where every field is optional; `into_inner_config`
/// copies each `Some` value over a base `YanHunConfig`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct AppConfig {
pub window_size: Option<usize>,
pub quantisation_eps: Option<f64>,
pub threshold_light: Option<f64>,
pub threshold_medium: Option<f64>,
pub threshold_heavy: Option<f64>,
pub shard_count: Option<usize>,
pub batch_max_size: Option<usize>,
pub batch_max_wait_ms: Option<u64>,
pub qingmi_alpha: Option<f64>,
pub hunfei_concurrency: Option<usize>,
pub verification_enabled: Option<bool>,
/// Held as `usize` here; converted to `u32` when applied to `YanHunConfig`.
pub max_rounds: Option<usize>,
pub promote_confidence: Option<f64>,
}
impl AppConfig {
pub fn load(path: &PathBuf) -> Result<Self> {
let f = std::fs::File::open(path).context("配置文件打开失败")?;
serde_json::from_reader(f).context("配置文件解析失败")
}
pub fn apply_env(mut self) -> Self {
macro_rules! env_opt_usize {
($field:ident, $key:expr) => {
if let Ok(v) = std::env::var($key) {
if let Ok(n) = v.parse() { self.$field = Some(n); }
}
};
}
macro_rules! env_opt_f64 {
($field:ident, $key:expr) => {
if let Ok(v) = std::env::var($key) {
if let Ok(f) = v.parse() { self.$field = Some(f); }
}
};
}
env_opt_usize!(window_size, "YANHUN_WINDOW_SIZE");
env_opt_usize!(shard_count, "YANHUN_SHARDS");
env_opt_usize!(hunfei_concurrency, "YANHUN_MAX_CONCURRENCY");
env_opt_f64!(threshold_light, "YANHUN_THRESHOLD_LIGHT");
env_opt_f64!(threshold_medium, "YANHUN_THRESHOLD_MEDIUM");
env_opt_f64!(threshold_heavy, "YANHUN_THRESHOLD_HEAVY");
env_opt_f64!(qingmi_alpha, "YANHUN_QINGMI_ALPHA");
self
}
pub fn into_inner_config(self, mut base: YanHunConfig) -> YanHunConfig {
macro_rules! apply {
($field:ident) => {
if let Some(v) = self.$field { base.$field = v; }
};
}
apply!(window_size);
apply!(quantisation_eps);
apply!(threshold_light);
apply!(threshold_medium);
apply!(threshold_heavy);
apply!(shard_count);
apply!(batch_max_size);
apply!(batch_max_wait_ms);
apply!(qingmi_alpha);
apply!(hunfei_concurrency);
apply!(verification_enabled);
if let Some(r) = self.max_rounds { base.max_rounds = r as u32; }
if let Some(c) = self.promote_confidence { base.promote_confidence = c; }
base
}
}
/// A single raw sensor reading ("tear drop").
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LeiLuo {
/// Unique id; a v4 UUID string when built with `LeiLuo::new`.
pub id: DropletId,
pub sensor_id: SensorId,
/// Creation time (`Utc::now()` in `new`).
pub ts: Timestamp,
pub value: f64,
/// Optional quality indicator; its scale is not defined in this file.
pub quality: Option<f64>,
/// Optional free-form JSON payload.
pub metadata: Option<serde_json::Value>,
}
impl LeiLuo {
pub fn new(sensor_id: impl Into<String>, value: f64) -> Self {
let sid = sensor_id.into();
Self {
id: uuid::Uuid::new_v4().to_string(),
sensor_id: sid,
ts: Utc::now(),
value,
quality: None,
metadata: None,
}
}
pub fn with_quality(mut self, q: f64) -> Self {
self.quality = Some(q);
self
}
pub fn with_metadata(mut self, m: serde_json::Value) -> Self {
self.metadata = Some(m);
self
}
}
/// Severity level of an anomaly score, from none (`Untainted`) to most severe
/// (`SoulWithered`); assigned by `ShenShang::assess` from the configured thresholds.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum PoSan {
/// Score did not reach `threshold_light` (display "无染").
Untainted,
/// Score reached `threshold_light` but not `threshold_medium` ("轻愁").
LightGrief,
/// Score reached `threshold_medium` but not `threshold_heavy` ("深恸").
DeepSorrow,
/// Score reached `threshold_heavy` ("销魂").
SoulWithered,
}
impl std::fmt::Display for PoSan {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Untainted => write!(f, "无染"),
Self::LightGrief => write!(f, "轻愁"),
Self::DeepSorrow => write!(f, "深恸"),
Self::SoulWithered => write!(f, "销魂"),
}
}
}
/// Lifecycle state of a stored `ChouKu` record.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ChouChang {
/// Counted as pending by `SqliteYouMing::pull_candidates`.
Slumber,
/// Also counted as pending by `pull_candidates`.
Awake,
/// Under remediation in the given round (set by `mark_forging`); still pending.
Forging(u32),
/// Remediated in the given round (set by `mark_luminous`).
Luminous(u32),
/// Terminal state set by `seal`.
Sealed,
}
/// Assessment of one reading, produced by `ShenShang::assess`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DuanChang {
/// Severity bucket.
pub level: PoSan,
/// Modified z-score, or the raw absolute deviation when the MAD is ≈ 0.
pub score: f64,
/// Human-readable comparison of `score` against the relevant threshold.
pub reason: String,
/// Time the assessment was made (`Utc::now()` inside `assess`).
pub ts: Timestamp,
/// Remediation round; `LeiLuoProcessor::process` always passes 1.
pub round: u32,
}
/// Audit / quarantine record persisted by `YouMingStore::audit` and `quarantine`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QiChuang {
pub sensor_id: SensorId,
/// Reading this record refers to; combined with `ts` into the storage key.
pub droplet_id: DropletId,
/// Presumably the assessment that triggered this record (TODO confirm with writers).
pub order: DuanChang,
/// Presumably the reading's value before correction.
pub original_value: f64,
/// Presumably the corrected value, if a correction was applied.
pub applied_value: Option<f64>,
pub note: Option<String>,
/// Record time; part of the storage key.
pub ts: Timestamp,
}
/// Correction result for one sensor, stored in the shadow table via
/// `apply_shadow` / `promote_shadow`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct YiLuan {
pub sensor_id: SensorId,
/// Correction time; part of the shadow key.
pub ts: Timestamp,
pub corrected_value: f64,
/// Copied into source-record metadata by `mark_luminous`.
pub confidence: f64,
/// Correction method name; copied into source-record metadata by `mark_luminous`.
pub method: String,
pub round: u32,
/// Ids of the sediment records that `mark_luminous` marks as `Luminous`.
pub provenance: Vec<DropletId>,
/// Free-form JSON metrics.
pub metrics: serde_json::Value,
}
/// Persisted "sediment" record: one reading with its assessment data and lifecycle
/// state, stored as JSON in the `sediment` table keyed by `id` (see `SqliteYouMing`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChouKu {
/// Droplet id; primary key in storage.
pub id: DropletId,
pub sensor_id: SensorId,
pub ts: Timestamp,
/// Raw reading value.
pub value: f64,
pub quality: Option<f64>,
/// Presumably the anomaly score from the assessment (TODO confirm with writers).
pub score: f64,
/// Severity stored as text (presumably a `PoSan` label).
pub poshan: String,
/// NOTE(review): presumably the window median / MAD at assessment time.
pub origin_median:Option<f64>,
pub origin_mad: Option<f64>,
/// Updated by `mark_forging`, `mark_luminous`, `seal`, and `compare_and_swap_state`.
pub state: ChouChang,
/// Round of the last `mark_forging` or CAS update.
pub round: u32,
/// Free-form JSON; `mark_luminous` adds purification method/confidence/time/round keys.
pub metadata: Option<serde_json::Value>,
}
/// One-dimensional DBSCAN.
///
/// Returns one label per input value: cluster ids `0, 1, …` in discovery order,
/// or `-1` for noise. A point is a core point when at least `min_samples` values
/// (itself included) lie within `eps` of it. Neighbourhoods are found by a linear
/// scan, so the cost is O(n²).
pub fn dbscan_1d(values: &[f64], eps: f64, min_samples: usize) -> Vec<i32> {
    const UNVISITED: i32 = -2;
    const NOISE: i32 = -1;

    let n = values.len();
    if n == 0 {
        return Vec::new();
    }
    // All indices whose value is within `eps` of `values[center]`, in index order.
    let region = |center: usize| -> Vec<usize> {
        (0..n).filter(|&j| (values[j] - values[center]).abs() <= eps).collect()
    };

    let mut labels = vec![UNVISITED; n];
    let mut next_cluster = 0i32;
    for seed in 0..n {
        if labels[seed] != UNVISITED {
            continue;
        }
        let seed_region = region(seed);
        if seed_region.len() < min_samples {
            labels[seed] = NOISE;
            continue;
        }
        labels[seed] = next_cluster;
        let mut frontier: VecDeque<usize> = seed_region.into_iter().collect();
        while let Some(p) = frontier.pop_front() {
            match labels[p] {
                // Former noise reachable from a core point becomes a border point.
                NOISE => labels[p] = next_cluster,
                UNVISITED => {
                    labels[p] = next_cluster;
                    let p_region = region(p);
                    if p_region.len() >= min_samples {
                        frontier.extend(p_region.into_iter().filter(|&m| labels[m] == UNVISITED));
                    }
                }
                _ => {}
            }
        }
        next_cluster += 1;
    }
    for label in labels.iter_mut() {
        if *label == UNVISITED {
            *label = NOISE;
        }
    }
    labels
}
/// Heuristic `eps` for [`dbscan_1d`]: three times the (upper) median gap between
/// consecutive sorted values, with the gap floored at `1e-6`.
///
/// Returns `1e-6` for fewer than two values. Sorting uses `f64::total_cmp`, so NaN
/// inputs cannot panic: NaNs sort to the ends, and a NaN median gap falls back to the
/// `1e-6` floor because `f64::max` ignores NaN.
pub fn estimate_dbscan_eps(values: &[f64]) -> f64 {
    if values.len() < 2 {
        return 1e-6;
    }
    let mut sorted = values.to_vec();
    sorted.sort_unstable_by(f64::total_cmp);
    let mut gaps: Vec<f64> = sorted.windows(2).map(|w| (w[1] - w[0]).abs()).collect();
    // Non-empty: `values.len() >= 2` yields at least one gap.
    let mid = gaps.len() / 2;
    gaps.select_nth_unstable_by(mid, f64::total_cmp);
    gaps[mid].max(1e-6) * 3.0
}
/// Huber M-estimate of location by iteratively reweighted averaging.
///
/// Starts from the (upper) median. Each iteration gives a point with residual
/// `|x - location| <= delta` weight 1 and any other point weight `delta / |residual|`,
/// then takes the weighted mean. Iteration stops after `max_iter` rounds, when the
/// total weight is zero, or when the update moves by less than `1e-12`.
/// Returns `0.0` for empty input.
pub fn huber_robust_location(values: &[f64], delta: f64, max_iter: usize) -> f64 {
    if values.is_empty() {
        return 0.0;
    }
    let mut scratch = values.to_vec();
    let pivot = scratch.len() / 2;
    scratch.select_nth_unstable_by(pivot, f64::total_cmp);
    let mut location = scratch[pivot];

    let mut remaining = max_iter;
    while remaining > 0 {
        remaining -= 1;
        let mut weighted_sum = 0.0f64;
        let mut weight_total = 0.0f64;
        for &x in values {
            let residual = (x - location).abs();
            let weight = if residual <= delta { 1.0 } else { delta / residual };
            weighted_sum += weight * x;
            weight_total += weight;
        }
        if weight_total == 0.0 {
            break;
        }
        let next = weighted_sum / weight_total;
        let converged = (next - location).abs() < 1e-12;
        location = next;
        if converged {
            break;
        }
    }
    location
}
/// Data-driven Huber threshold: `HUBER_EFFICIENCY * sigma`, where `sigma` is the
/// MAD scaled by `MAD_TO_SIGMA`.
///
/// Both the median and the MAD use the upper middle element for even lengths.
/// A zero MAD falls back to `sigma = 1e-6`; empty input returns `1.0`.
pub fn estimate_huber_delta(values: &[f64]) -> f64 {
    if values.is_empty() {
        return 1.0;
    }
    // Partially sorts `buf` and returns its element at rank `len / 2`.
    fn upper_median(buf: &mut [f64]) -> f64 {
        let rank = buf.len() / 2;
        buf.select_nth_unstable_by(rank, f64::total_cmp);
        buf[rank]
    }
    let mut scratch = values.to_vec();
    let center = upper_median(&mut scratch);
    let mut deviations: Vec<f64> = values.iter().map(|v| (v - center).abs()).collect();
    let mad = upper_median(&mut deviations);
    let sigma = if mad > 0.0 { mad * MAD_TO_SIGMA } else { 1e-6 };
    HUBER_EFFICIENCY * sigma
}
/// Scalar Kalman filter with a constant-state model.
///
/// `x` is the state estimate, `p` its variance, `q` the process noise and `r` the
/// measurement noise. The first measurement after construction or `reset` seeds
/// the state directly.
pub struct Kalman1D {
    pub x: f64,
    pub p: f64,
    pub q: f64,
    pub r: f64,
    initialized: bool,
}

impl Kalman1D {
    /// Creates an un-seeded filter with the given noise parameters.
    pub fn new(q: f64, r: f64) -> Self {
        Kalman1D { x: 0.0, p: 1.0, q, r, initialized: false }
    }

    /// Forgets the state (keeping `q` and `r`) so the next measurement re-seeds it.
    pub fn reset(&mut self) {
        *self = Kalman1D::new(self.q, self.r);
    }

    /// Incorporates measurement `z` and returns the updated estimate.
    pub fn update(&mut self, z: f64) -> f64 {
        if self.initialized {
            let predicted_p = self.p + self.q;
            let gain = predicted_p / (predicted_p + self.r);
            self.x += gain * (z - self.x);
            self.p = (1.0 - gain) * predicted_p;
        } else {
            self.x = z;
            self.p = 1.0;
            self.initialized = true;
        }
        self.x
    }

    /// Feeds `seq` through [`Kalman1D::update`] in order and collects every estimate.
    pub fn smooth_sequence(&mut self, seq: &[f64]) -> Vec<f64> {
        let mut estimates = Vec::with_capacity(seq.len());
        for &z in seq {
            estimates.push(self.update(z));
        }
        estimates
    }
}
/// Confidence in `[0, 1]` that `representative` summarises `cluster_vals`.
///
/// The result blends two parts:
/// - 60 % a size term `ln n / (1 + ln n)`;
/// - 40 % a spread term `1 / (1 + variance)`, with the variance measured around
///   `representative`.
///
/// Empty input yields `0.0`. A NaN blend is mapped to `0.0` by the final `max`.
pub fn compute_confidence(cluster_vals: &[f64], representative: f64) -> f64 {
    let count = cluster_vals.len();
    if count == 0 {
        return 0.0;
    }
    let n = count as f64;
    let sum_sq: f64 = cluster_vals
        .iter()
        .map(|&v| {
            let d = v - representative;
            d * d
        })
        .sum();
    let variance = sum_sq / n;
    let log_n = n.ln();
    let count_score = (log_n / (1.0 + log_n)).min(1.0);
    let var_score = 1.0 / (1.0 + variance);
    let blended = 0.6 * count_score + 0.4 * var_score;
    blended.max(0.0).min(1.0)
}
/// Sliding-window tracker of the median and MAD (median absolute deviation).
pub trait XiaoHunTracker: Send + Sync {
/// Adds `value`, evicting the oldest sample once the window is full, and returns
/// `(median, mad)` of the updated window.
fn insert_and_compute(&mut self, value: f64) -> (f64, f64);
/// Number of samples currently held.
fn window_len(&self) -> usize;
/// Current `(median, mad)` without modifying the window.
fn snapshot_stats(&self) -> (f64, f64);
/// `true` when no samples are held.
fn is_empty(&self) -> bool { self.window_len() == 0 }
}
/// `f64` wrapper with a total order (`f64::total_cmp`), usable as a key in `BinaryHeap`.
///
/// Equality is defined through the same total order, so `Eq`, `PartialOrd` and `Ord`
/// all agree:
/// - `NaN == NaN` when the bit patterns match;
/// - `-0.0 != 0.0`, and `-0.0 < 0.0`.
///
/// A derived `PartialEq` would use IEEE `==`, which is not reflexive for NaN and equates
/// `±0.0` while `cmp` does not. That breaks the `Eq`/`Ord` contracts.
#[derive(Debug, Clone, Copy)]
struct OrdF64(f64);

impl PartialEq for OrdF64 {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

impl Eq for OrdF64 {}

impl PartialOrd for OrdF64 {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OrdF64 {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.0.total_cmp(&other.0)
    }
}
pub struct DualHeapTracker {
window_size: usize,
lower: BinaryHeap<OrdF64>,
upper: BinaryHeap<Reverse<OrdF64>>,
window: VecDeque<f64>,
pending_remove: VecDeque<f64>,
}
impl DualHeapTracker {
pub fn new(window_size: usize) -> Self {
Self {
window_size,
lower: BinaryHeap::new(),
upper: BinaryHeap::new(),
window: VecDeque::with_capacity(window_size + 16),
pending_remove: VecDeque::new(),
}
}
fn heap_insert(&mut self, v: f64) {
let ov = OrdF64(v);
if self.lower.peek().map_or(true, |&x| ov >= x) {
self.upper.push(Reverse(ov));
} else {
self.lower.push(ov);
}
self.rebalance();
}
fn rebalance(&mut self) {
while self.lower.len() > self.upper.len() + 1 {
if let Some(v) = self.lower.pop() { self.upper.push(Reverse(v)); }
}
while self.upper.len() > self.lower.len() + 1 {
if let Some(Reverse(v)) = self.upper.pop() { self.lower.push(v); }
}
}
fn current_median(&self) -> f64 {
match (self.lower.peek(), self.upper.peek()) {
(Some(&lo), Some(&Reverse(hi))) => {
if self.lower.len() == self.upper.len() { (lo.0 + hi.0) / 2.0 }
else if self.lower.len() > self.upper.len() { lo.0 }
else { hi.0 }
}
(Some(&lo), None) => lo.0,
(None, Some(&Reverse(hi))) => hi.0,
(None, None) => 0.0,
}
}
fn lazy_cleanup(&mut self) {
loop {
match self.pending_remove.front().copied() {
None => break,
Some(to_remove) => {
let or = OrdF64(to_remove);
if self.lower.peek() == Some(&or) {
self.lower.pop(); self.pending_remove.pop_front(); self.rebalance();
} else if self.upper.peek() == Some(&Reverse(or)) {
self.upper.pop(); self.pending_remove.pop_front(); self.rebalance();
} else { break; }
}
}
}
}
fn compute_mad(&self, median: f64) -> f64 {
if self.window.is_empty() { return 0.0; }
let mut devs: Vec<f64> = self.window.iter().map(|v| (v - median).abs()).collect();
let n = devs.len();
let mid = n / 2;
devs.select_nth_unstable_by(mid, f64::total_cmp);
if n % 2 == 1 { devs[mid] } else {
let a = devs[mid];
if mid == 0 { return a; }
devs.select_nth_unstable_by(mid - 1, f64::total_cmp);
(a + devs[mid - 1]) / 2.0
}
}
}
impl XiaoHunTracker for DualHeapTracker {
fn insert_and_compute(&mut self, value: f64) -> (f64, f64) {
if self.window.len() >= self.window_size {
if let Some(old) = self.window.pop_front() {
self.pending_remove.push_back(old);
self.lazy_cleanup();
}
}
self.window.push_back(value);
self.heap_insert(value);
let median = self.current_median();
(median, self.compute_mad(median))
}
fn window_len(&self) -> usize { self.window.len() }
fn snapshot_stats(&self) -> (f64, f64) {
let med = self.current_median();
(med, self.compute_mad(med))
}
}
/// Sliding-window tracker that keeps per-bucket counts in a `BTreeMap`, so the median
/// is found by walking buckets. The median is therefore quantised to multiples of
/// `eps`, while the MAD is computed from the raw values in the ring buffer.
pub struct OrderStatTracker {
/// Ring-buffer size, i.e. the maximum number of samples.
capacity: usize,
/// Bucket width used for quantisation.
eps: f64,
/// Bucket index -> number of window samples in that bucket.
map: BTreeMap<i64, usize>,
/// Ring buffer of `(bucket, raw value)` pairs.
window: Vec<(i64, f64)>,
/// Index of the oldest sample in `window`.
head: usize,
/// Number of valid samples in `window`.
len: usize,
}
impl OrderStatTracker {
    /// Creates an empty ring buffer of `capacity` slots whose values are bucketed at
    /// width `eps` for order-statistic queries.
    ///
    /// # Panics
    /// Panics if `capacity == 0` or `eps` is not strictly positive.
    pub fn new(capacity: usize, eps: f64) -> Self {
        assert!(capacity > 0, "capacity must be > 0");
        assert!(eps > 0.0, "eps must be > 0");
        let window = vec![(0, 0.0); capacity];
        Self { capacity, eps, map: BTreeMap::new(), window, head: 0, len: 0 }
    }

    /// Bucket index of `v`: `round(v / eps)`.
    #[inline]
    fn quantise(&self, v: f64) -> i64 {
        let scaled = v / self.eps;
        scaled.round() as i64
    }

    /// Representative value of bucket `b`: `b * eps`.
    #[inline]
    fn dequantise(&self, b: i64) -> f64 {
        self.eps * (b as f64)
    }

    fn insert_bucket(&mut self, b: i64) {
        *self.map.entry(b).or_default() += 1;
    }

    /// Decrements bucket `b`, dropping it when its count reaches zero.
    fn remove_bucket(&mut self, b: i64) {
        if let Some(count) = self.map.get_mut(&b) {
            if *count > 1 {
                *count -= 1;
            } else {
                self.map.remove(&b);
            }
        }
    }

    /// Value of the bucket holding the `k`-th smallest sample (0-based).
    ///
    /// If `k` is out of range, returns the largest bucket's value, or `0.0` when empty.
    fn select_kth(&self, k: usize) -> f64 {
        let mut seen = 0usize;
        let hit = self.map.iter().find(|&(_, &count)| {
            seen += count;
            seen > k
        });
        hit.or_else(|| self.map.iter().next_back())
            .map(|(&bucket, _)| self.dequantise(bucket))
            .unwrap_or(0.0)
    }

    /// Quantised median; even lengths average the two middle buckets.
    fn median(&self) -> f64 {
        match self.len {
            0 => 0.0,
            n if n % 2 == 1 => self.select_kth((n - 1) / 2),
            n => {
                let lo = (n - 1) / 2;
                (self.select_kth(lo) + self.select_kth(lo + 1)) / 2.0
            }
        }
    }

    /// MAD of the raw window values around `median`; even lengths average the two
    /// middle deviations.
    fn compute_mad(&self, median: f64) -> f64 {
        if self.len == 0 {
            return 0.0;
        }
        let mut deviations: Vec<f64> = Vec::with_capacity(self.len);
        for offset in 0..self.len {
            let slot = (self.head + offset) % self.capacity;
            deviations.push((self.window[slot].1 - median).abs());
        }
        let n = deviations.len();
        let mid = n / 2;
        deviations.select_nth_unstable_by(mid, f64::total_cmp);
        let upper = deviations[mid];
        if n % 2 == 1 || mid == 0 {
            return upper;
        }
        deviations.select_nth_unstable_by(mid - 1, f64::total_cmp);
        (upper + deviations[mid - 1]) / 2.0
    }
}
impl XiaoHunTracker for OrderStatTracker {
    fn insert_and_compute(&mut self, value: f64) -> (f64, f64) {
        let bucket = self.quantise(value);
        if self.len == self.capacity {
            // Full ring: overwrite the oldest slot (at `head`) and advance `head`.
            let (evicted, _) = self.window[self.head];
            self.remove_bucket(evicted);
            self.window[self.head] = (bucket, value);
            self.head = (self.head + 1) % self.capacity;
        } else {
            let slot = (self.head + self.len) % self.capacity;
            self.window[slot] = (bucket, value);
            self.len += 1;
        }
        self.insert_bucket(bucket);
        self.snapshot_stats()
    }

    fn window_len(&self) -> usize {
        self.len
    }

    fn snapshot_stats(&self) -> (f64, f64) {
        let center = self.median();
        (center, self.compute_mad(center))
    }
}
/// Thread-safe wrapper that picks a tracker implementation by window size (see `new`)
/// and serializes access to it through a `parking_lot::Mutex`.
pub struct AutoXiaoHun {
inner: Mutex<Box<dyn XiaoHunTracker>>,
}
impl AutoXiaoHun {
pub fn new(window_size: usize, eps: f64) -> Self {
let inner: Box<dyn XiaoHunTracker> = if window_size <= LARGE_WINDOW_THRESHOLD {
Box::new(DualHeapTracker::new(window_size))
} else {
Box::new(OrderStatTracker::new(window_size, eps))
};
Self { inner: Mutex::new(inner) }
}
pub fn insert_and_compute(&self, value: f64) -> (f64, f64) {
self.inner.lock().insert_and_compute(value)
}
pub fn window_len(&self) -> usize { self.inner.lock().window_len() }
pub fn snapshot_stats(&self) -> (f64, f64) { self.inner.lock().snapshot_stats() }
pub fn is_empty(&self) -> bool { self.inner.lock().is_empty() }
}
/// Anomaly scorer: turns (value, median, MAD) into a score and a `PoSan` level using
/// the thresholds in its config.
pub struct ShenShang {
config: YanHunConfig,
}
impl ShenShang {
pub fn new(config: YanHunConfig) -> Self { Self { config } }
pub fn score(&self, value: f64, median: f64, mad: f64) -> f64 {
if mad <= f64::EPSILON { (value - median).abs() }
else { MODIFIED_Z_K * (value - median).abs() / mad }
}
pub fn assess(&self, value: f64, median: f64, mad: f64, round: u32) -> DuanChang {
let score = self.score(value, median, mad);
let level = if score >= self.config.threshold_heavy { PoSan::SoulWithered }
else if score >= self.config.threshold_medium { PoSan::DeepSorrow }
else if score >= self.config.threshold_light { PoSan::LightGrief }
else { PoSan::Untainted };
let reason = match level {
PoSan::SoulWithered => format!("score {:.4} ≥ 销魂阈 {:.4}", score, self.config.threshold_heavy),
PoSan::DeepSorrow => format!("score {:.4} ≥ 深恸阈 {:.4}", score, self.config.threshold_medium),
PoSan::LightGrief => format!("score {:.4} ≥ 轻愁阈 {:.4}", score, self.config.threshold_light),
PoSan::Untainted => format!("score {:.4} < 轻愁阈 {:.4}", score, self.config.threshold_light),
};
DuanChang { level, score, reason, ts: Utc::now(), round }
}
pub fn mad_to_sigma(&self) -> f64 { MAD_TO_SIGMA }
}
/// Per-sensor exponentially weighted moving average (the "background" level).
///
/// Each update computes `alpha * sample + (1 - alpha) * previous`. A sensor's first
/// sample seeds the average with itself.
pub struct QingMi {
    alpha: f64,
    values: RwLock<HashMap<SensorId, f64>>,
}

impl QingMi {
    /// # Panics
    /// Panics if `alpha` is outside `[0, 1]`, NaN included. Both ends are accepted,
    /// matching `YanHunConfig::validate` for `qingmi_alpha`; the message now states
    /// the interval that is actually enforced.
    pub fn new(alpha: f64) -> Self {
        assert!((0.0..=1.0).contains(&alpha), "alpha must be in [0, 1]");
        Self { alpha, values: RwLock::new(HashMap::new()) }
    }

    /// Folds `sample` into the sensor's average and returns the new value.
    pub fn update(&self, sensor_id: &SensorId, sample: f64) -> f64 {
        let mut map = self.values.write();
        // Hot path: existing sensor, no key allocation needed.
        if let Some(bg) = map.get_mut(sensor_id) {
            *bg = self.alpha * sample + (1.0 - self.alpha) * *bg;
            return *bg;
        }
        // First sample: seed with `sample`, applying the same blend so the rounding
        // matches the steady-state formula exactly.
        let seeded = self.alpha * sample + (1.0 - self.alpha) * sample;
        map.insert(sensor_id.clone(), seeded);
        seeded
    }

    /// Current average for `sensor_id`, or `fallback` if the sensor was never updated.
    pub fn get(&self, sensor_id: &SensorId, fallback: f64) -> f64 {
        self.values.read().get(sensor_id).copied().unwrap_or(fallback)
    }
}
/// Persistence interface for sediment records, shadow corrections, and the audit trail.
///
/// Implementations must be `Send + Sync + 'static`. The notes below describe what the
/// `SqliteYouMing` implementation in this file does; other backends may differ.
pub trait YouMingStore: Send + Sync + 'static {
/// Inserts or replaces one sediment record keyed by its id.
fn append_sediment(&self, rec: &ChouKu) -> Result<()>;
/// Batch form of `append_sediment` (one transaction in SQLite).
fn batch_append_sediment(&self, recs: &[ChouKu]) -> Result<()>;
/// Returns at most `limit` records in a pending state (Slumber / Awake / Forging),
/// optionally restricted to one sensor.
fn pull_candidates(&self, sensor_id: Option<&str>, limit: usize) -> Result<Vec<ChouKu>>;
/// Looks up one record by id.
fn get_by_id(&self, id: &DropletId) -> Result<Option<ChouKu>>;
/// Looks up several records; ids that are not found are skipped.
fn batch_get_by_ids(&self, ids: &[DropletId]) -> Result<Vec<ChouKu>>;
/// Sets each record's state to `Forging(round)` and its round to `round`.
fn mark_forging(&self, ids: &[DropletId], round: u32) -> Result<()>;
/// Marks every provenance record of each result as `Luminous` and annotates its metadata.
fn mark_luminous(&self, results: &[YiLuan]) -> Result<()>;
/// Sets each record's state to `Sealed`.
fn seal(&self, ids: &[DropletId]) -> Result<()>;
/// Scans sediment records. NOTE: the SQLite implementation ignores `prefix` and
/// returns every record.
fn scan_prefix(&self, prefix: &str) -> Result<Vec<ChouKu>>;
/// Flushes pending writes (a no-op for SQLite).
fn flush(&self) -> Result<()>;
/// Stores a correction in the shadow table.
fn apply_shadow(&self, res: &YiLuan) -> Result<()>;
/// Stores a correction under a `promoted::`-prefixed key.
fn promote_shadow(&self, res: &YiLuan) -> Result<()>;
/// Writes every shadow-table row to `path`, one JSON document per line.
fn export_shadow(&self, path: &PathBuf) -> Result<()>;
/// Appends an audit record.
fn audit(&self, rec: &QiChuang) -> Result<()>;
/// Records a quarantine entry together with a matching audit entry.
fn quarantine(&self, rec: &QiChuang) -> Result<()>;
/// Returns all audit records.
fn scan_audit(&self) -> Result<Vec<QiChuang>>;
/// Returns all quarantine records.
fn scan_quarantine(&self) -> Result<Vec<QiChuang>>;
/// Writes the audit records to `path` as JSON Lines and returns how many were written.
fn export_audit_jsonl(&self, path: &PathBuf) -> Result<usize>;
/// Moves record `id` from `old_state` to `new_state` (also setting its round).
/// Returns `Ok(false)` if the record is missing or its state is not `old_state`.
fn compare_and_swap_state(
&self,
id: &DropletId,
old_state: ChouChang,
new_state: ChouChang,
new_round: u32,
) -> Result<bool>;
}
/// SQLite-backed `YouMingStore`; records are stored as JSON text in the `sediment`
/// and `shadow` tables. The single connection sits behind a `parking_lot::Mutex`,
/// so all operations on one store are serialized.
pub struct SqliteYouMing {
conn: Mutex<Connection>,
}
impl SqliteYouMing {
    /// Opens (or creates) the database at `path` and ensures the schema exists.
    ///
    /// # Errors
    /// Fails if the database cannot be opened or the schema statements fail.
    pub fn open(path: &str) -> Result<Self> {
        let conn = Connection::open(path).context("打开 SQLite 数据库失败")?;
        Self::from_connection(conn)
    }

    /// Opens a private in-memory database whose contents are lost when the store is dropped.
    ///
    /// # Errors
    /// Fails if the connection cannot be created or the schema statements fail.
    pub fn open_in_memory() -> Result<Self> {
        let conn = Connection::open_in_memory().context("打开内存数据库失败")?;
        Self::from_connection(conn)
    }

    /// Initialises the schema on `conn` and wraps the connection for shared use.
    fn from_connection(conn: Connection) -> Result<Self> {
        Self::init_schema(&conn)?;
        Ok(Self { conn: Mutex::new(conn) })
    }

    /// Enables WAL mode and creates the `sediment` and `shadow` tables if missing.
    ///
    /// `key TEXT PRIMARY KEY` already gets an implicit unique index, so no separate
    /// index on `shadow(key)` is created. A duplicate index only doubles write cost.
    /// Databases created earlier keep their existing extra index, which is harmless.
    fn init_schema(conn: &Connection) -> Result<()> {
        conn.execute_batch(
            "PRAGMA journal_mode = WAL;
             PRAGMA synchronous = NORMAL;
             CREATE TABLE IF NOT EXISTS sediment (id TEXT PRIMARY KEY, val TEXT NOT NULL);
             CREATE TABLE IF NOT EXISTS shadow (key TEXT PRIMARY KEY, val TEXT NOT NULL);",
        )
        .context("SQLite 建表失败")?;
        Ok(())
    }

    /// Shadow-table key: nanosecond RFC 3339 timestamp, `::`, then `id`.
    fn make_key(ts: &Timestamp, id: &str) -> String {
        format!("{}::{}", ts.to_rfc3339_opts(SecondsFormat::Nanos, true), id)
    }

    /// Reads and deserializes one sediment record using an already-locked connection.
    fn get_by_id_inner(&self, conn: &Connection, id: &str) -> Result<Option<ChouKu>> {
        let mut stmt = conn
            .prepare_cached("SELECT val FROM sediment WHERE id = ?1")
            .context("SQLite 准备语句失败")?;
        let mut rows = stmt.query(params![id]).context("SQLite 查询失败")?;
        match rows.next().context("SQLite 读取行失败")? {
            Some(row) => {
                let val: String = row.get(0).context("SQLite 读取列失败")?;
                Ok(Some(serde_json::from_str(&val).context("反序列化愁苦记录失败")?))
            }
            None => Ok(None),
        }
    }
}
impl YouMingStore for SqliteYouMing {
    /// Inserts or replaces one sediment record (JSON-encoded) keyed by `rec.id`.
    fn append_sediment(&self, rec: &ChouKu) -> Result<()> {
        let conn = self.conn.lock();
        let val = serde_json::to_string(rec).context("序列化愁苦记录失败")?;
        conn.execute(
            "INSERT OR REPLACE INTO sediment (id, val) VALUES (?1, ?2)",
            params![rec.id, val],
        )
        .context("SQLite 插入失败")?;
        Ok(())
    }

    /// Inserts or replaces all `recs` in a single transaction.
    fn batch_append_sediment(&self, recs: &[ChouKu]) -> Result<()> {
        let conn = self.conn.lock();
        let tx = conn.unchecked_transaction()?;
        for rec in recs {
            let val = serde_json::to_string(rec).context("序列化愁苦记录失败")?;
            tx.execute(
                "INSERT OR REPLACE INTO sediment (id, val) VALUES (?1, ?2)",
                params![rec.id, val],
            )
            .context("SQLite 批量插入失败")?;
        }
        tx.commit()?;
        Ok(())
    }

    /// Returns up to `limit` pending records (Slumber / Awake / Forging), optionally
    /// for a single sensor. Rows that fail to deserialize are skipped.
    ///
    /// The state and sensor filters run in Rust after decoding, so the SQL must not
    /// carry a `LIMIT`. Limiting first would stop the scan before enough matching rows
    /// were seen and silently return too few candidates, or none.
    fn pull_candidates(&self, sensor_id: Option<&str>, limit: usize) -> Result<Vec<ChouKu>> {
        if limit == 0 {
            return Ok(Vec::new());
        }
        let conn = self.conn.lock();
        let mut stmt = conn.prepare("SELECT val FROM sediment").context("准备失败")?;
        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
        let mut out = Vec::new();
        for row in rows {
            let val: String = row?;
            let rec = match serde_json::from_str::<ChouKu>(&val) {
                Ok(rec) => rec,
                Err(_) => continue,
            };
            let pending = matches!(
                rec.state,
                ChouChang::Slumber | ChouChang::Awake | ChouChang::Forging(_)
            );
            if pending && sensor_id.map_or(true, |s| rec.sensor_id == s) {
                out.push(rec);
                if out.len() >= limit {
                    break;
                }
            }
        }
        Ok(out)
    }

    fn get_by_id(&self, id: &DropletId) -> Result<Option<ChouKu>> {
        let conn = self.conn.lock();
        self.get_by_id_inner(&conn, id)
    }

    /// Looks up every id under a single lock acquisition; missing ids are skipped.
    fn batch_get_by_ids(&self, ids: &[DropletId]) -> Result<Vec<ChouKu>> {
        let conn = self.conn.lock();
        let mut out = Vec::with_capacity(ids.len());
        for id in ids {
            if let Some(rec) = self.get_by_id_inner(&conn, id)? {
                out.push(rec);
            }
        }
        Ok(out)
    }

    /// Sets each existing record to `Forging(round)` with `round`, in one transaction.
    fn mark_forging(&self, ids: &[DropletId], round: u32) -> Result<()> {
        let conn = self.conn.lock();
        let tx = conn.unchecked_transaction()?;
        for id in ids {
            if let Some(mut rec) = self.get_by_id_inner(&conn, id)? {
                rec.state = ChouChang::Forging(round);
                rec.round = round;
                let val = serde_json::to_string(&rec).context("序列化失败")?;
                tx.execute("UPDATE sediment SET val = ?1 WHERE id = ?2", params![val, id])?;
            }
        }
        tx.commit()?;
        Ok(())
    }

    /// Marks the provenance records of each result `Luminous(res.round)` and records
    /// the purification method, confidence, time and round in their metadata.
    fn mark_luminous(&self, results: &[YiLuan]) -> Result<()> {
        let conn = self.conn.lock();
        let tx = conn.unchecked_transaction()?;
        for res in results {
            for pid in &res.provenance {
                if let Some(mut rec) = self.get_by_id_inner(&conn, pid)? {
                    rec.state = ChouChang::Luminous(res.round);
                    let mut meta = rec.metadata.clone().unwrap_or_else(|| serde_json::json!({}));
                    meta["净化方法"] = serde_json::json!(res.method);
                    meta["净化置信度"] = serde_json::json!(res.confidence);
                    meta["净化时间"] = serde_json::json!(res.ts.to_rfc3339());
                    meta["净化轮次"] = serde_json::json!(res.round);
                    rec.metadata = Some(meta);
                    let val = serde_json::to_string(&rec).context("序列化失败")?;
                    tx.execute("UPDATE sediment SET val = ?1 WHERE id = ?2", params![val, pid])?;
                }
            }
        }
        tx.commit()?;
        Ok(())
    }

    /// Sets each existing record to `Sealed`, in one transaction.
    fn seal(&self, ids: &[DropletId]) -> Result<()> {
        let conn = self.conn.lock();
        let tx = conn.unchecked_transaction()?;
        for id in ids {
            if let Some(mut rec) = self.get_by_id_inner(&conn, id)? {
                rec.state = ChouChang::Sealed;
                let val = serde_json::to_string(&rec).context("序列化失败")?;
                tx.execute("UPDATE sediment SET val = ?1 WHERE id = ?2", params![val, id])?;
            }
        }
        tx.commit()?;
        Ok(())
    }

    /// Returns every decodable sediment record.
    ///
    /// NOTE(review): `_prefix` is ignored here; the intended key space for the prefix
    /// match is unclear, so the filter is not added.
    fn scan_prefix(&self, _prefix: &str) -> Result<Vec<ChouKu>> {
        let conn = self.conn.lock();
        let mut stmt = conn.prepare("SELECT val FROM sediment").context("准备失败")?;
        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
        let mut out = Vec::new();
        for row in rows {
            if let Ok(rec) = serde_json::from_str::<ChouKu>(&row?) {
                out.push(rec);
            }
        }
        Ok(out)
    }

    /// No-op: every write above is already committed by SQLite.
    fn flush(&self) -> Result<()> {
        Ok(())
    }

    fn apply_shadow(&self, res: &YiLuan) -> Result<()> {
        let conn = self.conn.lock();
        let key = Self::make_key(&res.ts, &res.sensor_id);
        let val = serde_json::to_string(res).context("序列化意乱失败")?;
        conn.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![key, val])?;
        Ok(())
    }

    fn promote_shadow(&self, res: &YiLuan) -> Result<()> {
        let conn = self.conn.lock();
        let key = format!("promoted::{}", Self::make_key(&res.ts, &res.sensor_id));
        let val = serde_json::to_string(res).context("序列化意乱失败")?;
        conn.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![key, val])?;
        Ok(())
    }

    /// Writes every shadow-table row (JSON text) to `path`, one per line, through a
    /// buffered writer that is flushed explicitly so write errors surface.
    fn export_shadow(&self, path: &PathBuf) -> Result<()> {
        let conn = self.conn.lock();
        let mut stmt = conn.prepare("SELECT val FROM shadow").context("准备失败")?;
        let mut file = BufWriter::new(std::fs::File::create(path).context("创建导出文件失败")?);
        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
        for row in rows {
            let val: String = row?;
            file.write_all(val.as_bytes())?;
            file.write_all(b"\n")?;
        }
        // BufWriter's Drop would swallow a failed final write, so flush explicitly.
        file.flush()?;
        Ok(())
    }

    fn audit(&self, rec: &QiChuang) -> Result<()> {
        let conn = self.conn.lock();
        let key = format!("audit::{}", Self::make_key(&rec.ts, &rec.droplet_id));
        let val = serde_json::to_string(rec).context("序列化凄怆失败")?;
        conn.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![key, val])?;
        Ok(())
    }

    /// Writes the quarantine row and its matching audit row in one transaction, so
    /// neither can exist without the other after a mid-way failure.
    fn quarantine(&self, rec: &QiChuang) -> Result<()> {
        let conn = self.conn.lock();
        let ts_key = Self::make_key(&rec.ts, &rec.droplet_id);
        let qkey = format!("quarantine::{}", ts_key);
        let akey = format!("audit::{}", ts_key);
        let val = serde_json::to_string(rec).context("序列化凄怆失败")?;
        let tx = conn.unchecked_transaction()?;
        tx.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![qkey, val])?;
        tx.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![akey, val])?;
        tx.commit()?;
        Ok(())
    }

    fn scan_audit(&self) -> Result<Vec<QiChuang>> {
        let conn = self.conn.lock();
        let mut stmt = conn
            .prepare("SELECT val FROM shadow WHERE key LIKE 'audit::%'")
            .context("准备失败")?;
        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
        let mut out = Vec::new();
        for row in rows {
            if let Ok(rec) = serde_json::from_str::<QiChuang>(&row?) {
                out.push(rec);
            }
        }
        Ok(out)
    }

    fn scan_quarantine(&self) -> Result<Vec<QiChuang>> {
        let conn = self.conn.lock();
        let mut stmt = conn
            .prepare("SELECT val FROM shadow WHERE key LIKE 'quarantine::%'")
            .context("准备失败")?;
        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
        let mut out = Vec::new();
        for row in rows {
            if let Ok(rec) = serde_json::from_str::<QiChuang>(&row?) {
                out.push(rec);
            }
        }
        Ok(out)
    }

    fn export_audit_jsonl(&self, path: &PathBuf) -> Result<usize> {
        let records = self.scan_audit()?;
        let mut f = BufWriter::new(std::fs::File::create(path).context("创建导出文件失败")?);
        let n = records.len();
        for rec in &records {
            let line = serde_json::to_string(rec).context("序列化失败")?;
            f.write_all(line.as_bytes())?;
            f.write_all(b"\n")?;
        }
        f.flush()?;
        Ok(n)
    }

    /// Compare-and-swap on a record's state inside a transaction, under the connection lock.
    fn compare_and_swap_state(
        &self,
        id: &DropletId,
        old_state: ChouChang,
        new_state: ChouChang,
        new_round: u32,
    ) -> Result<bool> {
        let conn = self.conn.lock();
        let tx = conn.unchecked_transaction()?;
        match self.get_by_id_inner(&conn, id)? {
            None => {
                tx.commit()?;
                Ok(false)
            }
            Some(mut rec) => {
                if rec.state != old_state {
                    tx.commit()?;
                    return Ok(false);
                }
                rec.state = new_state;
                rec.round = new_round;
                let val = serde_json::to_string(&rec)?;
                tx.execute("UPDATE sediment SET val = ?1 WHERE id = ?2", params![val, id])?;
                tx.commit()?;
                Ok(true)
            }
        }
    }
}
/// Per-sensor stage: tracks the sliding window, scores each reading, and forwards every
/// assessment that is not `Untainted` to the remediation channel.
pub struct LeiLuoProcessor {
sensor_id: SensorId,
/// Sliding-window median/MAD tracker for this sensor.
tracker: Arc<AutoXiaoHun>,
/// Threshold-based scorer.
scorer: ShenShang,
/// Sender for `(sensor, assessment, reading)` tuples that need remediation.
hunfei_tx: mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
tsdb: Arc<dyn TsdbSink>,
metrics: Arc<dyn MetricsSink>,
/// Per-processor sequence counter, included in debug logs.
seq: AtomicU64,
}
impl LeiLuoProcessor {
pub fn new(
sensor_id: SensorId,
config: YanHunConfig,
hunfei_tx: mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
tsdb: Arc<dyn TsdbSink>,
metrics: Arc<dyn MetricsSink>,
) -> Self {
let tracker = Arc::new(AutoXiaoHun::new(config.window_size, config.quantisation_eps));
let scorer = ShenShang::new(config);
Self { sensor_id, tracker, scorer, hunfei_tx, tsdb, metrics, seq: AtomicU64::new(0) }
}
pub async fn process(&self, reading: LeiLuo) -> Result<()> {
let seq = self.seq.fetch_add(1, AtomicOrd::Relaxed);
let (median, mad) = self.tracker.insert_and_compute(reading.value);
let order = self.scorer.assess(reading.value, median, mad, 1);
debug!(
"黯然销魂 sensor={} seq={} value={:.6} median={:.6} mad={:.6} score={:.4} level={}",
self.sensor_id, seq, reading.value, median, mad, order.score, order.level
);
self.metrics.set_gauge(&format!("yanhun.score.{}", self.sensor_id), order.score);
if order.level != PoSan::Untainted {
self.metrics.incr_counter("yanhun.remediation.emitted", 1);
let msg = (self.sensor_id.clone(), order, reading);
if self.hunfei_tx.try_send(msg.clone()).is_err() {
if let Err(e) = self.hunfei_tx.send(msg).await {
return Err(YanHunError::Channel(format!("hunfei send: {}", e)).into());
}
}
}
Ok(())
}
pub fn tracker(&self) -> &Arc<AutoXiaoHun> { &self.tracker }
pub fn snapshot_stats(&self) -> (f64, f64) { self.tracker.snapshot_stats() }
}
pub struct HunFei {
youming: Arc<dyn YouMingStore>,
qingmi: Arc<QingMi>,
processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
config: YanHunConfig,
hunfei_rx: RwLock<Option<mpsc::Receiver<(SensorId, DuanChang, LeiLuo)>>>,
concurrency:Arc<Semaphore>,
running: Arc<AtomicBool>,
tsdb: Arc<dyn TsdbSink>,
metrics: Arc<dyn MetricsSink>,
}
impl HunFei {
pub fn new(
youming: Arc<dyn YouMingStore>,
qingmi: Arc<QingMi>,
processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
config: YanHunConfig,
hunfei_rx: mpsc::Receiver<(SensorId, DuanChang, LeiLuo)>,
tsdb: Arc<dyn TsdbSink>,
metrics: Arc<dyn MetricsSink>,
) -> Self {
let concurrency = Arc::new(Semaphore::new(config.hunfei_concurrency.max(1)));
Self {
youming, qingmi, processors,
config: config.clone(),
hunfei_rx: RwLock::new(Some(hunfei_rx)),
concurrency, running: Arc::new(AtomicBool::new(false)),
tsdb, metrics,
}
}
pub fn start(self: Arc<Self>) {
if self.running.swap(true, AtomicOrd::SeqCst) { return; }
let this = self.clone();
tokio::spawn(async move {
let mut rx = match this.hunfei_rx.write().take() {
Some(r) => r,
None => { error!("魂飞: hunfei_rx 已被取走"); return; }
};
info!("魂飞: 净化执行器启动 (并发度={})", this.config.hunfei_concurrency);
while let Some((sid, order, reading)) = rx.recv().await {
let permit = match this.concurrency.clone().acquire_owned().await {
Ok(p) => p,
Err(_) => { warn!("魂飞: 信号量已关闭"); break; }
};
let t = this.clone();
tokio::spawn(async move {
let _p = permit;
if let Err(e) = t.execute(&sid, order, reading).await {
error!("魂飞: 执行错误: {:?}", e);
}
});
}
info!("魂飞: 净化执行器退出");
this.running.store(false, AtomicOrd::SeqCst);
});
}
async fn execute(&self, sid: &SensorId, order: DuanChang, reading: LeiLuo) -> Result<()> {
match order.level {
PoSan::LightGrief => self.apply_light(sid, order, reading).await,
PoSan::DeepSorrow => self.apply_medium(sid, order, reading).await,
PoSan::SoulWithered => self.apply_heavy(sid, order, reading).await,
PoSan::Untainted => Ok(()),
}
}
async fn apply_light(&self, sid: &SensorId, order: DuanChang, reading: LeiLuo) -> Result<()> {
let (median, mad) = self.snapshot_median_mad(sid, &reading);
let sigma = if mad > f64::EPSILON { mad * MAD_TO_SIGMA } else { 0.0 };
let k = self.config.clip_k;
let clipped = if sigma > f64::EPSILON {
reading.value.max(median - k * sigma).min(median + k * sigma)
} else { median };
let note = format!("裁剪至 [{:.6}, {:.6}] σ={:.6}", median - k*sigma, median + k*sigma, sigma);
self.tsdb.write_corrected(sid, &reading.ts, clipped, ¬e);
self.metrics.incr_counter("yanhun.light", 1);
let rec = QiChuang {
sensor_id: sid.clone(),
droplet_id: reading.id.clone(),
order: order.clone(),
original_value: reading.value,
applied_value: Some(clipped),
note: Some(note),
ts: Utc::now(),
};
self.youming.audit(&rec)?;
self.feed_back(sid, clipped, &reading).await;
if self.config.verification_enabled { self.spawn_verify(sid.clone(), clipped).await; }
Ok(())
}
async fn apply_medium(&self, sid: &SensorId, order: DuanChang, reading: LeiLuo) -> Result<()> {
let bg = self.qingmi.get(sid, reading.value);
let alpha = self.config.blend_alpha;
let blended = alpha * bg + (1.0 - alpha) * reading.value;
self.qingmi.update(sid, blended);
let note = format!("背景混合 bg={:.6} α={:.3}", bg, alpha);
self.tsdb.write_corrected(sid, &reading.ts, blended, ¬e);
self.metrics.incr_counter("yanhun.medium", 1);
let rec = QiChuang {
sensor_id: sid.clone(),
droplet_id: reading.id.clone(),
order: order.clone(),
original_value: reading.value,
applied_value: Some(blended),
note: Some(note),
ts: Utc::now(),
};
self.youming.audit(&rec)?;
self.feed_back(sid, blended, &reading).await;
if self.config.verification_enabled { self.spawn_verify(sid.clone(), blended).await; }
Ok(())
}
async fn apply_heavy(&self, sid: &SensorId, order: DuanChang, reading: LeiLuo) -> Result<()> {
let rec = QiChuang {
sensor_id: sid.clone(),
droplet_id: reading.id.clone(),
order: order.clone(),
original_value: reading.value,
applied_value: None,
note: Some("销魂隔离: 已阻断主链路".into()),
ts: Utc::now(),
};
self.youming.quarantine(&rec)?;
self.metrics.incr_counter("yanhun.heavy", 1);
warn!("销魂: sensor={} score={:.4} 已隔离入幽冥", sid, order.score);
Ok(())
}
fn snapshot_median_mad(&self, sid: &SensorId, reading: &LeiLuo) -> (f64, f64) {
let map = self.processors.read();
if let Some(proc) = map.get(sid) { proc.snapshot_stats() }
else { (reading.value, 0.0) }
}
async fn feed_back(&self, sid: &SensorId, value: f64, original: &LeiLuo) {
let proc = { self.processors.read().get(sid).cloned() };
if let Some(p) = proc {
let corrected = LeiLuo { value, ts: Utc::now(), ..original.clone() };
if let Err(e) = p.process(corrected).await {
warn!("魂飞: 反馈失败 {}: {:?}", sid, e);
}
}
}
async fn spawn_verify(&self, sid: SensorId, applied: f64) {
let qingmi = self.qingmi.clone();
let delay = self.config.shadow_verify_delay_ms;
tokio::spawn(async move {
sleep(Duration::from_millis(delay)).await;
let bg = qingmi.get(&sid, applied);
let delta = (applied - bg).abs();
if delta < 1e-3 {
debug!("验证: sensor={} applied≈bg → 有效", sid);
} else {
info!("验证: sensor={} Δ={:.6} applied={:.6} bg={:.6}", sid, delta, applied, bg);
}
});
}
}
struct ShardEntry {
tx: mpsc::Sender<LeiLuo>,
tracker: Arc<AutoXiaoHun>,
}
struct JingShenShard {
entries: RwLock<HashMap<SensorId, ShardEntry>>,
}
impl JingShenShard {
fn new() -> Self { Self { entries: RwLock::new(HashMap::new()) } }
fn get_or_create(
&self,
sid: &SensorId,
config: &YanHunConfig,
hunfei_tx: &mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
processors: &Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
trackers: &Arc<RwLock<HashMap<SensorId, Arc<AutoXiaoHun>>>>,
tsdb: &Arc<dyn TsdbSink>,
metrics: &Arc<dyn MetricsSink>,
) -> mpsc::Sender<LeiLuo> {
{
let map = self.entries.read();
if let Some(e) = map.get(sid) { return e.tx.clone(); }
}
let mut map = self.entries.write();
if let Some(e) = map.get(sid) { return e.tx.clone(); }
let (tx, rx) = mpsc::channel::<LeiLuo>(config.per_sensor_queue);
let proc = Arc::new(LeiLuoProcessor::new(
sid.clone(), config.clone(), hunfei_tx.clone(), tsdb.clone(), metrics.clone(),
));
let tracker_arc = proc.tracker().clone();
processors.write().insert(sid.clone(), proc.clone());
trackers.write().insert(sid.clone(), tracker_arc.clone());
let batch_max = config.batch_max_size;
let batch_wait = config.batch_max_wait_ms;
let tsdb_c = tsdb.clone();
let proc_c = proc.clone();
tokio::spawn(async move {
let mut batch: Vec<LeiLuo> = Vec::with_capacity(batch_max);
let mut last_flush = Instant::now();
let mut inbound = rx;
let wait = Duration::from_millis(batch_wait);
loop {
tokio::select! {
maybe = inbound.recv() => {
match maybe {
Some(r) => {
tsdb_c.write_raw(&r);
batch.push(r);
if batch.len() >= batch_max {
for item in batch.drain(..) {
if let Err(e) = proc_c.process(item).await {
error!("惊神: worker 处理失败: {:?}", e);
}
}
last_flush = Instant::now();
}
}
None => {
for item in batch.drain(..) {
let _ = proc_c.process(item).await;
}
break;
}
}
}
_ = sleep(wait) => {
if !batch.is_empty() && last_flush.elapsed() >= wait {
for item in batch.drain(..) {
if let Err(e) = proc_c.process(item).await {
error!("惊神: worker 批次处理失败: {:?}", e);
}
}
last_flush = Instant::now();
}
}
}
}
debug!("惊神: sensor worker {} 退出", proc_c.sensor_id);
});
map.insert(sid.clone(), ShardEntry { tx: tx.clone(), tracker: tracker_arc });
tx
}
}
/// Sharded ingestion router.
///
/// Readings are routed to a shard by a hash of the sensor id. The shard lazily
/// creates the sensor's processor and worker queue.
pub struct JingShen {
    shards: Vec<Arc<JingShenShard>>,
    /// Number of shards (always at least 1).
    shard_count: usize,
    config: YanHunConfig,
    hunfei_tx: mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
    processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
    trackers: Arc<RwLock<HashMap<SensorId, Arc<AutoXiaoHun>>>>,
    tsdb: Arc<dyn TsdbSink>,
    metrics: Arc<dyn MetricsSink>,
}
impl JingShen {
    /// Builds `max(config.shard_count, 1)` empty shards and an empty tracker registry.
    pub fn new(
        config: YanHunConfig,
        hunfei_tx: mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
        processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
        tsdb: Arc<dyn TsdbSink>,
        metrics: Arc<dyn MetricsSink>,
    ) -> Self {
        let shard_count = config.shard_count.max(1);
        let mut shards = Vec::with_capacity(shard_count);
        shards.resize_with(shard_count, || Arc::new(JingShenShard::new()));
        Self {
            shards,
            shard_count,
            config,
            hunfei_tx,
            processors,
            trackers: Arc::new(RwLock::new(HashMap::new())),
            tsdb,
            metrics,
        }
    }

    /// Maps a sensor id to a shard using 64-bit FNV-1a over its bytes.
    fn shard_index(&self, sensor_id: &str) -> usize {
        const FNV_OFFSET: u64 = 0xcbf29ce484222325;
        const FNV_PRIME: u64 = 0x100000001b3;
        let hash = sensor_id
            .bytes()
            .fold(FNV_OFFSET, |acc, byte| (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME));
        (hash as usize) % self.shard_count
    }

    /// Enqueues `reading` on its sensor's worker queue, waiting if the queue is full.
    ///
    /// # Errors
    /// `YanHunError::Channel` if the sensor's worker queue is closed.
    pub async fn submit(&self, reading: LeiLuo) -> Result<()> {
        let shard = &self.shards[self.shard_index(&reading.sensor_id)];
        let sender = shard.get_or_create(
            &reading.sensor_id,
            &self.config,
            &self.hunfei_tx,
            &self.processors,
            &self.trackers,
            &self.tsdb,
            &self.metrics,
        );
        match sender.send(reading).await {
            Ok(()) => Ok(()),
            Err(e) => Err(YanHunError::Channel(format!("惊神 submit: {}", e)).into()),
        }
    }

    /// Shared handle to the sensor → tracker registry.
    pub fn trackers(&self) -> Arc<RwLock<HashMap<SensorId, Arc<AutoXiaoHun>>>> {
        Arc::clone(&self.trackers)
    }
}
/// Shadow verifier and promoter for purification results.
pub struct BeiQie {
    config: YanHunConfig,
    metrics: Arc<dyn MetricsSink>,
}
impl BeiQie {
    pub fn new(config: YanHunConfig, metrics: Arc<dyn MetricsSink>) -> Self {
        BeiQie { config, metrics }
    }

    /// Applies `res` as a shadow, waits `shadow_verify_delay_ms`, then promotes and
    /// seals it with probability equal to `res.confidence` (a uniform roll in [0, 1)).
    ///
    /// Results below `shadow_min_confidence` are skipped without touching the store.
    /// Each outcome increments its own `beiqie.*` counter.
    pub async fn verify_and_promote(&self, res: YiLuan, youming: Arc<dyn YouMingStore>) -> Result<()> {
        if res.confidence < self.config.shadow_min_confidence {
            self.metrics.incr_counter("beiqie.skipped_low_conf", 1);
            return Ok(());
        }
        youming.apply_shadow(&res).context("施加暗影失败")?;
        self.metrics.incr_counter("beiqie.applied", 1);
        let settle = Duration::from_millis(self.config.shadow_verify_delay_ms);
        sleep(settle).await;
        // The thread-local RNG handle is a temporary, dropped within this statement.
        let roll: f64 = rand::thread_rng().gen();
        let outcome = if roll <= res.confidence {
            youming.promote_shadow(&res).context("晋升暗影失败")?;
            youming.seal(&res.provenance)?;
            "beiqie.promoted"
        } else {
            "beiqie.rejected"
        };
        self.metrics.incr_counter(outcome, 1);
        Ok(())
    }
}
pub struct AiHui {
youming: Arc<dyn YouMingStore>,
config: YanHunConfig,
}
impl AiHui {
pub fn new(youming: Arc<dyn YouMingStore>, config: YanHunConfig) -> Self {
Self { youming, config }
}
pub fn process_batch(&self, batch: Vec<ChouKu>) -> Result<Vec<YiLuan>> {
if batch.is_empty() { return Ok(vec![]); }
let mut by_sensor: HashMap<SensorId, Vec<ChouKu>> = HashMap::new();
for rec in batch { by_sensor.entry(rec.sensor_id.clone()).or_default().push(rec); }
let mut out = Vec::new();
for (sid, mut recs) in by_sensor {
recs.sort_by_key(|r| r.ts);
let values: Vec<f64> = recs.iter().map(|r| r.value).collect();
let eps = estimate_dbscan_eps(&values);
let min_samples = ((values.len() as f64) * self.config.dbscan_min_samples_ratio)
.ceil() as usize;
let min_samples = min_samples.max(1);
let labels = dbscan_1d(&values, eps, min_samples);
let mut clusters: HashMap<i32, Vec<(usize, f64)>> = HashMap::new();
for (i, &lab) in labels.iter().enumerate() {
clusters.entry(lab).or_default().push((i, values[i]));
}
for (lab, members) in clusters {
let vals: Vec<f64> = members.iter().map(|(_, v)| *v).collect();
if lab == -1 {
if vals.len() >= 3 {
let mut tmp = vals.clone();
let mid = tmp.len() / 2;
tmp.select_nth_unstable_by(mid, f64::total_cmp);
let rep = tmp[mid];
let conf = compute_confidence(&vals, rep);
if conf >= self.config.promote_confidence {
let prov: Vec<DropletId> =
members.iter().map(|(idx, _)| recs[*idx].id.clone()).collect();
out.push(YiLuan {
sensor_id: sid.clone(),
ts: Utc::now(),
corrected_value: rep,
confidence: conf,
method: "DBSCAN_Noise_Median".into(),
round: 1,
provenance: prov,
metrics: serde_json::json!({"eps": eps, "cluster": lab, "size": vals.len()}),
});
}
}
continue;
}
let delta = match self.config.huber_strategy {
HuberStrategy::Fixed(d) => d,
HuberStrategy::AutoMAD => estimate_huber_delta(&vals),
};
let huber_est = huber_robust_location(&vals, delta, 50);
let mut kf = Kalman1D::new(self.config.kalman_q, self.config.kalman_r);
let smoothed = kf.smooth_sequence(&vals);
let mut tmp = smoothed;
let mid = tmp.len() / 2;
tmp.select_nth_unstable_by(mid, f64::total_cmp);
let kalman_est = tmp[mid];
let combined = 0.5 * huber_est + 0.5 * kalman_est;
let conf = compute_confidence(&vals, combined);
let prov: Vec<DropletId> =
members.iter().map(|(idx, _)| recs[*idx].id.clone()).collect();
out.push(YiLuan {
sensor_id: sid.clone(),
ts: Utc::now(),
corrected_value: combined,
confidence: conf,
method: "DBSCAN_Huber_Kalman_Fusion".into(),
round: 1,
provenance: prov,
metrics: serde_json::json!({
"eps": eps, "min_samples": min_samples,
"cluster_size": vals.len(),
"huber_delta": delta,
"huber_est": huber_est,
"kalman_est": kalman_est,
"combined": combined,
}),
});
}
}
Ok(out)
}
}
pub struct XinLuan {
youming: Arc<dyn YouMingStore>,
config: YanHunConfig,
worker_tx: mpsc::Sender<Vec<ChouKu>>,
metrics: Arc<dyn MetricsSink>,
beiqie: Arc<BeiQie>,
}
impl XinLuan {
pub fn new(
youming: Arc<dyn YouMingStore>,
config: YanHunConfig,
worker_tx: mpsc::Sender<Vec<ChouKu>>,
metrics: Arc<dyn MetricsSink>,
beiqie: Arc<BeiQie>,
) -> Self {
Self { youming, config, worker_tx, metrics, beiqie }
}
pub async fn run(&self) -> Result<()> {
info!("心乱: 多轮净化调度器启动");
loop {
let batch = self.youming.pull_candidates(None, self.config.multi_round_batch_size)?;
if batch.is_empty() { sleep(Duration::from_millis(200)).await; continue; }
let shards = self.shard_batch(batch);
for shard_batch in shards {
let mut to_dispatch = Vec::new();
for rec in shard_batch {
if self.youming.compare_and_swap_state(&rec.id, ChouChang::Slumber, ChouChang::Forging(1), 1)? {
if let Some(latest) = self.youming.get_by_id(&rec.id)? {
to_dispatch.push(latest);
}
} else {
self.metrics.incr_counter("xinluan.cas_conflict", 1);
}
}
if !to_dispatch.is_empty() {
if let Err(e) = self.worker_tx.send(to_dispatch).await {
error!("心乱: 分发至肠断池失败: {:?}", e);
} else {
self.metrics.incr_counter("xinluan.dispatched", 1);
}
}
}
sleep(Duration::from_millis(10)).await;
}
}
fn shard_batch(&self, batch: Vec<ChouKu>) -> Vec<Vec<ChouKu>> {
let n = self.config.multi_round_concurrency.max(1);
let mut shards: Vec<Vec<ChouKu>> = vec![Vec::new(); n];
for rec in batch {
let mut h: u64 = 0xcbf29ce484222325;
for b in rec.sensor_id.bytes() { h ^= b as u64; h = h.wrapping_mul(0x100000001b3); }
shards[(h as usize) % n].push(rec);
}
shards.into_iter().filter(|v| !v.is_empty()).collect()
}
}
/// Parallel worker pool for multi-round purification.
///
/// Receives claimed batches from [`XinLuan`] and purifies each batch on its own
/// task, bounded by a semaphore.
pub struct ChangDuan {
    worker_rx: mpsc::Receiver<Vec<ChouKu>>,
    youming: Arc<dyn YouMingStore>,
    config: YanHunConfig,
    metrics: Arc<dyn MetricsSink>,
    beiqie: Arc<BeiQie>,
    /// Caps concurrently running batches at `max(multi_round_concurrency, 1)`.
    sem: Arc<Semaphore>,
}
impl ChangDuan {
    pub fn new(
        worker_rx: mpsc::Receiver<Vec<ChouKu>>,
        youming: Arc<dyn YouMingStore>,
        config: YanHunConfig,
        metrics: Arc<dyn MetricsSink>,
        beiqie: Arc<BeiQie>,
    ) -> Self {
        let concurrency = config.multi_round_concurrency.max(1);
        Self { worker_rx, youming, config, metrics, beiqie, sem: Arc::new(Semaphore::new(concurrency)) }
    }

    /// Receives batches until the channel closes, spawning one permit-bounded task per batch.
    pub async fn run(mut self) {
        info!("肠断: 并行 Worker 池启动,并发度={}", self.config.multi_round_concurrency);
        while let Some(batch) = self.worker_rx.recv().await {
            let permit = match self.sem.clone().acquire_owned().await {
                Ok(p) => p,
                Err(_) => { warn!("肠断: 信号量已关闭"); break; }
            };
            let youming = self.youming.clone();
            let cfg = self.config.clone();
            let metrics = self.metrics.clone();
            let beiqie = self.beiqie.clone();
            task::spawn(async move {
                let _p = permit;
                if let Err(e) = Self::multi_round_purify(youming, cfg, batch, metrics, beiqie).await {
                    error!("肠断: 批次净化失败: {:?}", e);
                }
            });
        }
        info!("肠断: Worker 池退出");
    }

    /// Repeatedly purifies a batch until convergence.
    ///
    /// Each round purifies the batch, marks results luminous and spawns a
    /// verify-and-promote task per result, then reloads the same ids for the next
    /// round. The loop stops when any of these holds:
    /// - the average confidence changes by less than `convergence_delta`;
    /// - `max_rounds` is exceeded;
    /// - the reload returns nothing.
    async fn multi_round_purify(
        youming: Arc<dyn YouMingStore>,
        cfg: YanHunConfig,
        initial_batch: Vec<ChouKu>,
        metrics: Arc<dyn MetricsSink>,
        beiqie: Arc<BeiQie>,
    ) -> Result<()> {
        // The purifier does not change between rounds; build it once.
        let purifier = AiHui::new(youming.clone(), cfg.clone());
        let mut current = initial_batch;
        let mut round = 1u32;
        let mut last_conf = 0.0f64;
        loop {
            let start = Instant::now();
            // Capture the ids before handing the batch over by value, rather than
            // cloning the entire batch every round.
            let ids: Vec<DropletId> = current.iter().map(|r| r.id.clone()).collect();
            let results = purifier.process_batch(current)?;
            metrics.incr_counter("yanhun.round_executed", 1);
            let avg_conf = if results.is_empty() { 0.0 }
                else { results.iter().map(|r| r.confidence).sum::<f64>() / results.len() as f64 };
            metrics.set_gauge("yanhun.avg_confidence", avg_conf);
            if !results.is_empty() {
                youming.mark_luminous(&results)?;
                for res in results {
                    let beiqie_c = beiqie.clone();
                    let ym_c = youming.clone();
                    task::spawn(async move {
                        if let Err(e) = beiqie_c.verify_and_promote(res, ym_c).await {
                            warn!("悲切验证/晋升失败: {:?}", e);
                        }
                    });
                }
            }
            if (avg_conf - last_conf).abs() < cfg.convergence_delta {
                metrics.incr_counter("yanhun.converged", 1);
                break;
            }
            last_conf = avg_conf;
            round += 1;
            if round > cfg.max_rounds { metrics.incr_counter("yanhun.max_rounds_reached", 1); break; }
            let next = youming.batch_get_by_ids(&ids)?;
            if next.is_empty() { break; }
            current = next;
            // Keep very fast rounds from spinning back-to-back.
            if start.elapsed() < Duration::from_millis(50) { sleep(Duration::from_millis(10)).await; }
        }
        Ok(())
    }
}
pub struct YanHun {
jingshen: Arc<JingShen>,
hunfei: Arc<HunFei>,
youming: Arc<dyn YouMingStore>,
qingmi: Arc<QingMi>,
processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
ingestion_tx:mpsc::Sender<LeiLuo>,
config: YanHunConfig,
tsdb: Arc<dyn TsdbSink>,
metrics: Arc<dyn MetricsSink>,
beiqie: Arc<BeiQie>,
xinluan: Option<Arc<XinLuan>>,
multi_tx: Option<mpsc::Sender<Vec<ChouKu>>>,
}
impl YanHun {
pub fn initialize(config: YanHunConfig, youming: Arc<dyn YouMingStore>) -> Self {
let processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>> =
Arc::new(RwLock::new(HashMap::new()));
let tsdb: Arc<dyn TsdbSink> = Arc::new(NoopTsdb);
let metrics: Arc<dyn MetricsSink> = Arc::new(NoopMetrics);
let (hunfei_tx, hunfei_rx) = mpsc::channel::<(SensorId, DuanChang, LeiLuo)>(config.ingestion_capacity);
let (ingest_tx, mut ingest_rx) = mpsc::channel::<LeiLuo>(config.ingestion_capacity);
let qingmi = Arc::new(QingMi::new(config.qingmi_alpha));
let beiqie = Arc::new(BeiQie::new(config.clone(), metrics.clone()));
let jingshen = Arc::new(JingShen::new(
config.clone(), hunfei_tx.clone(), processors.clone(), tsdb.clone(), metrics.clone(),
));
let hunfei = Arc::new(HunFei::new(
youming.clone(), qingmi.clone(), processors.clone(), config.clone(),
hunfei_rx, tsdb.clone(), metrics.clone(),
));
{
let js = jingshen.clone();
tokio::spawn(async move {
while let Some(r) = ingest_rx.recv().await {
if let Err(e) = js.submit(r).await {
error!("黯然: 摄入路由错误: {:?}", e);
}
}
info!("黯然: 摄入调度器退出");
});
}
Self {
jingshen, hunfei, youming, qingmi, processors,
ingestion_tx: ingest_tx, config, tsdb, metrics, beiqie,
xinluan: None, multi_tx: None,
}
}
pub fn initialize_with_sinks(
config: YanHunConfig,
youming: Arc<dyn YouMingStore>,
tsdb: Arc<dyn TsdbSink>,
metrics: Arc<dyn MetricsSink>,
) -> Self {
let processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>> =
Arc::new(RwLock::new(HashMap::new()));
let (hunfei_tx, hunfei_rx) = mpsc::channel::<(SensorId, DuanChang, LeiLuo)>(config.ingestion_capacity);
let (ingest_tx, mut ingest_rx) = mpsc::channel::<LeiLuo>(config.ingestion_capacity);
let qingmi = Arc::new(QingMi::new(config.qingmi_alpha));
let beiqie = Arc::new(BeiQie::new(config.clone(), metrics.clone()));
let jingshen = Arc::new(JingShen::new(
config.clone(), hunfei_tx.clone(), processors.clone(), tsdb.clone(), metrics.clone(),
));
let hunfei = Arc::new(HunFei::new(
youming.clone(), qingmi.clone(), processors.clone(), config.clone(),
hunfei_rx, tsdb.clone(), metrics.clone(),
));
{
let js = jingshen.clone();
tokio::spawn(async move {
while let Some(r) = ingest_rx.recv().await {
if let Err(e) = js.submit(r).await {
error!("黯然: 摄入路由错误: {:?}", e);
}
}
});
}
Self {
jingshen, hunfei, youming, qingmi, processors,
ingestion_tx: ingest_tx, config, tsdb, metrics, beiqie,
xinluan: None, multi_tx: None,
}
}
pub fn start(&self) {
self.hunfei.clone().start();
info!("黯然销魂系统: 启动完成");
}
pub fn start_multi_round(&mut self) {
let (tx, rx) = mpsc::channel::<Vec<ChouKu>>(self.config.shard_count * 4);
let xinluan = Arc::new(XinLuan::new(
self.youming.clone(), self.config.clone(), tx.clone(),
self.metrics.clone(), self.beiqie.clone(),
));
let changduan = ChangDuan::new(
rx, self.youming.clone(), self.config.clone(),
self.metrics.clone(), self.beiqie.clone(),
);
let xl = xinluan.clone();
tokio::spawn(async move { if let Err(e) = xl.run().await { error!("心乱运行错误: {:?}", e); } });
tokio::spawn(async move { changduan.run().await; });
self.xinluan = Some(xinluan);
self.multi_tx = Some(tx);
info!("黯然销魂系统: 多轮迭代净化已启动");
}
pub async fn ingest(&self, reading: LeiLuo) -> Result<()> {
self.ingestion_tx.send(reading).await
.map_err(|e| anyhow!("黯然: ingest 失败: {}", e))
}
pub async fn ingest_batch(&self, readings: Vec<LeiLuo>) -> Result<()> {
for r in readings { self.ingest(r).await?; }
Ok(())
}
pub async fn run_daemon(self) -> Result<()> {
info!("黯然销魂系统: 守护模式运行中 — 按 Ctrl+C 停止");
tokio::signal::ctrl_c().await.context("ctrl-c 监听失败")?;
info!("黯然销魂系统: 收到关闭信号,正在排空管道…");
sleep(Duration::from_millis(800)).await;
self.youming.flush()?;
info!("黯然销魂系统: 关闭完成 — 黯然销魂,万念俱灰");
Ok(())
}
pub fn export_audit(&self, path: &PathBuf) -> Result<usize> {
self.youming.export_audit_jsonl(path)
}
pub async fn replay_quarantine(&self) -> Result<usize> {
let records = self.youming.scan_quarantine()?;
let count = records.len();
for rec in records {
let reading = LeiLuo {
id: rec.droplet_id,
sensor_id: rec.sensor_id,
ts: rec.ts,
value: rec.original_value,
quality: None,
metadata: None,
};
self.ingest(reading).await?;
sleep(Duration::from_millis(5)).await;
}
info!("回放: 重新注入 {} 条隔离记录", count);
Ok(count)
}
pub fn youming(&self) -> &Arc<dyn YouMingStore> { &self.youming }
pub fn config(&self) -> &YanHunConfig { &self.config }
}
// Command-line interface. Field doc comments below become clap `--help` text.
#[derive(Parser, Debug)]
#[command(
    name = "yanhun",
    about = "黯然销魂系统 v1.0 — 工业级具身智能传感器数据流净化引擎",
    version = "1.0.0"
)]
pub struct Cli {
    /// Optional configuration file; defaults are used if it fails to load
    #[arg(short, long)]
    pub config: Option<PathBuf>,
    /// Database path; `:memory:` selects an in-memory store
    #[arg(short = 'd', long, default_value = "./yanhunsystem_db")]
    pub db_path: String,
    /// Persistence backend name (currently not consulted: SQLite is always used)
    #[arg(long, default_value = "sled")]
    pub backend: String,
    /// Primary tree/table name
    #[arg(short = 't', long, default_value = "leiluo")]
    pub tree_name: String,
    /// Shadow tree/table name
    #[arg(long, default_value = "anying")]
    pub shadow_tree_name: String,
    /// Override the shard count
    #[arg(short = 's', long)]
    pub shards: Option<usize>,
    /// Override the rolling window size
    #[arg(short = 'w', long)]
    pub window_size: Option<usize>,
    /// Override the light anomaly threshold
    #[arg(long)]
    pub threshold_light: Option<f64>,
    /// Override the heavy anomaly threshold
    #[arg(long)]
    pub threshold_heavy: Option<f64>,
    /// Run multi-round purification (default: true). Accepts a bare flag,
    /// `--enable-multi-round=false`, or `--enable-multi-round false`.
    // With clap's default `SetTrue` action a bool defaulting to `true` could never be
    // switched off; `Set` with an optional value keeps the bare flag working.
    #[arg(
        long,
        default_value_t = true,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true"
    )]
    pub enable_multi_round: bool,
    /// Feed a synthetic data stream into the pipeline
    #[arg(long, default_value_t = false)]
    pub simulate: bool,
    /// Pause between simulated rounds, in milliseconds
    #[arg(long, default_value_t = 50)]
    pub sim_interval_ms: u64,
    /// Export audit records as JSON Lines to this path and exit
    #[arg(long)]
    pub export: Option<PathBuf>,
    /// Re-ingest quarantined records, then exit
    #[arg(long, default_value_t = false)]
    pub replay: bool,
    /// Export shadow records to this path and exit
    #[arg(long)]
    pub export_shadow: Option<PathBuf>,
}
/// Spawns a task that feeds synthetic readings into `system` forever.
///
/// Every `interval_ms`, one reading is produced per simulated sensor:
/// - ~99.5% of readings are ≈10.0 ± 0.25;
/// - the remaining ~0.5% are spikes in [200, 250).
///
/// Ids are `sim-{sensor}-{n}`, with `n` increasing across all sensors. Abort the
/// returned handle to stop the task.
pub fn start_simulation(system: Arc<YanHun>, interval_ms: u64) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        const SENSORS: [&str; 5] = ["kinoko-1", "suimei-imu", "byakuya-prox", "yoru-wheel", "tsukikage-gps"];
        // Owned by this single task, so a plain counter suffices (no atomic needed).
        let mut next_seq: u64 = 0;
        loop {
            for &sid in SENSORS.iter() {
                // The thread-local RNG handle is not `Send`; scope it so it is dropped
                // before the `.await` below.
                let v = {
                    let mut rng = rand::thread_rng();
                    if rng.gen::<f64>() < 0.995 {
                        10.0 + (rng.gen::<f64>() - 0.5) * 0.5
                    } else {
                        200.0 + rng.gen::<f64>() * 50.0
                    }
                };
                let seq = next_seq;
                next_seq += 1;
                let r = LeiLuo {
                    id: format!("sim-{}-{}", sid, seq),
                    sensor_id: sid.to_string(),
                    ts: Utc::now(),
                    value: v,
                    quality: Some(1.0),
                    metadata: None,
                };
                if let Err(e) = system.ingest(r).await {
                    error!("模拟器: 提交错误: {:?}", e);
                }
            }
            sleep(Duration::from_millis(interval_ms)).await;
        }
    })
}
/// Entry point.
///
/// 1. Builds the configuration: file, then environment, then CLI overrides.
/// 2. Opens the SQLite store.
/// 3. Runs one of the export/replay modes, or runs the pipeline until Ctrl+C.
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
    // Fixed window granted to the spawned pipeline to finish queued work before exit.
    const DRAIN_WINDOW: Duration = Duration::from_millis(800);
    env_logger::Builder::from_env(
        env_logger::Env::default().default_filter_or("info")
    ).format_timestamp_millis().init();
    let cli = Cli::parse();
    let mut app_cfg = if let Some(ref path) = cli.config {
        AppConfig::load(path).unwrap_or_else(|e| {
            warn!("配置文件加载失败 ({:?}),使用默认配置", e);
            AppConfig::default()
        })
    } else {
        AppConfig::default()
    };
    app_cfg = app_cfg.apply_env();
    if let Some(v) = cli.shards { app_cfg.shard_count = Some(v); }
    if let Some(v) = cli.window_size { app_cfg.window_size = Some(v); }
    if let Some(v) = cli.threshold_light { app_cfg.threshold_light = Some(v); }
    if let Some(v) = cli.threshold_heavy { app_cfg.threshold_heavy = Some(v); }
    let base_cfg = YanHunConfig {
        db_path: cli.db_path.clone(),
        tree_name: cli.tree_name.clone(),
        shadow_tree_name: cli.shadow_tree_name.clone(),
        persistence_backend: PersistenceBackend::Sqlite,
        ..YanHunConfig::default()
    };
    let final_cfg = app_cfg.into_inner_config(base_cfg);
    final_cfg.validate().context("配置校验失败")?;
    let youming: Arc<dyn YouMingStore> = if final_cfg.db_path == ":memory:" {
        Arc::new(SqliteYouMing::open_in_memory()?)
    } else {
        Arc::new(SqliteYouMing::open(&final_cfg.db_path)?)
    };
    if let Some(ref out) = cli.export {
        info!("导出: 审计记录 → {:?}", out);
        let n = youming.export_audit_jsonl(out)?;
        info!("导出完成: {} 条记录", n);
        return Ok(());
    }
    if let Some(ref out) = cli.export_shadow {
        info!("导出: 暗影记录 → {:?}", out);
        youming.export_shadow(out)?;
        info!("暗影导出完成");
        return Ok(());
    }
    let mut system = YanHun::initialize(final_cfg.clone(), youming.clone());
    system.start();
    if cli.enable_multi_round { system.start_multi_round(); }
    info!(
        "黯然销魂系统上线 | shards={} window={} eps={} backend={:?} \
        thresholds=[{:.1}/{:.1}/{:.1}] multi_round={} verify={}",
        final_cfg.shard_count,
        final_cfg.window_size,
        final_cfg.quantisation_eps,
        final_cfg.persistence_backend,
        final_cfg.threshold_light,
        final_cfg.threshold_medium,
        final_cfg.threshold_heavy,
        cli.enable_multi_round,
        final_cfg.verification_enabled,
    );
    if cli.replay {
        info!("回放: 重新注入隔离记录");
        let n = system.replay_quarantine().await?;
        info!("回放完成: {} 条", n);
        // `ingest` only enqueues. When `main` returns, the tokio runtime shuts down
        // and drops its spawned tasks, so give the pipeline the same drain window as
        // the normal shutdown path and flush before exiting.
        sleep(DRAIN_WINDOW).await;
        youming.flush()?;
        return Ok(());
    }
    let system = Arc::new(system);
    let sim = if cli.simulate {
        info!("模拟器: 启动合成数据流 ({}ms 间隔)", cli.sim_interval_ms);
        Some(start_simulation(system.clone(), cli.sim_interval_ms))
    } else {
        None
    };
    tokio::signal::ctrl_c().await?;
    info!("黯然销魂系统: 收到关闭信号");
    if let Some(h) = sim { h.abort(); }
    info!("黯然销魂系统: 正在排空管道…");
    sleep(DRAIN_WINDOW).await;
    youming.flush()?;
    info!("黯然销魂系统: 关闭完成 — 此情可待成追忆,只是当时已惘然");
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Small, fast configuration shared by the tests.
    fn test_config() -> YanHunConfig {
        YanHunConfig {
            window_size: 100,
            quantisation_eps: 1e-3,
            threshold_light: 2.5,
            threshold_medium: 4.5,
            threshold_heavy: 8.0,
            ingestion_capacity: 512,
            per_sensor_queue: 64,
            shard_count: 2,
            batch_max_size: 16,
            batch_max_wait_ms: 5,
            max_rounds: 3,
            verification_enabled: false,
            multi_round_concurrency: 2,
            ..YanHunConfig::default()
        }
    }

    #[test]
    fn test_quantise_roundtrip() {
        let eps = 1e-3;
        let v = 12.345678_f64;
        // Nearest-bucket rounding is off by at most eps / 2 (plus float noise).
        let b = (v / eps).round() as i64;
        let v2 = (b as f64) * eps;
        assert!((v - v2).abs() <= eps * 0.5 + 1e-12, "量化往返误差过大: {} vs {}", v, v2);
        // The tracker itself: a single sample must come back as the median within one bucket.
        let mut t = OrderStatTracker::new(50, eps);
        let (median, _) = t.insert_and_compute(v);
        assert!((median - v).abs() <= eps, "跟踪器量化往返误差过大: {} vs {}", v, median);
    }

    #[test]
    fn test_dual_heap_median() {
        let mut t = DualHeapTracker::new(7);
        for v in 1..=7 { let _ = t.insert_and_compute(v as f64); }
        assert!((t.current_median() - 4.0).abs() < 1e-9);
    }

    #[test]
    fn test_dual_heap_mad_constant_sequence() {
        let mut t = DualHeapTracker::new(11);
        for _ in 0..11 { let _ = t.insert_and_compute(10.0); }
        let (_, mad) = t.snapshot_stats();
        assert!(mad < 1e-9, "常数序列 MAD 应为 0,实际={}", mad);
    }

    #[test]
    fn test_order_stat_median_robust() {
        let mut t = OrderStatTracker::new(50, 1e-3);
        for v in (0..20).map(|i| 10.0 + (i % 3) as f64 * 0.01) {
            t.insert_and_compute(v);
        }
        let (median, mad) = t.insert_and_compute(10_000.0);
        assert!((median - 10.0).abs() < 2.0, "中位数应对异常值不敏感,median={}", median);
        assert!(mad >= 0.0);
    }

    #[test]
    fn test_order_stat_sliding_window() {
        let cap = 10;
        let mut t = OrderStatTracker::new(cap, 1e-3);
        for v in 0..cap { t.insert_and_compute(v as f64); }
        assert_eq!(t.window_len(), cap);
        t.insert_and_compute(100.0);
        assert_eq!(t.window_len(), cap, "窗口应维持固定大小");
    }

    #[test]
    fn test_auto_xiaohun_small_uses_dual_heap() {
        let t = AutoXiaoHun::new(512, 1e-4);
        let (med, _) = t.insert_and_compute(42.0);
        assert!((med - 42.0).abs() < 1e-6);
    }

    #[test]
    fn test_auto_xiaohun_large_uses_order_stat() {
        let t = AutoXiaoHun::new(LARGE_WINDOW_THRESHOLD + 1, 1e-4);
        let (med, _) = t.insert_and_compute(7.0);
        assert!((med - 7.0).abs() < 1e-3);
    }

    #[test]
    fn test_shenshang_levels() {
        let scorer = ShenShang::new(test_config());
        assert_eq!(scorer.assess(10.0, 10.0, 1.0, 1).level, PoSan::Untainted, "正常值应为无染");
        assert_eq!(scorer.assess(14.5, 10.0, 1.0, 1).level, PoSan::LightGrief, "轻度异常应为轻愁");
        assert_eq!(scorer.assess(25.0, 10.0, 1.0, 1).level, PoSan::SoulWithered, "重度异常应为销魂");
    }

    #[test]
    fn test_shenshang_zero_mad_fallback() {
        let scorer = ShenShang::new(test_config());
        let order = scorer.assess(50.0, 10.0, 0.0, 1);
        assert!(order.score > 5.0, "MAD=0 应退化为绝对差值: score={}", order.score);
        assert_ne!(order.level, PoSan::Untainted);
    }

    #[test]
    fn test_poshan_display() {
        assert_eq!(format!("{}", PoSan::Untainted), "无染");
        assert_eq!(format!("{}", PoSan::LightGrief), "轻愁");
        assert_eq!(format!("{}", PoSan::DeepSorrow), "深恸");
        assert_eq!(format!("{}", PoSan::SoulWithered), "销魂");
    }

    #[test]
    fn test_qingmi_converges() {
        let qm = QingMi::new(0.1);
        let sid = "sensor-test".to_string();
        for _ in 0..200 { qm.update(&sid, 10.0); }
        let v = qm.get(&sid, 0.0);
        assert!((v - 10.0).abs() < 0.01, "EMA 应收敛至 10,实际={}", v);
    }

    #[test]
    fn test_config_validation_pass() {
        assert!(YanHunConfig::default().validate().is_ok());
    }

    #[test]
    fn test_config_validation_zero_window() {
        let bad = YanHunConfig { window_size: 0, ..YanHunConfig::default() };
        assert!(bad.validate().is_err());
    }

    #[test]
    fn test_config_validation_negative_eps() {
        let bad = YanHunConfig { quantisation_eps: -1.0, ..YanHunConfig::default() };
        assert!(bad.validate().is_err());
    }

    #[test]
    fn test_config_validation_bad_confidence() {
        let bad = YanHunConfig { promote_confidence: 1.5, ..YanHunConfig::default() };
        assert!(bad.validate().is_err());
    }

    #[test]
    fn test_leiluo_new() {
        let r = LeiLuo::new("sensor-a", 42.0);
        assert_eq!(r.sensor_id, "sensor-a");
        assert!((r.value - 42.0).abs() < 1e-9);
        assert!(r.quality.is_none());
        assert!(r.metadata.is_none());
    }

    #[test]
    fn test_leiluo_with_quality_and_metadata() {
        let r = LeiLuo::new("s1", 1.0)
            .with_quality(0.95)
            .with_metadata(serde_json::json!({"source": "test"}));
        assert!((r.quality.unwrap() - 0.95).abs() < 1e-9);
        assert!(r.metadata.is_some());
    }

    #[test]
    fn test_sqlite_youming_audit_roundtrip() {
        let ym = SqliteYouMing::open_in_memory().unwrap();
        let rec = QiChuang {
            sensor_id: "s1".into(),
            droplet_id: "d1".into(),
            order: DuanChang {
                level: PoSan::LightGrief,
                score: 3.5,
                reason: "test".into(),
                ts: Utc::now(),
                round: 1,
            },
            original_value: 15.0,
            applied_value: Some(12.0),
            note: Some("clip".into()),
            ts: Utc::now(),
        };
        ym.audit(&rec).unwrap();
        let audits = ym.scan_audit().unwrap();
        assert_eq!(audits.len(), 1);
        assert!((audits[0].applied_value.unwrap() - 12.0).abs() < 1e-9);
    }

    #[test]
    fn test_sqlite_youming_quarantine_dual_write() {
        let ym = SqliteYouMing::open_in_memory().unwrap();
        let rec = QiChuang {
            sensor_id: "sq".into(),
            droplet_id: "dq".into(),
            order: DuanChang {
                level: PoSan::SoulWithered,
                score: 15.0,
                reason: "spike".into(),
                ts: Utc::now(),
                round: 1,
            },
            original_value: 500.0,
            applied_value: None,
            note: Some("quarantined".into()),
            ts: Utc::now(),
        };
        ym.quarantine(&rec).unwrap();
        let q = ym.scan_quarantine().unwrap();
        assert_eq!(q.len(), 1, "隔离树应有 1 条记录");
        let a = ym.scan_audit().unwrap();
        assert_eq!(a.len(), 1, "审计树应也有 1 条记录(双写)");
    }

    #[test]
    fn test_sqlite_youming_cas() {
        let ym = SqliteYouMing::open_in_memory().unwrap();
        let rec = ChouKu {
            id: "cas-1".into(),
            sensor_id: "s1".into(),
            ts: Utc::now(),
            value: 10.0,
            quality: None,
            score: 0.5,
            poshan: "无染".into(),
            origin_median: None,
            origin_mad: None,
            state: ChouChang::Slumber,
            round: 0,
            metadata: None,
        };
        ym.append_sediment(&rec).unwrap();
        let ok = ym.compare_and_swap_state(&"cas-1".into(), ChouChang::Slumber, ChouChang::Forging(1), 1).unwrap();
        assert!(ok, "CAS 应成功");
        let ok2 = ym.compare_and_swap_state(&"cas-1".into(), ChouChang::Slumber, ChouChang::Forging(2), 2).unwrap();
        assert!(!ok2, "CAS 应失败(状态已变)");
    }

    #[test]
    fn test_dbscan_1d_basic() {
        let values = vec![1.0, 1.1, 1.2, 50.0, 1.15, 1.05];
        let labels = dbscan_1d(&values, 0.5, 2);
        let noise = labels.iter().filter(|&&l| l == -1).count();
        let in_cluster = labels.iter().filter(|&&l| l >= 0).count();
        assert!(in_cluster >= 4, "多数点应聚为簇,噪声={} 簇内={}", noise, in_cluster);
    }

    #[test]
    fn test_huber_robust_location() {
        let mut vals: Vec<f64> = (0..15).map(|i| 10.0 + i as f64 * 0.01).collect();
        vals.push(10_000.0);
        let est = huber_robust_location(&vals, 1.0, 50);
        assert!((est - 10.07).abs() < 0.5, "Huber 估计应接近真实中心,est={}", est);
    }

    #[test]
    fn test_kalman_convergence() {
        let mut kf = Kalman1D::new(1e-3, 1e-2);
        let signal = vec![10.0; 50];
        let out = kf.smooth_sequence(&signal);
        let last = *out.last().unwrap();
        assert!((last - 10.0).abs() < 0.01, "卡尔曼平滑应收敛至真值,last={}", last);
    }

    #[test]
    fn test_confidence_computation() {
        let tight = vec![10.0, 10.01, 9.99, 10.02, 9.98];
        let spread = vec![1.0, 5.0, 10.0, 15.0, 20.0];
        let c_tight = compute_confidence(&tight, 10.0);
        let c_spread = compute_confidence(&spread, 10.2);
        assert!(c_tight > c_spread, "紧凑簇置信度应高于分散簇");
        assert!(c_tight <= 1.0);
        assert!(c_spread >= 0.0);
    }
}