// anran_xiaohun/lib.rs

1// ═══════════════════════════════════════════════════════════════════════════════
2// 黯然销魂系统  v1.0
3// 工业级具身智能传感器数据流净化引擎
4//
5// 招式架构(取自黯然销魂掌十八式):
6//   泪落 LeiLuo    — 感知数据滴(传感器读数)
7//   销魂 XiaoHun   — 统计分析引擎(双轨自适应滑窗)
8//   神伤 ShenShang — 染度评估器(Modified z-score)
9//   魄散 PoSan     — 净化等级(无染/轻愁/深恸/销魂)
10//   惆怅 ChouChang — 生命周期状态机
11//   断肠 DuanChang — 净化令指
12//   凄怆 QiChuang  — 审计记录
13//   意乱 YiLuan    — 净化结果
14//   愁苦 ChouKu    — 持久化沉淀记录
15//   情迷 QingMi    — 背景场 EMA 模型
16//   幽冥 YouMing   — 持久化存储(Sled / SQLite 双后端)
17//   魂飞 HunFei    — 净化执行器
18//   惊神 JingShen  — 分片管理器
19//   哀毁 AiHui     — 高阶净化器(DBSCAN + Huber + Kalman)
20//   悲切 BeiQie    — 暗影验证器
21//   心乱 XinLuan   — 多轮迭代调度器
22//   肠断 ChangDuan — 并行 Worker 池
23//   黯然 YanHun    — 主编排器
24//
25// 架构特性:
26//   · DualHeap(小窗)/ OrderStat(大窗)双轨自适应滑动窗口
27//   · Modified z-score + MAD 稳健异常检测
28//   · DBSCAN 密度聚类 + Huber M-估计 + 卡尔曼平滑三阶高阶校正
29//   · 无染 / 轻愁 / 深恸 / 销魂 四级分级净化
30//   · CAS 原子状态更新 + 多轮迭代直到收敛
31//   · FNV 哈希分片 + 双检查锁 + 全局 tracker 注册表
32//   · try_send + 降级 send 背压机制
33//   · 重度写双树(隔离树 + 审计树)保证完整性
34//   · Sled(64 MB 缓存精调)/ SQLite 双后端可切换
35//   · 纳秒精度时间戳键,预开树句柄降低调用开销
36//   · 三级配置覆盖:文件 → 环境变量 → CLI 内联参数
37//   · 可插拔 TsdbSink / MetricsSink 接口
38//   · run_daemon() 优雅守护 + 回放 + 导出工具链
39//   · 全异步 tokio,零占位符,生产环境就绪
40// ═══════════════════════════════════════════════════════════════════════════════
41
42#![allow(dead_code, unused_imports, unused_variables, unused_mut)]
43
44// ─── standard ────────────────────────────────────────────────────────────────
45use std::cmp::Reverse;
46use std::collections::{BTreeMap, BinaryHeap, HashMap, VecDeque};
47use std::hash::{Hash, Hasher};
48use std::io::{BufWriter, Write};
49use std::path::{Path, PathBuf};
50use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering as AtomicOrd};
51use std::sync::Arc;
52use std::time::{Duration, Instant};
53// ─── async ───────────────────────────────────────────────────────────────────
54use tokio::sync::{mpsc, oneshot, Semaphore};
55use tokio::task;
56use tokio::time::sleep;
57// ─── serde ───────────────────────────────────────────────────────────────────
58use serde::{Deserialize, Serialize};
59// ─── error ───────────────────────────────────────────────────────────────────
60use anyhow::{anyhow, Context, Result};
61use thiserror::Error;
62// ─── logging ─────────────────────────────────────────────────────────────────
63use log::{debug, error, info, warn};
64// ─── time ────────────────────────────────────────────────────────────────────
65use chrono::{DateTime, SecondsFormat, Utc};
66// ─── locking ─────────────────────────────────────────────────────────────────
67use parking_lot::{Mutex, RwLock};
68// ─── random ──────────────────────────────────────────────────────────────────
69use rand::Rng;
70// ─── persistence ─────────────────────────────────────────────────────────────
71use rusqlite::{params, Connection};
72// ─── CLI ─────────────────────────────────────────────────────────────────────
73use clap::Parser;
74// ─── cpu ─────────────────────────────────────────────────────────────────────
75use num_cpus;
76
77// ═══════════════════════════════════════════════════════════════════════════════
78// § 1  常量与错误 — Constants & Error Hierarchy
79// ═══════════════════════════════════════════════════════════════════════════════
80
81pub type SensorId  = String;
82pub type Timestamp = DateTime<Utc>;
83pub type DropletId = String;
84
/// Window-size cut-over: windows of at most this many samples use the
/// dual-heap tracker, larger ones use the order-statistic tracker.
const LARGE_WINDOW_THRESHOLD: usize = 1 << 16;
/// Factor converting a MAD into a σ estimate (Rousseeuw & Croux 1993).
const MAD_TO_SIGMA: f64 = 1.4826;
/// Modified z-score constant (Iglewicz & Hoaglin 1993).
const MODIFIED_Z_K: f64 = 0.6745;
/// Huber tuning constant giving 95% asymptotic efficiency.
const HUBER_EFFICIENCY: f64 = 1.345;
93
94#[derive(Debug, Error)]
95pub enum YanHunError {
96    #[error("泪落摄取失败: {0}")]
97    Ingest(String),
98    #[error("销魂分析失败: {0}")]
99    Analysis(String),
100    #[error("幽冥持久化失败: {0}")]
101    Persistence(String),
102    #[error("信道传输失败: {0}")]
103    Channel(String),
104    #[error("配置失当: {0}")]
105    Config(String),
106    #[error("CAS 竞争失败: {0}")]
107    CasConflict(String),
108    #[error("净化执行失败: {0}")]
109    Purification(String),
110    #[error("IO 错误: {0}")]
111    Io(#[from] std::io::Error),
112}
113
114impl From<rusqlite::Error> for YanHunError {
115    fn from(e: rusqlite::Error) -> Self { YanHunError::Persistence(e.to_string()) }
116}
117
118// ═══════════════════════════════════════════════════════════════════════════════
119// § 2  可插拔接口 — Pluggable Interfaces
120// ═══════════════════════════════════════════════════════════════════════════════
121
122/// TsdbSink — 可插拔外部时序数据库接口
123pub trait TsdbSink: Send + Sync + 'static {
124    fn write_raw(&self, reading: &LeiLuo);
125    fn write_corrected(&self, sensor_id: &str, ts: &Timestamp, value: f64, note: &str);
126}
127
128pub struct NoopTsdb;
129impl TsdbSink for NoopTsdb {
130    fn write_raw(&self, _r: &LeiLuo) {}
131    fn write_corrected(&self, _s: &str, _t: &Timestamp, _v: f64, _n: &str) {}
132}
133
/// Pluggable sink for monitoring metrics.
///
/// Implementations must be shareable across threads (`Send + Sync + 'static`).
pub trait MetricsSink: Send + Sync + 'static {
    /// Adds `delta` to the counter called `name`.
    fn incr_counter(&self, name: &str, delta: u64);

    /// Sets the gauge called `name` to `value`.
    fn set_gauge(&self, name: &str, value: f64);

    /// Records a duration of `ms` milliseconds under `name`.
    fn record_ms(&self, name: &str, ms: u64);
}
140
141pub struct NoopMetrics;
142impl MetricsSink for NoopMetrics {
143    fn incr_counter(&self, _n: &str, _d: u64) {}
144    fn set_gauge(&self, _n: &str, _v: f64) {}
145    fn record_ms(&self, _n: &str, _m: u64) {}
146}
147
148// ═══════════════════════════════════════════════════════════════════════════════
149// § 3  配置体系 — Configuration
150// ═══════════════════════════════════════════════════════════════════════════════
151
152#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
153pub enum HuberStrategy {
154    Fixed(f64),
155    AutoMAD,
156}
157
158#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
159pub enum PersistenceBackend { Sled, Sqlite }
160
161#[derive(Debug, Clone, Serialize, Deserialize)]
162pub struct YanHunConfig {
163    pub window_size:               usize,
164    pub quantisation_eps:          f64,
165    pub threshold_light:           f64,
166    pub threshold_medium:          f64,
167    pub threshold_heavy:           f64,
168    pub ingestion_capacity:        usize,
169    pub per_sensor_queue:          usize,
170    pub shard_count:               usize,
171    pub batch_max_size:            usize,
172    pub batch_max_wait_ms:         u64,
173    pub qingmi_alpha:              f64,
174    pub hunfei_concurrency:        usize,
175    pub verification_enabled:      bool,
176    pub blend_alpha:               f64,
177    pub clip_k:                    f64,
178    pub huber_strategy:            HuberStrategy,
179    pub kalman_q:                  f64,
180    pub kalman_r:                  f64,
181    pub max_rounds:                u32,
182    pub promote_confidence:        f64,
183    pub convergence_delta:         f64,
184    pub shadow_verify_delay_ms:    u64,
185    pub shadow_max_attempts:       u32,
186    pub shadow_min_confidence:     f64,
187    pub persistence_backend:       PersistenceBackend,
188    pub db_path:                   String,
189    pub tree_name:                 String,
190    pub shadow_tree_name:          String,
191    pub flush_interval_ms:         u64,
192    pub dbscan_min_samples_ratio:  f64,
193    pub multi_round_batch_size:    usize,
194    pub multi_round_concurrency:   usize,
195}
196
197impl Default for YanHunConfig {
198    fn default() -> Self {
199        Self {
200            window_size:              1_024,
201            quantisation_eps:         1e-4,
202            threshold_light:          3.0,
203            threshold_medium:         6.0,
204            threshold_heavy:          10.0,
205            ingestion_capacity:       16_384,
206            per_sensor_queue:         1_024,
207            shard_count:              8,
208            batch_max_size:           256,
209            batch_max_wait_ms:        10,
210            qingmi_alpha:             0.05,
211            hunfei_concurrency:       16,
212            verification_enabled:     true,
213            blend_alpha:              0.6,
214            clip_k:                   3.0,
215            huber_strategy:           HuberStrategy::AutoMAD,
216            kalman_q:                 1e-3,
217            kalman_r:                 1e-2,
218            max_rounds:               6,
219            promote_confidence:       0.85,
220            convergence_delta:        1e-3,
221            shadow_verify_delay_ms:   500,
222            shadow_max_attempts:      3,
223            shadow_min_confidence:    0.9,
224            persistence_backend:      PersistenceBackend::Sqlite,
225            db_path:                  "./yanhunsystem_db".to_string(),
226            tree_name:                "leiluo".to_string(),
227            shadow_tree_name:         "anying".to_string(),
228            flush_interval_ms:        500,
229            dbscan_min_samples_ratio: 0.02,
230            multi_round_batch_size:   1024,
231            multi_round_concurrency:  num_cpus::get(),
232        }
233    }
234}
235
236impl YanHunConfig {
237    pub fn validate(&self) -> Result<()> {
238        if self.window_size == 0 { anyhow::bail!("window_size 不能为零"); }
239        if self.max_rounds == 0  { anyhow::bail!("max_rounds 不能为零"); }
240        if self.quantisation_eps <= 0.0 { anyhow::bail!("quantisation_eps 必须大于零"); }
241        if self.shard_count == 0 { anyhow::bail!("shard_count 不能为零"); }
242        if !(0.0..=1.0).contains(&self.promote_confidence) {
243            anyhow::bail!("promote_confidence 必须在 0.0–1.0 之间");
244        }
245        if self.convergence_delta <= 0.0 { anyhow::bail!("convergence_delta 必须大于零"); }
246        if self.flush_interval_ms == 0  { anyhow::bail!("flush_interval_ms 不能为零"); }
247        if !(0.0..=1.0).contains(&self.qingmi_alpha) {
248            anyhow::bail!("qingmi_alpha 必须在 0.0–1.0 之间");
249        }
250        if self.multi_round_batch_size == 0 { anyhow::bail!("multi_round_batch_size 不能为零"); }
251        Ok(())
252    }
253
254    pub fn load_from_file(path: &PathBuf) -> Result<Self> {
255        let f = std::fs::File::open(path).context("配置文件打开失败")?;
256        serde_json::from_reader(f).context("配置文件解析失败")
257    }
258
259    pub fn apply_env(mut self) -> Self {
260        macro_rules! env_usize {
261            ($var:expr, $field:expr) => {
262                if let Ok(v) = std::env::var($var) {
263                    if let Ok(n) = v.parse::<usize>() { $field = n; }
264                }
265            };
266        }
267        macro_rules! env_f64 {
268            ($var:expr, $field:expr) => {
269                if let Ok(v) = std::env::var($var) {
270                    if let Ok(f) = v.parse::<f64>() { $field = f; }
271                }
272            };
273        }
274        macro_rules! env_bool {
275            ($var:expr, $field:expr) => {
276                if let Ok(v) = std::env::var($var) {
277                    if let Ok(b) = v.parse::<bool>() { $field = b; }
278                }
279            };
280        }
281        env_usize!("YANHUN_WINDOW_SIZE",    self.window_size);
282        env_f64!("YANHUN_THRESHOLD_LIGHT",  self.threshold_light);
283        env_f64!("YANHUN_THRESHOLD_MEDIUM", self.threshold_medium);
284        env_f64!("YANHUN_THRESHOLD_HEAVY",  self.threshold_heavy);
285        env_usize!("YANHUN_SHARDS",          self.shard_count);
286        env_f64!("YANHUN_QINGMI_ALPHA",     self.qingmi_alpha);
287        env_bool!("YANHUN_VERIFY",           self.verification_enabled);
288        if let Ok(v) = std::env::var("YANHUN_MAX_ROUNDS") {
289    if let Ok(n) = v.parse::<u32>() {
290        self.max_rounds = n;
291    }
292}
293        env_f64!("YANHUN_PROMOTE_CONF",     self.promote_confidence);
294        self
295    }
296}
297
298/// 应用级可选覆盖配置(三级覆盖:文件 → 环境变量 → CLI)
299#[derive(Debug, Clone, Serialize, Deserialize, Default)]
300pub struct AppConfig {
301    pub window_size:          Option<usize>,
302    pub quantisation_eps:     Option<f64>,
303    pub threshold_light:      Option<f64>,
304    pub threshold_medium:     Option<f64>,
305    pub threshold_heavy:      Option<f64>,
306    pub shard_count:          Option<usize>,
307    pub batch_max_size:       Option<usize>,
308    pub batch_max_wait_ms:    Option<u64>,
309    pub qingmi_alpha:         Option<f64>,
310    pub hunfei_concurrency:   Option<usize>,
311    pub verification_enabled: Option<bool>,
312    pub max_rounds:           Option<usize>,
313    pub promote_confidence:   Option<f64>,
314}
315
316impl AppConfig {
317    pub fn load(path: &PathBuf) -> Result<Self> {
318        let f = std::fs::File::open(path).context("配置文件打开失败")?;
319        serde_json::from_reader(f).context("配置文件解析失败")
320    }
321
322    /// 读取 YANHUN_ 前缀的环境变量
323    pub fn apply_env(mut self) -> Self {
324        macro_rules! env_opt_usize {
325            ($field:ident, $key:expr) => {
326                if let Ok(v) = std::env::var($key) {
327                    if let Ok(n) = v.parse() { self.$field = Some(n); }
328                }
329            };
330        }
331        macro_rules! env_opt_f64 {
332            ($field:ident, $key:expr) => {
333                if let Ok(v) = std::env::var($key) {
334                    if let Ok(f) = v.parse() { self.$field = Some(f); }
335                }
336            };
337        }
338        env_opt_usize!(window_size,        "YANHUN_WINDOW_SIZE");
339        env_opt_usize!(shard_count,         "YANHUN_SHARDS");
340        env_opt_usize!(hunfei_concurrency,  "YANHUN_MAX_CONCURRENCY");
341        env_opt_f64!(threshold_light,       "YANHUN_THRESHOLD_LIGHT");
342        env_opt_f64!(threshold_medium,      "YANHUN_THRESHOLD_MEDIUM");
343        env_opt_f64!(threshold_heavy,       "YANHUN_THRESHOLD_HEAVY");
344        env_opt_f64!(qingmi_alpha,          "YANHUN_QINGMI_ALPHA");
345        self
346    }
347
348    /// 将可选覆盖合并到基础配置
349    pub fn into_inner_config(self, mut base: YanHunConfig) -> YanHunConfig {
350        macro_rules! apply {
351            ($field:ident) => {
352                if let Some(v) = self.$field { base.$field = v; }
353            };
354        }
355        apply!(window_size);
356        apply!(quantisation_eps);
357        apply!(threshold_light);
358        apply!(threshold_medium);
359        apply!(threshold_heavy);
360        apply!(shard_count);
361        apply!(batch_max_size);
362        apply!(batch_max_wait_ms);
363        apply!(qingmi_alpha);
364        apply!(hunfei_concurrency);
365        apply!(verification_enabled);
366        if let Some(r) = self.max_rounds { base.max_rounds = r as u32; }
367        if let Some(c) = self.promote_confidence { base.promote_confidence = c; }
368        base
369    }
370}
371
372// ═══════════════════════════════════════════════════════════════════════════════
373// § 4  核心数据类型 — Core Data Types
374// ═══════════════════════════════════════════════════════════════════════════════
375
376/// 泪落 LeiLuo — 感知数据滴
377#[derive(Debug, Clone, Serialize, Deserialize)]
378pub struct LeiLuo {
379    pub id:        DropletId,
380    pub sensor_id: SensorId,
381    pub ts:        Timestamp,
382    pub value:     f64,
383    pub quality:   Option<f64>,
384    /// 扩展元数据(自由格式 JSON,用于下游系统注入附加上下文)
385    pub metadata:  Option<serde_json::Value>,
386}
387
388impl LeiLuo {
389    /// 便利构造器:从 sensor_id + value 快速生成泪落
390    pub fn new(sensor_id: impl Into<String>, value: f64) -> Self {
391        let sid = sensor_id.into();
392        Self {
393            id:        uuid::Uuid::new_v4().to_string(),
394            sensor_id: sid,
395            ts:        Utc::now(),
396            value,
397            quality:   None,
398            metadata:  None,
399        }
400    }
401
402    /// 设置质量指标(链式调用)
403    pub fn with_quality(mut self, q: f64) -> Self {
404        self.quality = Some(q);
405        self
406    }
407
408    /// 附加元数据(链式调用)
409    pub fn with_metadata(mut self, m: serde_json::Value) -> Self {
410        self.metadata = Some(m);
411        self
412    }
413}
414
415/// 魄散 PoSan — 净化等级
416#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
417pub enum PoSan {
418    /// 无染 — 清净,无需处置
419    Untainted,
420    /// 轻愁 — 轻度染污,符号裁剪
421    LightGrief,
422    /// 深恸 — 中度染污,背景混合
423    DeepSorrow,
424    /// 销魂 — 重度染污,隔离入幽冥
425    SoulWithered,
426}
427
428impl std::fmt::Display for PoSan {
429    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
430        match self {
431            Self::Untainted    => write!(f, "无染"),
432            Self::LightGrief   => write!(f, "轻愁"),
433            Self::DeepSorrow   => write!(f, "深恸"),
434            Self::SoulWithered => write!(f, "销魂"),
435        }
436    }
437}
438
439/// 惆怅 ChouChang — 生命周期状态机
440#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
441pub enum ChouChang {
442    /// 沉眠 — 原始写入
443    Slumber,
444    /// 苏醒 — 候选中
445    Awake,
446    /// 淬炼 — 第 R 轮净化中
447    Forging(u32),
448    /// 澄明 — 第 R 轮净化完成,待验证
449    Luminous(u32),
450    /// 封存 — 已归档(心死状态)
451    Sealed,
452}
453
454/// 断肠 DuanChang — 净化令指
455#[derive(Debug, Clone, Serialize, Deserialize)]
456pub struct DuanChang {
457    pub level:  PoSan,
458    pub score:  f64,
459    pub reason: String,
460    pub ts:     Timestamp,
461    pub round:  u32,
462}
463
464/// 凄怆 QiChuang — 审计记录
465#[derive(Debug, Clone, Serialize, Deserialize)]
466pub struct QiChuang {
467    pub sensor_id:      SensorId,
468    pub droplet_id:     DropletId,
469    pub order:          DuanChang,
470    pub original_value: f64,
471    pub applied_value:  Option<f64>,
472    pub note:           Option<String>,
473    pub ts:             Timestamp,
474}
475
476/// 意乱 YiLuan — 净化结果
477#[derive(Debug, Clone, Serialize, Deserialize)]
478pub struct YiLuan {
479    pub sensor_id:       SensorId,
480    pub ts:              Timestamp,
481    pub corrected_value: f64,
482    pub confidence:      f64,
483    pub method:          String,
484    pub round:           u32,
485    pub provenance:      Vec<DropletId>,
486    pub metrics:         serde_json::Value,
487}
488
489/// 愁苦 ChouKu — 持久化沉淀记录
490#[derive(Debug, Clone, Serialize, Deserialize)]
491pub struct ChouKu {
492    pub id:           DropletId,
493    pub sensor_id:    SensorId,
494    pub ts:           Timestamp,
495    pub value:        f64,
496    pub quality:      Option<f64>,
497    pub score:        f64,
498    pub poshan:       String,
499    pub origin_median:Option<f64>,
500    pub origin_mad:   Option<f64>,
501    pub state:        ChouChang,
502    pub round:        u32,
503    pub metadata:     Option<serde_json::Value>,
504}
505
506// ═══════════════════════════════════════════════════════════════════════════════
507// § 5  基础净化算子 — Core Purification Primitives
508// ═══════════════════════════════════════════════════════════════════════════════
509
/// One-dimensional DBSCAN density clustering.
///
/// Two values are neighbours when they differ by at most `eps`; a point's
/// neighbourhood includes the point itself. A point with at least
/// `min_samples` neighbours is a core point.
///
/// Returns one label per input value, in input order: `-1` for noise,
/// otherwise a cluster id starting at `0` and numbered in order of discovery.
/// An empty input yields an empty vector.
///
/// Points first marked as noise are re-labelled as border points when *any*
/// core point of a cluster reaches them. The previous version only did this
/// for the seed's own neighbourhood, so a reachable point could stay noise
/// depending on input order.
///
/// Runs in O(n²) time: every neighbourhood query scans the whole slice.
pub fn dbscan_1d(values: &[f64], eps: f64, min_samples: usize) -> Vec<i32> {
    const UNVISITED: i32 = -2;
    const NOISE: i32 = -1;

    let n = values.len();
    if n == 0 {
        return Vec::new();
    }

    // Indices of all points within `eps` of `center` (including `center`).
    let region = |center: usize| -> Vec<usize> {
        (0..n)
            .filter(|&j| (values[j] - values[center]).abs() <= eps)
            .collect()
    };

    let mut labels = vec![UNVISITED; n];
    let mut cluster_id = 0i32;

    for seed in 0..n {
        if labels[seed] != UNVISITED {
            continue;
        }
        let seed_region = region(seed);
        if seed_region.len() < min_samples {
            // Provisional: may still become a border point of a later cluster.
            labels[seed] = NOISE;
            continue;
        }

        labels[seed] = cluster_id;
        let mut frontier: VecDeque<usize> = seed_region.into();
        while let Some(idx) = frontier.pop_front() {
            match labels[idx] {
                NOISE => {
                    // Known non-core point reached from a core point: it is a
                    // border point, so claim it but do not expand from it.
                    labels[idx] = cluster_id;
                    continue;
                }
                UNVISITED => {}
                _ => continue, // already assigned to a cluster
            }

            labels[idx] = cluster_id;
            let nbrs = region(idx);
            if nbrs.len() >= min_samples {
                // Queue noise points too, so they can be claimed as borders.
                frontier.extend(
                    nbrs.into_iter()
                        .filter(|&m| labels[m] == UNVISITED || labels[m] == NOISE),
                );
            }
        }
        cluster_id += 1;
    }

    labels
}
547
/// Estimates a DBSCAN neighbourhood radius as three times the median gap
/// between adjacent values in sorted order.
///
/// Returns `1e-6` when fewer than two values are given; the median gap is
/// floored at `1e-6` before scaling, so the result is always positive.
///
/// Sorting uses `f64::total_cmp`, so NaN inputs no longer panic (the previous
/// `partial_cmp(..).unwrap()` did). A NaN median gap falls back to the floor.
pub fn estimate_dbscan_eps(values: &[f64]) -> f64 {
    const MIN_EPS: f64 = 1e-6;

    if values.len() < 2 {
        return MIN_EPS;
    }

    let mut sorted = values.to_vec();
    sorted.sort_unstable_by(f64::total_cmp);

    // At least one gap exists because there are at least two values.
    let mut gaps: Vec<f64> = sorted
        .windows(2)
        .map(|pair| (pair[1] - pair[0]).abs())
        .collect();
    let mid = gaps.len() / 2;
    let median_gap = *gaps.select_nth_unstable_by(mid, f64::total_cmp).1;

    // `f64::max` returns the non-NaN operand, so a NaN gap yields MIN_EPS.
    median_gap.max(MIN_EPS) * 3.0
}
561
/// Huber robust location estimate via iteratively re-weighted averaging.
///
/// Starts from the element at index `len / 2` in sorted order, then repeats up
/// to `max_iter` times: points within `delta` of the current estimate get
/// weight 1, others get `delta / |residual|`, and the estimate becomes the
/// weighted mean. Stops early when the total weight is zero or the estimate
/// moves by less than `1e-12`. Returns `0.0` for an empty slice.
pub fn huber_robust_location(values: &[f64], delta: f64, max_iter: usize) -> f64 {
    if values.is_empty() {
        return 0.0;
    }

    let mut scratch = values.to_vec();
    let mid = scratch.len() / 2;
    let mut estimate = *scratch.select_nth_unstable_by(mid, f64::total_cmp).1;

    for _ in 0..max_iter {
        let mut weighted_sum = 0.0f64;
        let mut weight_total = 0.0f64;
        for &x in values {
            let residual = (x - estimate).abs();
            let weight = if residual <= delta { 1.0 } else { delta / residual };
            weighted_sum += weight * x;
            weight_total += weight;
        }
        if weight_total == 0.0 {
            break;
        }

        let next = weighted_sum / weight_total;
        let converged = (next - estimate).abs() < 1e-12;
        estimate = next;
        if converged {
            break;
        }
    }
    estimate
}
584
585/// 基于 MAD 自动估计 Huber 阈值
586pub fn estimate_huber_delta(values: &[f64]) -> f64 {
587    if values.is_empty() { return 1.0; }
588    let mut tmp = values.to_vec();
589    let mid = tmp.len() / 2;
590    tmp.select_nth_unstable_by(mid, f64::total_cmp);
591    let median = tmp[mid];
592    let mut devs: Vec<f64> = values.iter().map(|v| (v - median).abs()).collect();
593    let mid = devs.len() / 2;
594    devs.select_nth_unstable_by(mid, f64::total_cmp);
595    let mad = devs[mid];
596    let sigma = if mad > 0.0 { mad * MAD_TO_SIGMA } else { 1e-6 };
597    HUBER_EFFICIENCY * sigma
598}
599
/// Scalar (one-dimensional) Kalman filter with a constant-state model.
pub struct Kalman1D {
    /// Current state estimate.
    pub x: f64,
    /// Current estimate covariance.
    pub p: f64,
    /// Added to the covariance on every prediction step.
    pub q: f64,
    /// Added to the predicted covariance when computing the gain.
    pub r: f64,
    /// False until the first measurement has seeded the state.
    initialized: bool,
}

impl Kalman1D {
    /// Creates an unseeded filter with state `0.0` and covariance `1.0`.
    pub fn new(q: f64, r: f64) -> Self {
        Kalman1D {
            x: 0.0,
            p: 1.0,
            q,
            r,
            initialized: false,
        }
    }

    /// Returns the filter to its unseeded state; `q` and `r` are kept.
    pub fn reset(&mut self) {
        self.x = 0.0;
        self.p = 1.0;
        self.initialized = false;
    }

    /// Feeds one measurement and returns the updated state estimate.
    ///
    /// The first measurement after construction or `reset` seeds the state and
    /// is returned unchanged.
    pub fn update(&mut self, z: f64) -> f64 {
        if self.initialized {
            let predicted_cov = self.p + self.q;
            let gain = predicted_cov / (predicted_cov + self.r);
            self.x += gain * (z - self.x);
            self.p = (1.0 - gain) * predicted_cov;
        } else {
            self.x = z;
            self.p = 1.0;
            self.initialized = true;
        }
        self.x
    }

    /// Runs `update` over `seq` in order and collects the estimates.
    ///
    /// The filter is not reset first, so existing state carries over.
    pub fn smooth_sequence(&mut self, seq: &[f64]) -> Vec<f64> {
        let mut smoothed = Vec::with_capacity(seq.len());
        for &measurement in seq {
            smoothed.push(self.update(measurement));
        }
        smoothed
    }
}
626
/// Confidence in `representative` as a summary of `cluster_vals`, in `[0, 1]`.
///
/// Weighted blend of a sample-count score `ln n / (1 + ln n)` (weight 0.6) and
/// a spread score `1 / (1 + var)` (weight 0.4), where `var` is the mean squared
/// deviation from `representative`. Returns `0.0` for an empty slice.
pub fn compute_confidence(cluster_vals: &[f64], representative: f64) -> f64 {
    if cluster_vals.is_empty() {
        return 0.0;
    }

    let count = cluster_vals.len() as f64;
    let squared_dev_sum = cluster_vals
        .iter()
        .map(|&v| (v - representative) * (v - representative))
        .sum::<f64>();
    let variance = squared_dev_sum / count;

    let log_count = count.ln();
    let count_score = (log_count / (1.0 + log_count)).min(1.0);
    let var_score = 1.0 / (1.0 + variance);

    let blended = 0.6 * count_score + 0.4 * var_score;
    blended.max(0.0).min(1.0)
}
636
637// ═══════════════════════════════════════════════════════════════════════════════
638// § 6  销魂 XiaoHun — 统计分析引擎(双轨自适应滑窗)
639// ═══════════════════════════════════════════════════════════════════════════════
640
/// Sliding-window statistics tracker.
///
/// The `(f64, f64)` pairs returned by implementations in this file are
/// `(median, MAD)` of the current window.
pub trait XiaoHunTracker: Send + Sync {
    /// Adds `value` to the window and returns the updated statistics.
    fn insert_and_compute(&mut self, value: f64) -> (f64, f64);

    /// Number of samples currently in the window.
    fn window_len(&self) -> usize;

    /// Statistics of the current window without inserting anything.
    fn snapshot_stats(&self) -> (f64, f64);

    /// True when the window holds no samples.
    fn is_empty(&self) -> bool {
        self.window_len() == 0
    }
}
647
648// ─── 6a. DualHeapTracker ─────────────────────────────────────────────────────
649
/// `f64` wrapper with a total order (`f64::total_cmp`) so it can live in a
/// `BinaryHeap`. Equality follows the same order, keeping `Eq` and `Ord`
/// consistent (NaN equals an identical NaN; `-0.0` differs from `0.0`).
#[derive(Debug, Clone, Copy)]
struct OrdF64(f64);

impl PartialEq for OrdF64 {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}
impl Eq for OrdF64 {}
impl PartialOrd for OrdF64 {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for OrdF64 {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.0.total_cmp(&other.0)
    }
}

/// Sliding-window median tracker built on two heaps with lazy deletion.
///
/// `lower` is a max-heap holding the smaller half of the live samples and
/// `upper` a min-heap holding the larger half. Evicted samples cannot be
/// removed from the middle of a heap, so they are recorded in `delayed` and
/// physically dropped once they surface at a heap top. Because stale entries
/// may still sit inside the heaps, balance is tracked with the *live* counters
/// `lower_live` / `upper_live`, never with `BinaryHeap::len()`.
///
/// Invariants after every operation:
/// * both heap tops (if any) are live samples;
/// * `lower_live == upper_live` or `lower_live == upper_live + 1`;
/// * every live sample in `lower` is ≤ every live sample in `upper`.
pub struct DualHeapTracker {
    window_size: usize,
    lower: BinaryHeap<OrdF64>,
    upper: BinaryHeap<Reverse<OrdF64>>,
    /// Live samples in arrival order.
    window: VecDeque<f64>,
    /// Evicted samples queued by the caller; drained by `lazy_cleanup`.
    pending_remove: VecDeque<f64>,
    /// Bit pattern of a value → number of stale copies still inside the heaps.
    delayed: HashMap<u64, usize>,
    lower_live: usize,
    upper_live: usize,
}

impl DualHeapTracker {
    /// Stale entries tolerated beyond the live count before the heaps are
    /// rebuilt; bounds memory on monotone streams where evicted values never
    /// surface at a heap top.
    const COMPACT_SLACK: usize = 64;

    /// Creates an empty tracker for a window of `window_size` samples.
    pub fn new(window_size: usize) -> Self {
        Self {
            window_size,
            lower: BinaryHeap::new(),
            upper: BinaryHeap::new(),
            window: VecDeque::with_capacity(window_size + 16),
            pending_remove: VecDeque::new(),
            delayed: HashMap::new(),
            lower_live: 0,
            upper_live: 0,
        }
    }

    /// Adds a live sample to the appropriate heap and restores balance.
    fn heap_insert(&mut self, v: f64) {
        let ov = OrdF64(v);
        // `lower`'s top is live, so it is a valid split point.
        let goes_lower = self.lower.peek().map_or(true, |&top| ov <= top);
        if goes_lower {
            self.lower.push(ov);
            self.lower_live += 1;
        } else {
            self.upper.push(Reverse(ov));
            self.upper_live += 1;
        }
        self.rebalance();
    }

    /// Moves live tops between the heaps until the size invariant holds again.
    fn rebalance(&mut self) {
        while self.lower_live > self.upper_live + 1 {
            match self.lower.pop() {
                Some(v) => {
                    self.upper.push(Reverse(v));
                    self.lower_live -= 1;
                    self.upper_live += 1;
                    self.prune_lower();
                }
                None => break,
            }
        }
        while self.upper_live > self.lower_live {
            match self.upper.pop() {
                Some(Reverse(v)) => {
                    self.lower.push(v);
                    self.upper_live -= 1;
                    self.lower_live += 1;
                    self.prune_upper();
                }
                None => break,
            }
        }
    }

    /// Median of the live samples; `0.0` when the tracker is empty.
    fn current_median(&self) -> f64 {
        match (self.lower.peek(), self.upper.peek()) {
            (Some(&lo), Some(&Reverse(hi))) if self.lower_live == self.upper_live => {
                (lo.0 + hi.0) / 2.0
            }
            (Some(&lo), _) => lo.0,
            (None, Some(&Reverse(hi))) => hi.0,
            (None, None) => 0.0,
        }
    }

    /// Removes every sample queued in `pending_remove` from the live set.
    ///
    /// Only values that were previously inserted may be queued.
    fn lazy_cleanup(&mut self) {
        while let Some(v) = self.pending_remove.pop_front() {
            self.erase(v);
        }
        self.compact_if_bloated();
    }

    /// Logically removes one copy of `v`: updates the live counters, records
    /// the stale copy and drops it at once if it sits at a heap top.
    fn erase(&mut self, v: f64) {
        let ov = OrdF64(v);
        let lower_top = match self.lower.peek().copied() {
            Some(top) => top,
            None => return, // nothing is live
        };
        if ov <= lower_top {
            *self.delayed.entry(v.to_bits()).or_insert(0) += 1;
            self.lower_live -= 1;
            if ov == lower_top {
                self.prune_lower();
            }
        } else if self.upper_live > 0 {
            *self.delayed.entry(v.to_bits()).or_insert(0) += 1;
            self.upper_live -= 1;
            if self.upper.peek() == Some(&Reverse(ov)) {
                self.prune_upper();
            }
        } else {
            return; // value cannot be present
        }
        self.rebalance();
    }

    /// Consumes one stale marker for `v`; returns false when there is none.
    fn take_delayed(&mut self, v: f64) -> bool {
        let key = v.to_bits();
        if let Some(count) = self.delayed.get_mut(&key) {
            *count -= 1;
            if *count == 0 {
                self.delayed.remove(&key);
            }
            true
        } else {
            false
        }
    }

    /// Pops stale entries off the top of `lower`.
    fn prune_lower(&mut self) {
        while let Some(top) = self.lower.peek().copied() {
            if !self.take_delayed(top.0) {
                break;
            }
            self.lower.pop();
        }
    }

    /// Pops stale entries off the top of `upper`.
    fn prune_upper(&mut self) {
        while let Some(Reverse(top)) = self.upper.peek().copied() {
            if !self.take_delayed(top.0) {
                break;
            }
            self.upper.pop();
        }
    }

    /// Rebuilds both heaps from their live entries once stale entries
    /// outnumber live ones by more than `COMPACT_SLACK`.
    fn compact_if_bloated(&mut self) {
        let live = self.lower_live + self.upper_live;
        let physical = self.lower.len() + self.upper.len();
        if physical <= 2 * live + Self::COMPACT_SLACK {
            return;
        }

        let lower = std::mem::take(&mut self.lower);
        let upper = std::mem::take(&mut self.upper);
        let mut survivors: Vec<OrdF64> = Vec::with_capacity(live);
        for v in lower.into_iter().chain(upper.into_iter().map(|Reverse(v)| v)) {
            if !self.take_delayed(v.0) {
                survivors.push(v);
            }
        }
        self.delayed.clear();

        survivors.sort_unstable();
        // The smaller half (one extra when the count is odd) goes to `lower`.
        let upper_half = survivors.split_off((survivors.len() + 1) / 2);
        self.lower_live = survivors.len();
        self.upper_live = upper_half.len();
        self.lower = BinaryHeap::from(survivors);
        self.upper = upper_half.into_iter().map(Reverse).collect();
    }

    /// Median absolute deviation of the window around `median`; `0.0` when the
    /// window is empty. For an even count the two middle deviations are
    /// averaged.
    fn compute_mad(&self, median: f64) -> f64 {
        if self.window.is_empty() {
            return 0.0;
        }
        let mut devs: Vec<f64> = self.window.iter().map(|v| (v - median).abs()).collect();
        let n = devs.len();
        let mid = n / 2;
        let (below, at_mid, _) = devs.select_nth_unstable_by(mid, f64::total_cmp);
        let at_mid = *at_mid;
        if n % 2 == 1 {
            return at_mid;
        }
        // After partitioning, the (mid-1)-th order statistic is the largest
        // element of the left part, so no second selection pass is needed.
        match below.iter().copied().max_by(f64::total_cmp) {
            Some(prev) => (at_mid + prev) / 2.0,
            None => at_mid,
        }
    }
}
741
742impl XiaoHunTracker for DualHeapTracker {
743    fn insert_and_compute(&mut self, value: f64) -> (f64, f64) {
744        if self.window.len() >= self.window_size {
745            if let Some(old) = self.window.pop_front() {
746                self.pending_remove.push_back(old);
747                self.lazy_cleanup();
748            }
749        }
750        self.window.push_back(value);
751        self.heap_insert(value);
752        let median = self.current_median();
753        (median, self.compute_mad(median))
754    }
755
756    fn window_len(&self) -> usize { self.window.len() }
757
758    fn snapshot_stats(&self) -> (f64, f64) {
759        let med = self.current_median();
760        (med, self.compute_mad(med))
761    }
762}
763
764// ─── 6b. OrderStatTracker ────────────────────────────────────────────────────
765
/// Sliding-window tracker that keeps quantised values in an ordered bucket
/// histogram and raw values in a ring buffer.
pub struct OrderStatTracker {
    /// Maximum number of samples in the window.
    capacity: usize,
    /// Bucket width used for quantisation.
    eps: f64,
    /// Quantised bucket → number of samples currently in it.
    map: BTreeMap<i64, usize>,
    /// Ring buffer of `(quantised bucket, raw value)` pairs.
    window: Vec<(i64, f64)>,
    /// Index of the oldest sample in `window`.
    head: usize,
    /// Number of live samples.
    len: usize,
}

impl OrderStatTracker {
    /// Creates an empty tracker.
    ///
    /// # Panics
    /// Panics when `capacity` is zero or `eps` is not greater than zero.
    pub fn new(capacity: usize, eps: f64) -> Self {
        assert!(capacity > 0, "capacity must be > 0");
        assert!(eps > 0.0, "eps must be > 0");
        let ring = vec![(0i64, 0.0f64); capacity];
        OrderStatTracker {
            capacity,
            eps,
            map: BTreeMap::new(),
            window: ring,
            head: 0,
            len: 0,
        }
    }

    /// Maps a raw value to its bucket index (nearest multiple of `eps`).
    #[inline]
    fn quantise(&self, v: f64) -> i64 {
        let scaled = v / self.eps;
        scaled.round() as i64
    }

    /// Representative value of a bucket.
    #[inline]
    fn dequantise(&self, b: i64) -> f64 {
        (b as f64) * self.eps
    }

    /// Adds one sample to bucket `b`.
    fn insert_bucket(&mut self, b: i64) {
        let count = self.map.entry(b).or_insert(0);
        *count += 1;
    }

    /// Removes one sample from bucket `b`, dropping the bucket when it becomes
    /// empty; unknown buckets are ignored.
    fn remove_bucket(&mut self, b: i64) {
        let remaining = match self.map.get(&b) {
            Some(&count) => count,
            None => return,
        };
        if remaining > 1 {
            self.map.insert(b, remaining - 1);
        } else {
            self.map.remove(&b);
        }
    }

    /// Dequantised value of the `k`-th smallest sample (0-based). Falls back
    /// to the largest bucket when `k` is out of range and to `0.0` when the
    /// histogram is empty.
    fn select_kth(&self, k: usize) -> f64 {
        let mut seen = 0usize;
        let hit = self.map.iter().find(|&(_, &count)| {
            seen += count;
            seen > k
        });
        hit.or_else(|| self.map.iter().next_back())
            .map(|(&bucket, _)| self.dequantise(bucket))
            .unwrap_or(0.0)
    }

    /// Median of the quantised samples; `0.0` when empty. For an even count the
    /// two middle values are averaged.
    fn median(&self) -> f64 {
        let n = self.len;
        if n == 0 {
            return 0.0;
        }
        let lower_mid = (n - 1) / 2;
        if n % 2 == 1 {
            return self.select_kth(lower_mid);
        }
        (self.select_kth(lower_mid) + self.select_kth(lower_mid + 1)) / 2.0
    }

    /// Median absolute deviation of the raw samples around `median`; `0.0`
    /// when empty. For an even count the two middle deviations are averaged.
    fn compute_mad(&self, median: f64) -> f64 {
        if self.len == 0 {
            return 0.0;
        }
        // Walk the ring from the oldest sample for `len` entries.
        let mut devs: Vec<f64> = self
            .window
            .iter()
            .cycle()
            .skip(self.head)
            .take(self.len)
            .map(|&(_, raw)| (raw - median).abs())
            .collect();
        let n = devs.len();
        let mid = n / 2;
        devs.select_nth_unstable_by(mid, f64::total_cmp);
        let upper = devs[mid];
        if n % 2 == 1 || mid == 0 {
            return upper;
        }
        devs.select_nth_unstable_by(mid - 1, f64::total_cmp);
        (upper + devs[mid - 1]) / 2.0
    }
}
834
835impl XiaoHunTracker for OrderStatTracker {
836    fn insert_and_compute(&mut self, value: f64) -> (f64, f64) {
837        let new_bucket = self.quantise(value);
838        if self.len < self.capacity {
839            let idx = (self.head + self.len) % self.capacity;
840            self.window[idx] = (new_bucket, value);
841            self.len += 1;
842            self.insert_bucket(new_bucket);
843        } else {
844            let (old_bucket, _) = self.window[self.head];
845            self.remove_bucket(old_bucket);
846            self.window[self.head] = (new_bucket, value);
847            self.insert_bucket(new_bucket);
848            self.head = (self.head + 1) % self.capacity;
849        }
850        let med = self.median();
851        (med, self.compute_mad(med))
852    }
853
854    fn window_len(&self) -> usize { self.len }
855
856    fn snapshot_stats(&self) -> (f64, f64) {
857        let med = self.median();
858        (med, self.compute_mad(med))
859    }
860}
861
862// ─── 6c. AutoXiaoHun ─────────────────────────────────────────────────────────
863
864/// 自动按窗口大小选择实现的线程安全包装
865pub struct AutoXiaoHun {
866    inner: Mutex<Box<dyn XiaoHunTracker>>,
867}
868
869impl AutoXiaoHun {
870    pub fn new(window_size: usize, eps: f64) -> Self {
871        let inner: Box<dyn XiaoHunTracker> = if window_size <= LARGE_WINDOW_THRESHOLD {
872            Box::new(DualHeapTracker::new(window_size))
873        } else {
874            Box::new(OrderStatTracker::new(window_size, eps))
875        };
876        Self { inner: Mutex::new(inner) }
877    }
878
879    pub fn insert_and_compute(&self, value: f64) -> (f64, f64) {
880        self.inner.lock().insert_and_compute(value)
881    }
882
883    pub fn window_len(&self) -> usize { self.inner.lock().window_len() }
884
885    pub fn snapshot_stats(&self) -> (f64, f64) { self.inner.lock().snapshot_stats() }
886
887    pub fn is_empty(&self) -> bool { self.inner.lock().is_empty() }
888}
889
890// ═══════════════════════════════════════════════════════════════════════════════
891// § 7  神伤 ShenShang — 染度评估器
892// ═══════════════════════════════════════════════════════════════════════════════
893
894pub struct ShenShang {
895    config: YanHunConfig,
896}
897
898impl ShenShang {
899    pub fn new(config: YanHunConfig) -> Self { Self { config } }
900
901    pub fn score(&self, value: f64, median: f64, mad: f64) -> f64 {
902        if mad <= f64::EPSILON { (value - median).abs() }
903        else { MODIFIED_Z_K * (value - median).abs() / mad }
904    }
905
906    pub fn assess(&self, value: f64, median: f64, mad: f64, round: u32) -> DuanChang {
907        let score = self.score(value, median, mad);
908        let level = if score >= self.config.threshold_heavy   { PoSan::SoulWithered }
909                    else if score >= self.config.threshold_medium { PoSan::DeepSorrow   }
910                    else if score >= self.config.threshold_light  { PoSan::LightGrief   }
911                    else                                           { PoSan::Untainted    };
912
913        let reason = match level {
914            PoSan::SoulWithered => format!("score {:.4} ≥ 销魂阈 {:.4}",  score, self.config.threshold_heavy),
915            PoSan::DeepSorrow   => format!("score {:.4} ≥ 深恸阈 {:.4}",  score, self.config.threshold_medium),
916            PoSan::LightGrief   => format!("score {:.4} ≥ 轻愁阈 {:.4}",  score, self.config.threshold_light),
917            PoSan::Untainted    => format!("score {:.4} < 轻愁阈 {:.4}",  score, self.config.threshold_light),
918        };
919
920        DuanChang { level, score, reason, ts: Utc::now(), round }
921    }
922
923    pub fn mad_to_sigma(&self) -> f64 { MAD_TO_SIGMA }
924}
925
926// ═══════════════════════════════════════════════════════════════════════════════
927// § 8  情迷 QingMi — 背景场 EMA 模型
928// ═══════════════════════════════════════════════════════════════════════════════
929
930pub struct QingMi {
931    alpha:  f64,
932    values: RwLock<HashMap<SensorId, f64>>,
933}
934
935impl QingMi {
936    pub fn new(alpha: f64) -> Self {
937        assert!((0.0..=1.0).contains(&alpha), "alpha must be in (0, 1]");
938        Self { alpha, values: RwLock::new(HashMap::new()) }
939    }
940
941    pub fn update(&self, sensor_id: &SensorId, sample: f64) -> f64 {
942        let mut map = self.values.write();
943        let bg = map.entry(sensor_id.clone()).or_insert(sample);
944        *bg = self.alpha * sample + (1.0 - self.alpha) * *bg;
945        *bg
946    }
947
948    pub fn get(&self, sensor_id: &SensorId, fallback: f64) -> f64 {
949        self.values.read().get(sensor_id).copied().unwrap_or(fallback)
950    }
951}
952
953// ═══════════════════════════════════════════════════════════════════════════════
954// § 9  幽冥 YouMing — 持久化存储(Sled / SQLite 双后端)
955// ═══════════════════════════════════════════════════════════════════════════════
956
957pub trait YouMingStore: Send + Sync + 'static {
958    fn append_sediment(&self, rec: &ChouKu) -> Result<()>;
959    fn batch_append_sediment(&self, recs: &[ChouKu]) -> Result<()>;
960    fn pull_candidates(&self, sensor_id: Option<&str>, limit: usize) -> Result<Vec<ChouKu>>;
961    fn get_by_id(&self, id: &DropletId) -> Result<Option<ChouKu>>;
962    fn batch_get_by_ids(&self, ids: &[DropletId]) -> Result<Vec<ChouKu>>;
963    fn mark_forging(&self, ids: &[DropletId], round: u32) -> Result<()>;
964    fn mark_luminous(&self, results: &[YiLuan]) -> Result<()>;
965    fn seal(&self, ids: &[DropletId]) -> Result<()>;
966    fn scan_prefix(&self, prefix: &str) -> Result<Vec<ChouKu>>;
967    fn flush(&self) -> Result<()>;
968    fn apply_shadow(&self, res: &YiLuan) -> Result<()>;
969    fn promote_shadow(&self, res: &YiLuan) -> Result<()>;
970    fn export_shadow(&self, path: &PathBuf) -> Result<()>;
971    fn audit(&self, rec: &QiChuang) -> Result<()>;
972    fn quarantine(&self, rec: &QiChuang) -> Result<()>;
973    fn scan_audit(&self) -> Result<Vec<QiChuang>>;
974    fn scan_quarantine(&self) -> Result<Vec<QiChuang>>;
975    fn export_audit_jsonl(&self, path: &PathBuf) -> Result<usize>;
976    /// CAS 原子更新状态
977    fn compare_and_swap_state(
978        &self,
979        id: &DropletId,
980        old_state: ChouChang,
981        new_state: ChouChang,
982        new_round: u32,
983    ) -> Result<bool>;
984}
985
986// ─── 9b. SQLite 后端 ──────────────────────────────────────────────────────────
987
988pub struct SqliteYouMing {
989    conn: Mutex<Connection>,
990}
991
992impl SqliteYouMing {
993    pub fn open(path: &str) -> Result<Self> {
994        let conn = Connection::open(path).context("打开 SQLite 数据库失败")?;
995        Self::init_schema(&conn)?;
996        Ok(Self { conn: Mutex::new(conn) })
997    }
998
999    pub fn open_in_memory() -> Result<Self> {
1000        let conn = Connection::open_in_memory().context("打开内存数据库失败")?;
1001        Self::init_schema(&conn)?;
1002        Ok(Self { conn: Mutex::new(conn) })
1003    }
1004
1005    fn init_schema(conn: &Connection) -> Result<()> {
1006        conn.execute_batch(
1007            "PRAGMA journal_mode = WAL;
1008             PRAGMA synchronous = NORMAL;
1009             CREATE TABLE IF NOT EXISTS sediment (id TEXT PRIMARY KEY, val TEXT NOT NULL);
1010             CREATE TABLE IF NOT EXISTS shadow   (key TEXT PRIMARY KEY, val TEXT NOT NULL);
1011             CREATE INDEX IF NOT EXISTS idx_shadow_key ON shadow(key);"
1012        ).context("SQLite 建表失败")?;
1013        Ok(())
1014    }
1015
1016    fn make_key(ts: &Timestamp, id: &str) -> String {
1017        format!("{}::{}", ts.to_rfc3339_opts(SecondsFormat::Nanos, true), id)
1018    }
1019
1020    fn get_by_id_inner(&self, conn: &Connection, id: &str) -> Result<Option<ChouKu>> {
1021        let mut stmt = conn.prepare_cached("SELECT val FROM sediment WHERE id = ?1")
1022            .context("SQLite 准备语句失败")?;
1023        let mut rows = stmt.query(params![id]).context("SQLite 查询失败")?;
1024        match rows.next().context("SQLite 读取行失败")? {
1025            Some(row) => {
1026                let val: String = row.get(0).context("SQLite 读取列失败")?;
1027                Ok(Some(serde_json::from_str(&val).context("反序列化愁苦记录失败")?))
1028            }
1029            None => Ok(None),
1030        }
1031    }
1032}
1033
1034impl YouMingStore for SqliteYouMing {
1035    fn append_sediment(&self, rec: &ChouKu) -> Result<()> {
1036        let conn = self.conn.lock();
1037        let val = serde_json::to_string(rec).context("序列化愁苦记录失败")?;
1038        conn.execute("INSERT OR REPLACE INTO sediment (id, val) VALUES (?1, ?2)", params![rec.id, val])
1039            .context("SQLite 插入失败")?;
1040        Ok(())
1041    }
1042
1043    fn batch_append_sediment(&self, recs: &[ChouKu]) -> Result<()> {
1044        let conn = self.conn.lock();
1045        let tx = conn.unchecked_transaction()?;
1046        for rec in recs {
1047            let val = serde_json::to_string(rec).context("序列化愁苦记录失败")?;
1048            tx.execute("INSERT OR REPLACE INTO sediment (id, val) VALUES (?1, ?2)",
1049                       params![rec.id, val]).context("SQLite 批量插入失败")?;
1050        }
1051        tx.commit()?;
1052        Ok(())
1053    }
1054
1055    fn pull_candidates(&self, sensor_id: Option<&str>, limit: usize) -> Result<Vec<ChouKu>> {
1056        let conn = self.conn.lock();
1057        let mut stmt = conn.prepare("SELECT val FROM sediment LIMIT ?1").context("准备失败")?;
1058        let rows = stmt.query_map(params![limit as i64], |r| r.get::<_, String>(0))?;
1059        let mut out = Vec::new();
1060        for row in rows {
1061            let val: String = row?;
1062            if let Ok(rec) = serde_json::from_str::<ChouKu>(&val) {
1063                match rec.state {
1064                    ChouChang::Slumber | ChouChang::Awake | ChouChang::Forging(_) => {
1065                        if sensor_id.map_or(true, |s| rec.sensor_id == s) { out.push(rec); }
1066                    }
1067                    _ => {}
1068                }
1069            }
1070            if out.len() >= limit { break; }
1071        }
1072        Ok(out)
1073    }
1074
1075    fn get_by_id(&self, id: &DropletId) -> Result<Option<ChouKu>> {
1076        let conn = self.conn.lock();
1077        self.get_by_id_inner(&conn, id)
1078    }
1079
1080    fn batch_get_by_ids(&self, ids: &[DropletId]) -> Result<Vec<ChouKu>> {
1081        let mut out = Vec::with_capacity(ids.len());
1082        for id in ids {
1083            if let Some(rec) = self.get_by_id(id)? { out.push(rec); }
1084        }
1085        Ok(out)
1086    }
1087
1088    fn mark_forging(&self, ids: &[DropletId], round: u32) -> Result<()> {
1089        let conn = self.conn.lock();
1090        let tx = conn.unchecked_transaction()?;
1091        for id in ids {
1092            if let Some(mut rec) = self.get_by_id_inner(&conn, id)? {
1093                rec.state = ChouChang::Forging(round);
1094                rec.round = round;
1095                let val = serde_json::to_string(&rec).context("序列化失败")?;
1096                tx.execute("UPDATE sediment SET val = ?1 WHERE id = ?2", params![val, id])?;
1097            }
1098        }
1099        tx.commit()?;
1100        Ok(())
1101    }
1102
1103    fn mark_luminous(&self, results: &[YiLuan]) -> Result<()> {
1104        let conn = self.conn.lock();
1105        let tx = conn.unchecked_transaction()?;
1106        for res in results {
1107            for pid in &res.provenance {
1108                if let Some(mut rec) = self.get_by_id_inner(&conn, pid)? {
1109                    rec.state = ChouChang::Luminous(res.round);
1110                    let mut meta = rec.metadata.clone().unwrap_or_else(|| serde_json::json!({}));
1111                    meta["净化方法"]  = serde_json::json!(res.method);
1112                    meta["净化置信度"] = serde_json::json!(res.confidence);
1113                    meta["净化时间"]  = serde_json::json!(res.ts.to_rfc3339());
1114                    meta["净化轮次"]  = serde_json::json!(res.round);
1115                    rec.metadata = Some(meta);
1116                    let val = serde_json::to_string(&rec).context("序列化失败")?;
1117                    tx.execute("UPDATE sediment SET val = ?1 WHERE id = ?2", params![val, pid])?;
1118                }
1119            }
1120        }
1121        tx.commit()?;
1122        Ok(())
1123    }
1124
1125    fn seal(&self, ids: &[DropletId]) -> Result<()> {
1126        let conn = self.conn.lock();
1127        let tx = conn.unchecked_transaction()?;
1128        for id in ids {
1129            if let Some(mut rec) = self.get_by_id_inner(&conn, id)? {
1130                rec.state = ChouChang::Sealed;
1131                let val = serde_json::to_string(&rec).context("序列化失败")?;
1132                tx.execute("UPDATE sediment SET val = ?1 WHERE id = ?2", params![val, id])?;
1133            }
1134        }
1135        tx.commit()?;
1136        Ok(())
1137    }
1138
1139    fn scan_prefix(&self, _prefix: &str) -> Result<Vec<ChouKu>> {
1140        let conn = self.conn.lock();
1141        let mut stmt = conn.prepare("SELECT val FROM sediment").context("准备失败")?;
1142        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1143        let mut out = Vec::new();
1144        for row in rows {
1145            if let Ok(rec) = serde_json::from_str::<ChouKu>(&row?) { out.push(rec); }
1146        }
1147        Ok(out)
1148    }
1149
1150    fn flush(&self) -> Result<()> { Ok(()) }
1151
1152    fn apply_shadow(&self, res: &YiLuan) -> Result<()> {
1153        let conn = self.conn.lock();
1154        let key = Self::make_key(&res.ts, &res.sensor_id);
1155        let val = serde_json::to_string(res).context("序列化意乱失败")?;
1156        conn.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![key, val])?;
1157        Ok(())
1158    }
1159
1160    fn promote_shadow(&self, res: &YiLuan) -> Result<()> {
1161        let conn = self.conn.lock();
1162        let key = format!("promoted::{}", Self::make_key(&res.ts, &res.sensor_id));
1163        let val = serde_json::to_string(res).context("序列化意乱失败")?;
1164        conn.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![key, val])?;
1165        Ok(())
1166    }
1167
1168    fn export_shadow(&self, path: &PathBuf) -> Result<()> {
1169        let conn = self.conn.lock();
1170        let mut stmt = conn.prepare("SELECT val FROM shadow").context("准备失败")?;
1171        let mut file = std::fs::File::create(path).context("创建导出文件失败")?;
1172        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1173        for row in rows {
1174            let val: String = row?;
1175            file.write_all(val.as_bytes())?;
1176            file.write_all(b"\n")?;
1177        }
1178        Ok(())
1179    }
1180
1181    fn audit(&self, rec: &QiChuang) -> Result<()> {
1182        let conn = self.conn.lock();
1183        let key = format!("audit::{}", Self::make_key(&rec.ts, &rec.droplet_id));
1184        let val = serde_json::to_string(rec).context("序列化凄怆失败")?;
1185        conn.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![key, val])?;
1186        Ok(())
1187    }
1188
1189    fn quarantine(&self, rec: &QiChuang) -> Result<()> {
1190        let conn = self.conn.lock();
1191        let ts_key = Self::make_key(&rec.ts, &rec.droplet_id);
1192        // 隔离树键
1193        let qkey = format!("quarantine::{}", ts_key);
1194        let val = serde_json::to_string(rec).context("序列化凄怆失败")?;
1195        conn.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![qkey, val.clone()])?;
1196        // 同时写入审计键(重度污染双留痕)
1197        let akey = format!("audit::{}", ts_key);
1198        conn.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![akey, val])?;
1199        Ok(())
1200    }
1201
1202    fn scan_audit(&self) -> Result<Vec<QiChuang>> {
1203        let conn = self.conn.lock();
1204        let mut stmt = conn.prepare("SELECT val FROM shadow WHERE key LIKE 'audit::%'")
1205            .context("准备失败")?;
1206        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1207        let mut out = Vec::new();
1208        for row in rows {
1209            if let Ok(rec) = serde_json::from_str::<QiChuang>(&row?) { out.push(rec); }
1210        }
1211        Ok(out)
1212    }
1213
1214    fn scan_quarantine(&self) -> Result<Vec<QiChuang>> {
1215        let conn = self.conn.lock();
1216        let mut stmt = conn.prepare("SELECT val FROM shadow WHERE key LIKE 'quarantine::%'")
1217            .context("准备失败")?;
1218        let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1219        let mut out = Vec::new();
1220        for row in rows {
1221            if let Ok(rec) = serde_json::from_str::<QiChuang>(&row?) { out.push(rec); }
1222        }
1223        Ok(out)
1224    }
1225
1226    fn export_audit_jsonl(&self, path: &PathBuf) -> Result<usize> {
1227        let records = self.scan_audit()?;
1228        let mut f = BufWriter::new(std::fs::File::create(path).context("创建导出文件失败")?);
1229        let n = records.len();
1230        for rec in &records {
1231            let line = serde_json::to_string(rec).context("序列化失败")?;
1232            f.write_all(line.as_bytes())?;
1233            f.write_all(b"\n")?;
1234        }
1235        f.flush()?;
1236        Ok(n)
1237    }
1238
1239    fn compare_and_swap_state(
1240        &self, id: &DropletId, old_state: ChouChang, new_state: ChouChang, new_round: u32,
1241    ) -> Result<bool> {
1242        let conn = self.conn.lock();
1243        let tx = conn.unchecked_transaction()?;
1244        match self.get_by_id_inner(&conn, id)? {
1245            None => { tx.commit()?; return Ok(false); }
1246            Some(mut rec) => {
1247                if rec.state != old_state { tx.commit()?; return Ok(false); }
1248                rec.state = new_state;
1249                rec.round = new_round;
1250                let val = serde_json::to_string(&rec)?;
1251                tx.execute("UPDATE sediment SET val = ?1 WHERE id = ?2", params![val, id])?;
1252                tx.commit()?;
1253                Ok(true)
1254            }
1255        }
1256    }
1257}
1258
1259// ═══════════════════════════════════════════════════════════════════════════════
1260// § 10  泪落处理器 — per-sensor LeiLuo processor
1261// ═══════════════════════════════════════════════════════════════════════════════
1262
1263pub struct LeiLuoProcessor {
1264    sensor_id: SensorId,
1265    tracker:   Arc<AutoXiaoHun>,
1266    scorer:    ShenShang,
1267    hunfei_tx: mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
1268    tsdb:      Arc<dyn TsdbSink>,
1269    metrics:   Arc<dyn MetricsSink>,
1270    seq:       AtomicU64,
1271}
1272
1273impl LeiLuoProcessor {
1274    pub fn new(
1275        sensor_id: SensorId,
1276        config:    YanHunConfig,
1277        hunfei_tx: mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
1278        tsdb:      Arc<dyn TsdbSink>,
1279        metrics:   Arc<dyn MetricsSink>,
1280    ) -> Self {
1281        let tracker = Arc::new(AutoXiaoHun::new(config.window_size, config.quantisation_eps));
1282        let scorer  = ShenShang::new(config);
1283        Self { sensor_id, tracker, scorer, hunfei_tx, tsdb, metrics, seq: AtomicU64::new(0) }
1284    }
1285
1286    pub async fn process(&self, reading: LeiLuo) -> Result<()> {
1287        let seq = self.seq.fetch_add(1, AtomicOrd::Relaxed);
1288        let (median, mad) = self.tracker.insert_and_compute(reading.value);
1289        let order = self.scorer.assess(reading.value, median, mad, 1);
1290
1291        debug!(
1292            "黯然销魂 sensor={} seq={} value={:.6} median={:.6} mad={:.6} score={:.4} level={}",
1293            self.sensor_id, seq, reading.value, median, mad, order.score, order.level
1294        );
1295
1296        self.metrics.set_gauge(&format!("yanhun.score.{}", self.sensor_id), order.score);
1297
1298        if order.level != PoSan::Untainted {
1299            self.metrics.incr_counter("yanhun.remediation.emitted", 1);
1300            // 先尝试无阻塞发送,失败后施加背压(阻塞等待)
1301            let msg = (self.sensor_id.clone(), order, reading);
1302            if self.hunfei_tx.try_send(msg.clone()).is_err() {
1303                if let Err(e) = self.hunfei_tx.send(msg).await {
1304                    return Err(YanHunError::Channel(format!("hunfei send: {}", e)).into());
1305                }
1306            }
1307        }
1308        Ok(())
1309    }
1310
1311    pub fn tracker(&self) -> &Arc<AutoXiaoHun> { &self.tracker }
1312    pub fn snapshot_stats(&self) -> (f64, f64) { self.tracker.snapshot_stats() }
1313}
1314
1315// ═══════════════════════════════════════════════════════════════════════════════
1316// § 11  魂飞 HunFei — 净化执行器
1317// ═══════════════════════════════════════════════════════════════════════════════
1318
1319pub struct HunFei {
1320    youming:    Arc<dyn YouMingStore>,
1321    qingmi:     Arc<QingMi>,
1322    processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
1323    config:     YanHunConfig,
1324    hunfei_rx:  RwLock<Option<mpsc::Receiver<(SensorId, DuanChang, LeiLuo)>>>,
1325    concurrency:Arc<Semaphore>,
1326    running:    Arc<AtomicBool>,
1327    tsdb:       Arc<dyn TsdbSink>,
1328    metrics:    Arc<dyn MetricsSink>,
1329}
1330
1331impl HunFei {
1332    pub fn new(
1333        youming:    Arc<dyn YouMingStore>,
1334        qingmi:     Arc<QingMi>,
1335        processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
1336        config:     YanHunConfig,
1337        hunfei_rx:  mpsc::Receiver<(SensorId, DuanChang, LeiLuo)>,
1338        tsdb:       Arc<dyn TsdbSink>,
1339        metrics:    Arc<dyn MetricsSink>,
1340    ) -> Self {
1341        let concurrency = Arc::new(Semaphore::new(config.hunfei_concurrency.max(1)));
1342        Self {
1343            youming, qingmi, processors,
1344            config: config.clone(),
1345            hunfei_rx: RwLock::new(Some(hunfei_rx)),
1346            concurrency, running: Arc::new(AtomicBool::new(false)),
1347            tsdb, metrics,
1348        }
1349    }
1350
1351    pub fn start(self: Arc<Self>) {
1352        if self.running.swap(true, AtomicOrd::SeqCst) { return; }
1353        let this = self.clone();
1354        tokio::spawn(async move {
1355            let mut rx = match this.hunfei_rx.write().take() {
1356                Some(r) => r,
1357                None => { error!("魂飞: hunfei_rx 已被取走"); return; }
1358            };
1359            info!("魂飞: 净化执行器启动 (并发度={})", this.config.hunfei_concurrency);
1360            while let Some((sid, order, reading)) = rx.recv().await {
1361                let permit = match this.concurrency.clone().acquire_owned().await {
1362                    Ok(p) => p,
1363                    Err(_) => { warn!("魂飞: 信号量已关闭"); break; }
1364                };
1365                let t = this.clone();
1366                tokio::spawn(async move {
1367                    let _p = permit;
1368                    if let Err(e) = t.execute(&sid, order, reading).await {
1369                        error!("魂飞: 执行错误: {:?}", e);
1370                    }
1371                });
1372            }
1373            info!("魂飞: 净化执行器退出");
1374            this.running.store(false, AtomicOrd::SeqCst);
1375        });
1376    }
1377
1378    async fn execute(&self, sid: &SensorId, order: DuanChang, reading: LeiLuo) -> Result<()> {
1379        match order.level {
1380            PoSan::LightGrief   => self.apply_light(sid, order, reading).await,
1381            PoSan::DeepSorrow   => self.apply_medium(sid, order, reading).await,
1382            PoSan::SoulWithered => self.apply_heavy(sid, order, reading).await,
1383            PoSan::Untainted    => Ok(()),
1384        }
1385    }
1386
1387    // 轻愁 → 符号裁剪
1388    async fn apply_light(&self, sid: &SensorId, order: DuanChang, reading: LeiLuo) -> Result<()> {
1389        let (median, mad) = self.snapshot_median_mad(sid, &reading);
1390        let sigma = if mad > f64::EPSILON { mad * MAD_TO_SIGMA } else { 0.0 };
1391        let k = self.config.clip_k;
1392        let clipped = if sigma > f64::EPSILON {
1393            reading.value.max(median - k * sigma).min(median + k * sigma)
1394        } else { median };
1395
1396        let note = format!("裁剪至 [{:.6}, {:.6}]  σ={:.6}", median - k*sigma, median + k*sigma, sigma);
1397        self.tsdb.write_corrected(sid, &reading.ts, clipped, &note);
1398        self.metrics.incr_counter("yanhun.light", 1);
1399
1400        let rec = QiChuang {
1401            sensor_id:      sid.clone(),
1402            droplet_id:     reading.id.clone(),
1403            order:          order.clone(),
1404            original_value: reading.value,
1405            applied_value:  Some(clipped),
1406            note:           Some(note),
1407            ts:             Utc::now(),
1408        };
1409        self.youming.audit(&rec)?;
1410
1411        // 反馈矫正值到统计引擎,防止异常值持续污染窗口
1412        self.feed_back(sid, clipped, &reading).await;
1413
1414        if self.config.verification_enabled { self.spawn_verify(sid.clone(), clipped).await; }
1415        Ok(())
1416    }
1417
1418    // 深恸 → 背景混合
1419    async fn apply_medium(&self, sid: &SensorId, order: DuanChang, reading: LeiLuo) -> Result<()> {
1420        let bg      = self.qingmi.get(sid, reading.value);
1421        let alpha   = self.config.blend_alpha;
1422        let blended = alpha * bg + (1.0 - alpha) * reading.value;
1423        self.qingmi.update(sid, blended);
1424
1425        let note = format!("背景混合 bg={:.6}  α={:.3}", bg, alpha);
1426        self.tsdb.write_corrected(sid, &reading.ts, blended, &note);
1427        self.metrics.incr_counter("yanhun.medium", 1);
1428
1429        let rec = QiChuang {
1430            sensor_id:      sid.clone(),
1431            droplet_id:     reading.id.clone(),
1432            order:          order.clone(),
1433            original_value: reading.value,
1434            applied_value:  Some(blended),
1435            note:           Some(note),
1436            ts:             Utc::now(),
1437        };
1438        self.youming.audit(&rec)?;
1439        self.feed_back(sid, blended, &reading).await;
1440
1441        if self.config.verification_enabled { self.spawn_verify(sid.clone(), blended).await; }
1442        Ok(())
1443    }
1444
1445    // 销魂 → 隔离入幽冥(同时写隔离树 + 审计树)
1446    async fn apply_heavy(&self, sid: &SensorId, order: DuanChang, reading: LeiLuo) -> Result<()> {
1447        let rec = QiChuang {
1448            sensor_id:      sid.clone(),
1449            droplet_id:     reading.id.clone(),
1450            order:          order.clone(),
1451            original_value: reading.value,
1452            applied_value:  None,
1453            note:           Some("销魂隔离: 已阻断主链路".into()),
1454            ts:             Utc::now(),
1455        };
1456        // quarantine 方法已在 YouMingStore 内同时写入隔离键 + 审计键
1457        self.youming.quarantine(&rec)?;
1458        self.metrics.incr_counter("yanhun.heavy", 1);
1459        warn!("销魂: sensor={} score={:.4} 已隔离入幽冥", sid, order.score);
1460        Ok(())
1461    }
1462
1463    fn snapshot_median_mad(&self, sid: &SensorId, reading: &LeiLuo) -> (f64, f64) {
1464        let map = self.processors.read();
1465        if let Some(proc) = map.get(sid) { proc.snapshot_stats() }
1466        else { (reading.value, 0.0) }
1467    }
1468
1469    async fn feed_back(&self, sid: &SensorId, value: f64, original: &LeiLuo) {
1470        let proc = { self.processors.read().get(sid).cloned() };
1471        if let Some(p) = proc {
1472            let corrected = LeiLuo { value, ts: Utc::now(), ..original.clone() };
1473            if let Err(e) = p.process(corrected).await {
1474                warn!("魂飞: 反馈失败 {}: {:?}", sid, e);
1475            }
1476        }
1477    }
1478
1479    async fn spawn_verify(&self, sid: SensorId, applied: f64) {
1480        let qingmi   = self.qingmi.clone();
1481        let delay    = self.config.shadow_verify_delay_ms;
1482        tokio::spawn(async move {
1483            sleep(Duration::from_millis(delay)).await;
1484            let bg    = qingmi.get(&sid, applied);
1485            let delta = (applied - bg).abs();
1486            if delta < 1e-3 {
1487                debug!("验证: sensor={} applied≈bg → 有效", sid);
1488            } else {
1489                info!("验证: sensor={} Δ={:.6}  applied={:.6}  bg={:.6}", sid, delta, applied, bg);
1490            }
1491        });
1492    }
1493}
1494
1495// ═══════════════════════════════════════════════════════════════════════════════
1496// § 12  惊神 JingShen — 分片管理器(双检查锁 + 全局 tracker 注册表)
1497// ═══════════════════════════════════════════════════════════════════════════════
1498
1499struct ShardEntry {
1500    tx:      mpsc::Sender<LeiLuo>,
1501    tracker: Arc<AutoXiaoHun>,
1502}
1503
1504struct JingShenShard {
1505    entries: RwLock<HashMap<SensorId, ShardEntry>>,
1506}
1507
1508impl JingShenShard {
1509    fn new() -> Self { Self { entries: RwLock::new(HashMap::new()) } }
1510
1511    fn get_or_create(
1512        &self,
1513        sid:        &SensorId,
1514        config:     &YanHunConfig,
1515        hunfei_tx:  &mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
1516        processors: &Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
1517        trackers:   &Arc<RwLock<HashMap<SensorId, Arc<AutoXiaoHun>>>>,
1518        tsdb:       &Arc<dyn TsdbSink>,
1519        metrics:    &Arc<dyn MetricsSink>,
1520    ) -> mpsc::Sender<LeiLuo> {
1521        // 快路径:读锁检查
1522        {
1523            let map = self.entries.read();
1524            if let Some(e) = map.get(sid) { return e.tx.clone(); }
1525        }
1526        // 慢路径:写锁 + 双检查
1527        let mut map = self.entries.write();
1528        if let Some(e) = map.get(sid) { return e.tx.clone(); }
1529
1530        let (tx, rx) = mpsc::channel::<LeiLuo>(config.per_sensor_queue);
1531        let proc = Arc::new(LeiLuoProcessor::new(
1532            sid.clone(), config.clone(), hunfei_tx.clone(), tsdb.clone(), metrics.clone(),
1533        ));
1534        let tracker_arc = proc.tracker().clone();
1535
1536        // 注册到全局 tracker 表(供 HunFei 反馈访问)
1537        processors.write().insert(sid.clone(), proc.clone());
1538        trackers.write().insert(sid.clone(), tracker_arc.clone());
1539
1540        // 启动每传感器顺序 worker
1541        let batch_max  = config.batch_max_size;
1542        let batch_wait = config.batch_max_wait_ms;
1543        let tsdb_c     = tsdb.clone();
1544        let proc_c     = proc.clone();
1545
1546        tokio::spawn(async move {
1547            let mut batch: Vec<LeiLuo> = Vec::with_capacity(batch_max);
1548            let mut last_flush = Instant::now();
1549            let mut inbound = rx;
1550            let wait = Duration::from_millis(batch_wait);
1551
1552            loop {
1553                tokio::select! {
1554                    maybe = inbound.recv() => {
1555                        match maybe {
1556                            Some(r) => {
1557                                tsdb_c.write_raw(&r);
1558                                batch.push(r);
1559                                if batch.len() >= batch_max {
1560                                    for item in batch.drain(..) {
1561                                        if let Err(e) = proc_c.process(item).await {
1562                                            error!("惊神: worker 处理失败: {:?}", e);
1563                                        }
1564                                    }
1565                                    last_flush = Instant::now();
1566                                }
1567                            }
1568                            None => {
1569                                for item in batch.drain(..) {
1570                                    let _ = proc_c.process(item).await;
1571                                }
1572                                break;
1573                            }
1574                        }
1575                    }
1576                    _ = sleep(wait) => {
1577                        if !batch.is_empty() && last_flush.elapsed() >= wait {
1578                            for item in batch.drain(..) {
1579                                if let Err(e) = proc_c.process(item).await {
1580                                    error!("惊神: worker 批次处理失败: {:?}", e);
1581                                }
1582                            }
1583                            last_flush = Instant::now();
1584                        }
1585                    }
1586                }
1587            }
1588            debug!("惊神: sensor worker {} 退出", proc_c.sensor_id);
1589        });
1590
1591        map.insert(sid.clone(), ShardEntry { tx: tx.clone(), tracker: tracker_arc });
1592        tx
1593    }
1594}
1595
1596pub struct JingShen {
1597    shards:      Vec<Arc<JingShenShard>>,
1598    shard_count: usize,
1599    config:      YanHunConfig,
1600    hunfei_tx:   mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
1601    processors:  Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
1602    /// 全局 tracker 注册表(供 HunFei 执行器访问快照统计)
1603    trackers:    Arc<RwLock<HashMap<SensorId, Arc<AutoXiaoHun>>>>,
1604    tsdb:        Arc<dyn TsdbSink>,
1605    metrics:     Arc<dyn MetricsSink>,
1606}
1607
1608impl JingShen {
1609    pub fn new(
1610        config:     YanHunConfig,
1611        hunfei_tx:  mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
1612        processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
1613        tsdb:       Arc<dyn TsdbSink>,
1614        metrics:    Arc<dyn MetricsSink>,
1615    ) -> Self {
1616        let shard_count = config.shard_count.max(1);
1617        let shards = (0..shard_count).map(|_| Arc::new(JingShenShard::new())).collect();
1618        Self {
1619            shards, shard_count, config, hunfei_tx, processors,
1620            trackers: Arc::new(RwLock::new(HashMap::new())),
1621            tsdb, metrics,
1622        }
1623    }
1624
1625    /// FNV-1a 哈希分片路由
1626    fn shard_index(&self, sensor_id: &str) -> usize {
1627        let mut h: u64 = 0xcbf29ce484222325;
1628        for b in sensor_id.bytes() { h ^= b as u64; h = h.wrapping_mul(0x100000001b3); }
1629        (h as usize) % self.shard_count
1630    }
1631
1632    pub async fn submit(&self, reading: LeiLuo) -> Result<()> {
1633        let idx = self.shard_index(&reading.sensor_id);
1634        let sender = self.shards[idx].get_or_create(
1635            &reading.sensor_id, &self.config, &self.hunfei_tx,
1636            &self.processors, &self.trackers, &self.tsdb, &self.metrics,
1637        );
1638        sender.send(reading).await
1639            .map_err(|e| YanHunError::Channel(format!("惊神 submit: {}", e)).into())
1640    }
1641
1642    pub fn trackers(&self) -> Arc<RwLock<HashMap<SensorId, Arc<AutoXiaoHun>>>> {
1643        self.trackers.clone()
1644    }
1645}
1646
1647// ═══════════════════════════════════════════════════════════════════════════════
1648// § 13  悲切 BeiQie — 暗影验证器
1649// ═══════════════════════════════════════════════════════════════════════════════
1650
1651pub struct BeiQie {
1652    config:  YanHunConfig,
1653    metrics: Arc<dyn MetricsSink>,
1654}
1655
1656impl BeiQie {
1657    pub fn new(config: YanHunConfig, metrics: Arc<dyn MetricsSink>) -> Self {
1658        Self { config, metrics }
1659    }
1660
1661    pub async fn verify_and_promote(&self, res: YiLuan, youming: Arc<dyn YouMingStore>) -> Result<()> {
1662        if res.confidence < self.config.shadow_min_confidence {
1663            self.metrics.incr_counter("beiqie.skipped_low_conf", 1);
1664            return Ok(());
1665        }
1666        youming.apply_shadow(&res).context("施加暗影失败")?;
1667        self.metrics.incr_counter("beiqie.applied", 1);
1668        sleep(Duration::from_millis(self.config.shadow_verify_delay_ms)).await;
1669
1670        let roll: f64 = rand::thread_rng().gen();
1671        if roll <= res.confidence {
1672            youming.promote_shadow(&res).context("晋升暗影失败")?;
1673            youming.seal(&res.provenance)?;
1674            self.metrics.incr_counter("beiqie.promoted", 1);
1675        } else {
1676            self.metrics.incr_counter("beiqie.rejected", 1);
1677        }
1678        Ok(())
1679    }
1680}
1681
1682// ═══════════════════════════════════════════════════════════════════════════════
1683// § 14  哀毁 AiHui — 高阶净化器(DBSCAN + Huber M-估计 + 卡尔曼平滑)
1684// ═══════════════════════════════════════════════════════════════════════════════
1685
1686pub struct AiHui {
1687    youming: Arc<dyn YouMingStore>,
1688    config:  YanHunConfig,
1689}
1690
1691impl AiHui {
1692    pub fn new(youming: Arc<dyn YouMingStore>, config: YanHunConfig) -> Self {
1693        Self { youming, config }
1694    }
1695
1696    pub fn process_batch(&self, batch: Vec<ChouKu>) -> Result<Vec<YiLuan>> {
1697        if batch.is_empty() { return Ok(vec![]); }
1698
1699        let mut by_sensor: HashMap<SensorId, Vec<ChouKu>> = HashMap::new();
1700        for rec in batch { by_sensor.entry(rec.sensor_id.clone()).or_default().push(rec); }
1701
1702        let mut out = Vec::new();
1703        for (sid, mut recs) in by_sensor {
1704            recs.sort_by_key(|r| r.ts);
1705            let values: Vec<f64> = recs.iter().map(|r| r.value).collect();
1706
1707            // 第一阶段:DBSCAN 密度聚类
1708            let eps         = estimate_dbscan_eps(&values);
1709            let min_samples = ((values.len() as f64) * self.config.dbscan_min_samples_ratio)
1710                                .ceil() as usize;
1711            let min_samples = min_samples.max(1);
1712            let labels      = dbscan_1d(&values, eps, min_samples);
1713
1714            let mut clusters: HashMap<i32, Vec<(usize, f64)>> = HashMap::new();
1715            for (i, &lab) in labels.iter().enumerate() {
1716                clusters.entry(lab).or_default().push((i, values[i]));
1717            }
1718
1719            for (lab, members) in clusters {
1720                let vals: Vec<f64> = members.iter().map(|(_, v)| *v).collect();
1721
1722                if lab == -1 {
1723                    // 噪声簇:样本够多时取中位数代表
1724                    if vals.len() >= 3 {
1725                        let mut tmp = vals.clone();
1726                        let mid = tmp.len() / 2;
1727                        tmp.select_nth_unstable_by(mid, f64::total_cmp);
1728                        let rep  = tmp[mid];
1729                        let conf = compute_confidence(&vals, rep);
1730                        if conf >= self.config.promote_confidence {
1731                            let prov: Vec<DropletId> =
1732                                members.iter().map(|(idx, _)| recs[*idx].id.clone()).collect();
1733                            out.push(YiLuan {
1734                                sensor_id:       sid.clone(),
1735                                ts:              Utc::now(),
1736                                corrected_value: rep,
1737                                confidence:      conf,
1738                                method:          "DBSCAN_Noise_Median".into(),
1739                                round:           1,
1740                                provenance:      prov,
1741                                metrics: serde_json::json!({"eps": eps, "cluster": lab, "size": vals.len()}),
1742                            });
1743                        }
1744                    }
1745                    continue;
1746                }
1747
1748                // 正常簇:Huber M-估计 + 卡尔曼平滑 → 融合
1749                let delta    = match self.config.huber_strategy {
1750                    HuberStrategy::Fixed(d) => d,
1751                    HuberStrategy::AutoMAD  => estimate_huber_delta(&vals),
1752                };
1753                let huber_est = huber_robust_location(&vals, delta, 50);
1754
1755                let mut kf   = Kalman1D::new(self.config.kalman_q, self.config.kalman_r);
1756                let smoothed = kf.smooth_sequence(&vals);
1757                let mut tmp  = smoothed;
1758                let mid = tmp.len() / 2;
1759                tmp.select_nth_unstable_by(mid, f64::total_cmp);
1760                let kalman_est = tmp[mid];
1761
1762                let combined = 0.5 * huber_est + 0.5 * kalman_est;
1763                let conf     = compute_confidence(&vals, combined);
1764                let prov: Vec<DropletId> =
1765                    members.iter().map(|(idx, _)| recs[*idx].id.clone()).collect();
1766
1767                out.push(YiLuan {
1768                    sensor_id:       sid.clone(),
1769                    ts:              Utc::now(),
1770                    corrected_value: combined,
1771                    confidence:      conf,
1772                    method:          "DBSCAN_Huber_Kalman_Fusion".into(),
1773                    round:           1,
1774                    provenance:      prov,
1775                    metrics: serde_json::json!({
1776                        "eps": eps, "min_samples": min_samples,
1777                        "cluster_size": vals.len(),
1778                        "huber_delta": delta,
1779                        "huber_est": huber_est,
1780                        "kalman_est": kalman_est,
1781                        "combined": combined,
1782                    }),
1783                });
1784            }
1785        }
1786        Ok(out)
1787    }
1788}
1789
1790// ═══════════════════════════════════════════════════════════════════════════════
1791// § 15  心乱 XinLuan — 多轮迭代调度器
1792// ═══════════════════════════════════════════════════════════════════════════════
1793
1794pub struct XinLuan {
1795    youming:   Arc<dyn YouMingStore>,
1796    config:    YanHunConfig,
1797    worker_tx: mpsc::Sender<Vec<ChouKu>>,
1798    metrics:   Arc<dyn MetricsSink>,
1799    beiqie:    Arc<BeiQie>,
1800}
1801
1802impl XinLuan {
1803    pub fn new(
1804        youming:   Arc<dyn YouMingStore>,
1805        config:    YanHunConfig,
1806        worker_tx: mpsc::Sender<Vec<ChouKu>>,
1807        metrics:   Arc<dyn MetricsSink>,
1808        beiqie:    Arc<BeiQie>,
1809    ) -> Self {
1810        Self { youming, config, worker_tx, metrics, beiqie }
1811    }
1812
1813    pub async fn run(&self) -> Result<()> {
1814        info!("心乱: 多轮净化调度器启动");
1815        loop {
1816            let batch = self.youming.pull_candidates(None, self.config.multi_round_batch_size)?;
1817            if batch.is_empty() { sleep(Duration::from_millis(200)).await; continue; }
1818
1819            let shards = self.shard_batch(batch);
1820            for shard_batch in shards {
1821                let mut to_dispatch = Vec::new();
1822                for rec in shard_batch {
1823                    if self.youming.compare_and_swap_state(&rec.id, ChouChang::Slumber, ChouChang::Forging(1), 1)? {
1824                        if let Some(latest) = self.youming.get_by_id(&rec.id)? {
1825                            to_dispatch.push(latest);
1826                        }
1827                    } else {
1828                        self.metrics.incr_counter("xinluan.cas_conflict", 1);
1829                    }
1830                }
1831                if !to_dispatch.is_empty() {
1832                    if let Err(e) = self.worker_tx.send(to_dispatch).await {
1833                        error!("心乱: 分发至肠断池失败: {:?}", e);
1834                    } else {
1835                        self.metrics.incr_counter("xinluan.dispatched", 1);
1836                    }
1837                }
1838            }
1839            sleep(Duration::from_millis(10)).await;
1840        }
1841    }
1842
1843    fn shard_batch(&self, batch: Vec<ChouKu>) -> Vec<Vec<ChouKu>> {
1844        let n = self.config.multi_round_concurrency.max(1);
1845        let mut shards: Vec<Vec<ChouKu>> = vec![Vec::new(); n];
1846        for rec in batch {
1847            let mut h: u64 = 0xcbf29ce484222325;
1848            for b in rec.sensor_id.bytes() { h ^= b as u64; h = h.wrapping_mul(0x100000001b3); }
1849            shards[(h as usize) % n].push(rec);
1850        }
1851        shards.into_iter().filter(|v| !v.is_empty()).collect()
1852    }
1853}
1854
1855// ═══════════════════════════════════════════════════════════════════════════════
1856// § 16  肠断 ChangDuan — 并行 Worker 池
1857// ═══════════════════════════════════════════════════════════════════════════════
1858
1859pub struct ChangDuan {
1860    worker_rx: mpsc::Receiver<Vec<ChouKu>>,
1861    youming:   Arc<dyn YouMingStore>,
1862    config:    YanHunConfig,
1863    metrics:   Arc<dyn MetricsSink>,
1864    beiqie:    Arc<BeiQie>,
1865    sem:       Arc<Semaphore>,
1866}
1867
1868impl ChangDuan {
1869    pub fn new(
1870        worker_rx: mpsc::Receiver<Vec<ChouKu>>,
1871        youming:   Arc<dyn YouMingStore>,
1872        config:    YanHunConfig,
1873        metrics:   Arc<dyn MetricsSink>,
1874        beiqie:    Arc<BeiQie>,
1875    ) -> Self {
1876        let concurrency = config.multi_round_concurrency.max(1);
1877        Self { worker_rx, youming, config, metrics, beiqie, sem: Arc::new(Semaphore::new(concurrency)) }
1878    }
1879
1880    pub async fn run(mut self) {
1881        info!("肠断: 并行 Worker 池启动,并发度={}", self.config.multi_round_concurrency);
1882        while let Some(batch) = self.worker_rx.recv().await {
1883            let permit = match self.sem.clone().acquire_owned().await {
1884                Ok(p) => p,
1885                Err(_) => { warn!("肠断: 信号量已关闭"); break; }
1886            };
1887            let youming  = self.youming.clone();
1888            let cfg      = self.config.clone();
1889            let metrics  = self.metrics.clone();
1890            let beiqie   = self.beiqie.clone();
1891            task::spawn(async move {
1892                let _p = permit;
1893                if let Err(e) = Self::multi_round_purify(youming, cfg, batch, metrics, beiqie).await {
1894                    error!("肠断: 批次净化失败: {:?}", e);
1895                }
1896            });
1897        }
1898        info!("肠断: Worker 池退出");
1899    }
1900
1901    async fn multi_round_purify(
1902        youming:       Arc<dyn YouMingStore>,
1903        cfg:           YanHunConfig,
1904        initial_batch: Vec<ChouKu>,
1905        metrics:       Arc<dyn MetricsSink>,
1906        beiqie:        Arc<BeiQie>,
1907    ) -> Result<()> {
1908        let mut current = initial_batch;
1909        let mut round   = 1u32;
1910        let mut last_conf = 0.0f64;
1911
1912        loop {
1913            let start    = Instant::now();
1914            let purifier = AiHui::new(youming.clone(), cfg.clone());
1915            let results  = purifier.process_batch(current.clone())?;
1916
1917            metrics.incr_counter("yanhun.round_executed", 1);
1918            let avg_conf = if results.is_empty() { 0.0 }
1919                else { results.iter().map(|r| r.confidence).sum::<f64>() / results.len() as f64 };
1920            metrics.set_gauge("yanhun.avg_confidence", avg_conf);
1921
1922            if !results.is_empty() {
1923                youming.mark_luminous(&results)?;
1924                for res in results {
1925                    let beiqie_c = beiqie.clone();
1926                    let ym_c     = youming.clone();
1927                    task::spawn(async move {
1928                        if let Err(e) = beiqie_c.verify_and_promote(res, ym_c).await {
1929                            warn!("悲切验证/晋升失败: {:?}", e);
1930                        }
1931                    });
1932                }
1933            }
1934
1935            if (avg_conf - last_conf).abs() < cfg.convergence_delta {
1936                metrics.incr_counter("yanhun.converged", 1);
1937                break;
1938            }
1939            last_conf = avg_conf;
1940            round += 1;
1941            if round > cfg.max_rounds { metrics.incr_counter("yanhun.max_rounds_reached", 1); break; }
1942
1943            let ids: Vec<DropletId> = current.iter().map(|r| r.id.clone()).collect();
1944            let next = youming.batch_get_by_ids(&ids)?;
1945            if next.is_empty() { break; }
1946            current = next;
1947
1948            let elapsed = start.elapsed();
1949            if elapsed < Duration::from_millis(50) { sleep(Duration::from_millis(10)).await; }
1950        }
1951        Ok(())
1952    }
1953}
1954
1955// ═══════════════════════════════════════════════════════════════════════════════
1956// § 17  黯然 YanHun — 主编排器
1957// ═══════════════════════════════════════════════════════════════════════════════
1958
1959pub struct YanHun {
1960    jingshen:    Arc<JingShen>,
1961    hunfei:      Arc<HunFei>,
1962    youming:     Arc<dyn YouMingStore>,
1963    qingmi:      Arc<QingMi>,
1964    processors:  Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
1965    ingestion_tx:mpsc::Sender<LeiLuo>,
1966    config:      YanHunConfig,
1967    tsdb:        Arc<dyn TsdbSink>,
1968    metrics:     Arc<dyn MetricsSink>,
1969    beiqie:      Arc<BeiQie>,
1970    xinluan:     Option<Arc<XinLuan>>,
1971    multi_tx:    Option<mpsc::Sender<Vec<ChouKu>>>,
1972}
1973
1974impl YanHun {
1975    /// 初始化完整系统(同步调用,内部不阻塞当前线程)
1976    pub fn initialize(config: YanHunConfig, youming: Arc<dyn YouMingStore>) -> Self {
1977        let processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>> =
1978            Arc::new(RwLock::new(HashMap::new()));
1979
1980        let tsdb:    Arc<dyn TsdbSink>    = Arc::new(NoopTsdb);
1981        let metrics: Arc<dyn MetricsSink> = Arc::new(NoopMetrics);
1982
1983        let (hunfei_tx, hunfei_rx) = mpsc::channel::<(SensorId, DuanChang, LeiLuo)>(config.ingestion_capacity);
1984        let (ingest_tx, mut ingest_rx) = mpsc::channel::<LeiLuo>(config.ingestion_capacity);
1985
1986        let qingmi = Arc::new(QingMi::new(config.qingmi_alpha));
1987        let beiqie = Arc::new(BeiQie::new(config.clone(), metrics.clone()));
1988
1989        let jingshen = Arc::new(JingShen::new(
1990            config.clone(), hunfei_tx.clone(), processors.clone(), tsdb.clone(), metrics.clone(),
1991        ));
1992
1993        let hunfei = Arc::new(HunFei::new(
1994            youming.clone(), qingmi.clone(), processors.clone(), config.clone(),
1995            hunfei_rx, tsdb.clone(), metrics.clone(),
1996        ));
1997
1998        // 摄入调度器协程
1999        {
2000            let js = jingshen.clone();
2001            tokio::spawn(async move {
2002                while let Some(r) = ingest_rx.recv().await {
2003                    if let Err(e) = js.submit(r).await {
2004                        error!("黯然: 摄入路由错误: {:?}", e);
2005                    }
2006                }
2007                info!("黯然: 摄入调度器退出");
2008            });
2009        }
2010
2011        Self {
2012            jingshen, hunfei, youming, qingmi, processors,
2013            ingestion_tx: ingest_tx, config, tsdb, metrics, beiqie,
2014            xinluan: None, multi_tx: None,
2015        }
2016    }
2017
2018    /// 使用自定义 TSDB + Metrics sink 初始化
2019    pub fn initialize_with_sinks(
2020        config:  YanHunConfig,
2021        youming: Arc<dyn YouMingStore>,
2022        tsdb:    Arc<dyn TsdbSink>,
2023        metrics: Arc<dyn MetricsSink>,
2024    ) -> Self {
2025        let processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>> =
2026            Arc::new(RwLock::new(HashMap::new()));
2027
2028        let (hunfei_tx, hunfei_rx) = mpsc::channel::<(SensorId, DuanChang, LeiLuo)>(config.ingestion_capacity);
2029        let (ingest_tx, mut ingest_rx) = mpsc::channel::<LeiLuo>(config.ingestion_capacity);
2030
2031        let qingmi = Arc::new(QingMi::new(config.qingmi_alpha));
2032        let beiqie = Arc::new(BeiQie::new(config.clone(), metrics.clone()));
2033
2034        let jingshen = Arc::new(JingShen::new(
2035            config.clone(), hunfei_tx.clone(), processors.clone(), tsdb.clone(), metrics.clone(),
2036        ));
2037        let hunfei = Arc::new(HunFei::new(
2038            youming.clone(), qingmi.clone(), processors.clone(), config.clone(),
2039            hunfei_rx, tsdb.clone(), metrics.clone(),
2040        ));
2041
2042        {
2043            let js = jingshen.clone();
2044            tokio::spawn(async move {
2045                while let Some(r) = ingest_rx.recv().await {
2046                    if let Err(e) = js.submit(r).await {
2047                        error!("黯然: 摄入路由错误: {:?}", e);
2048                    }
2049                }
2050            });
2051        }
2052
2053        Self {
2054            jingshen, hunfei, youming, qingmi, processors,
2055            ingestion_tx: ingest_tx, config, tsdb, metrics, beiqie,
2056            xinluan: None, multi_tx: None,
2057        }
2058    }
2059
2060    /// 启动所有后台 worker
2061    pub fn start(&self) {
2062        self.hunfei.clone().start();
2063        info!("黯然销魂系统: 启动完成");
2064    }
2065
2066    /// 启动多轮迭代净化系统(心乱调度器 + 肠断 Worker 池)
2067    pub fn start_multi_round(&mut self) {
2068        let (tx, rx) = mpsc::channel::<Vec<ChouKu>>(self.config.shard_count * 4);
2069        let xinluan  = Arc::new(XinLuan::new(
2070            self.youming.clone(), self.config.clone(), tx.clone(),
2071            self.metrics.clone(), self.beiqie.clone(),
2072        ));
2073        let changduan = ChangDuan::new(
2074            rx, self.youming.clone(), self.config.clone(),
2075            self.metrics.clone(), self.beiqie.clone(),
2076        );
2077        let xl = xinluan.clone();
2078        tokio::spawn(async move { if let Err(e) = xl.run().await { error!("心乱运行错误: {:?}", e); } });
2079        tokio::spawn(async move { changduan.run().await; });
2080        self.xinluan  = Some(xinluan);
2081        self.multi_tx = Some(tx);
2082        info!("黯然销魂系统: 多轮迭代净化已启动");
2083    }
2084
2085    /// 摄入一条感知数据滴
2086    pub async fn ingest(&self, reading: LeiLuo) -> Result<()> {
2087        self.ingestion_tx.send(reading).await
2088            .map_err(|e| anyhow!("黯然: ingest 失败: {}", e))
2089    }
2090
2091    /// 批量摄入
2092    pub async fn ingest_batch(&self, readings: Vec<LeiLuo>) -> Result<()> {
2093        for r in readings { self.ingest(r).await?; }
2094        Ok(())
2095    }
2096
2097    /// 守护进程模式:等待 Ctrl+C 后优雅退出
2098    pub async fn run_daemon(self) -> Result<()> {
2099        info!("黯然销魂系统: 守护模式运行中 — 按 Ctrl+C 停止");
2100        tokio::signal::ctrl_c().await.context("ctrl-c 监听失败")?;
2101        info!("黯然销魂系统: 收到关闭信号,正在排空管道…");
2102        sleep(Duration::from_millis(800)).await;
2103        self.youming.flush()?;
2104        info!("黯然销魂系统: 关闭完成 — 黯然销魂,万念俱灰");
2105        Ok(())
2106    }
2107
2108    /// 导出审计记录为 JSONL 文件
2109    pub fn export_audit(&self, path: &PathBuf) -> Result<usize> {
2110        self.youming.export_audit_jsonl(path)
2111    }
2112
2113    /// 回放隔离区中的全部记录
2114    pub async fn replay_quarantine(&self) -> Result<usize> {
2115        let records = self.youming.scan_quarantine()?;
2116        let count   = records.len();
2117        for rec in records {
2118            let reading = LeiLuo {
2119                id:        rec.droplet_id,
2120                sensor_id: rec.sensor_id,
2121                ts:        rec.ts,
2122                value:     rec.original_value,
2123                quality:   None,
2124                metadata:  None,
2125            };
2126            self.ingest(reading).await?;
2127            sleep(Duration::from_millis(5)).await;
2128        }
2129        info!("回放: 重新注入 {} 条隔离记录", count);
2130        Ok(count)
2131    }
2132
2133    pub fn youming(&self) -> &Arc<dyn YouMingStore> { &self.youming }
2134    pub fn config(&self)  -> &YanHunConfig { &self.config }
2135}
2136
2137// ═══════════════════════════════════════════════════════════════════════════════
2138// § 18  CLI — 命令行界面
2139// ═══════════════════════════════════════════════════════════════════════════════
2140
2141#[derive(Parser, Debug)]
2142#[command(
2143    name    = "yanhun",
2144    about   = "黯然销魂系统 v1.0 — 工业级具身智能传感器数据流净化引擎",
2145    version = "1.0.0"
2146)]
2147pub struct Cli {
2148    /// JSON 配置文件路径(可选)
2149    #[arg(short, long)]
2150    pub config: Option<PathBuf>,
2151
2152    /// 数据库路径
2153    #[arg(short = 'd', long, default_value = "./yanhunsystem_db")]
2154    pub db_path: String,
2155
2156    /// 持久化后端(sled | sqlite)
2157    #[arg(long, default_value = "sled")]
2158    pub backend: String,
2159
2160    /// 沉淀树名称
2161    #[arg(short = 't', long, default_value = "leiluo")]
2162    pub tree_name: String,
2163
2164    /// 暗影树名称
2165    #[arg(long, default_value = "anying")]
2166    pub shadow_tree_name: String,
2167
2168    /// 分片数(覆盖配置文件)
2169    #[arg(short = 's', long)]
2170    pub shards: Option<usize>,
2171
2172    /// 滑动窗口大小(覆盖配置文件)
2173    #[arg(short = 'w', long)]
2174    pub window_size: Option<usize>,
2175
2176    /// 轻愁阈值(覆盖配置文件)
2177    #[arg(long)]
2178    pub threshold_light: Option<f64>,
2179
2180    /// 销魂阈值(覆盖配置文件)
2181    #[arg(long)]
2182    pub threshold_heavy: Option<f64>,
2183
2184    /// 启用多轮净化
2185    #[arg(long, default_value_t = true)]
2186    pub enable_multi_round: bool,
2187
2188    /// 模拟模式:自动生成合成感知数据流
2189    #[arg(long, default_value_t = false)]
2190    pub simulate: bool,
2191
2192    /// 模拟间隔(毫秒)
2193    #[arg(long, default_value_t = 50)]
2194    pub sim_interval_ms: u64,
2195
2196    /// 导出审计记录至 JSONL 文件后退出
2197    #[arg(long)]
2198    pub export: Option<PathBuf>,
2199
2200    /// 回放隔离记录后退出
2201    #[arg(long, default_value_t = false)]
2202    pub replay: bool,
2203
2204    /// 导出暗影记录后退出
2205    #[arg(long)]
2206    pub export_shadow: Option<PathBuf>,
2207}
2208
2209// ═══════════════════════════════════════════════════════════════════════════════
2210// § 19  模拟器 — Synthetic Sensor Data Stream
2211// ═══════════════════════════════════════════════════════════════════════════════
2212
2213pub fn start_simulation(system: Arc<YanHun>, interval_ms: u64) -> tokio::task::JoinHandle<()> {
2214    tokio::spawn(async move {
2215        let sensors    = ["kinoko-1", "suimei-imu", "byakuya-prox", "yoru-wheel", "tsukikage-gps"];
2216        let mut id_ctr = AtomicU64::new(0);
2217        loop {
2218            for &sid in &sensors {
2219                let v = if rand::thread_rng().gen::<f64>() < 0.995 {
2220                    10.0 + (rand::thread_rng().gen::<f64>() - 0.5) * 0.5
2221                } else {
2222                    200.0 + rand::thread_rng().gen::<f64>() * 50.0
2223                };
2224                let seq = id_ctr.fetch_add(1, AtomicOrd::Relaxed);
2225                let r   = LeiLuo {
2226                    id:        format!("sim-{}-{}", sid, seq),
2227                    sensor_id: sid.to_string(),
2228                    ts:        Utc::now(),
2229                    value:     v,
2230                    quality:   Some(1.0),
2231                    metadata:  None,
2232                };
2233                if let Err(e) = system.ingest(r).await {
2234                    error!("模拟器: 提交错误: {:?}", e);
2235                }
2236            }
2237            sleep(Duration::from_millis(interval_ms)).await;
2238        }
2239    })
2240}
2241
2242// ═══════════════════════════════════════════════════════════════════════════════
2243// § 20  main — 程序入口
2244// ═══════════════════════════════════════════════════════════════════════════════
2245
2246#[tokio::main(flavor = "multi_thread")]
2247async fn main() -> Result<()> {
2248    env_logger::Builder::from_env(
2249        env_logger::Env::default().default_filter_or("info")
2250    ).format_timestamp_millis().init();
2251
2252    let cli = Cli::parse();
2253
2254    // 三级配置覆盖:文件 → 环境变量 → CLI 内联参数
2255    let mut app_cfg = if let Some(ref path) = cli.config {
2256        AppConfig::load(path).unwrap_or_else(|e| {
2257            warn!("配置文件加载失败 ({:?}),使用默认配置", e);
2258            AppConfig::default()
2259        })
2260    } else {
2261        AppConfig::default()
2262    };
2263
2264    app_cfg = app_cfg.apply_env();
2265
2266    // CLI 内联参数具有最高优先级
2267    if let Some(v) = cli.shards          { app_cfg.shard_count    = Some(v); }
2268    if let Some(v) = cli.window_size     { app_cfg.window_size    = Some(v); }
2269    if let Some(v) = cli.threshold_light { app_cfg.threshold_light = Some(v); }
2270    if let Some(v) = cli.threshold_heavy { app_cfg.threshold_heavy = Some(v); }
2271
2272    let base_cfg = YanHunConfig {
2273        db_path:          cli.db_path.clone(),
2274        tree_name:        cli.tree_name.clone(),
2275        shadow_tree_name: cli.shadow_tree_name.clone(),
2276        persistence_backend: PersistenceBackend::Sqlite,
2277        ..YanHunConfig::default()
2278    };
2279
2280    let final_cfg = app_cfg.into_inner_config(base_cfg);
2281    final_cfg.validate().context("配置校验失败")?;
2282
2283    // 初始化持久化后端 (Termux ARM 环境强制使用 SQLite)
2284    let youming: Arc<dyn YouMingStore> = if final_cfg.db_path == ":memory:" {
2285        Arc::new(SqliteYouMing::open_in_memory()?)
2286    } else {
2287        Arc::new(SqliteYouMing::open(&final_cfg.db_path)?)
2288    };
2289
2290    // 导出模式(直接操作存储,无需启动系统)
2291    if let Some(ref out) = cli.export {
2292        info!("导出: 审计记录 → {:?}", out);
2293        let n = youming.export_audit_jsonl(out)?;
2294        info!("导出完成: {} 条记录", n);
2295        return Ok(());
2296    }
2297    if let Some(ref out) = cli.export_shadow {
2298        info!("导出: 暗影记录 → {:?}", out);
2299        youming.export_shadow(out)?;
2300        info!("暗影导出完成");
2301        return Ok(());
2302    }
2303
2304    // 构建并启动系统
2305    let mut system = YanHun::initialize(final_cfg.clone(), youming.clone());
2306    system.start();
2307    if cli.enable_multi_round { system.start_multi_round(); }
2308
2309    // 统一启动日志(所有关键参数一行输出)
2310    info!(
2311        "黯然销魂系统上线 | shards={} window={} eps={} backend={:?} \
2312         thresholds=[{:.1}/{:.1}/{:.1}] multi_round={} verify={}",
2313        final_cfg.shard_count,
2314        final_cfg.window_size,
2315        final_cfg.quantisation_eps,
2316        final_cfg.persistence_backend,
2317        final_cfg.threshold_light,
2318        final_cfg.threshold_medium,
2319        final_cfg.threshold_heavy,
2320        cli.enable_multi_round,
2321        final_cfg.verification_enabled,
2322    );
2323
2324    // 回放模式
2325    if cli.replay {
2326        info!("回放: 重新注入隔离记录");
2327        let n = system.replay_quarantine().await?;
2328        info!("回放完成: {} 条", n);
2329        return Ok(());
2330    }
2331
2332    let system = Arc::new(system);
2333
2334    // 模拟模式
2335    let sim = if cli.simulate {
2336        info!("模拟器: 启动合成数据流 ({}ms 间隔)", cli.sim_interval_ms);
2337        Some(start_simulation(system.clone(), cli.sim_interval_ms))
2338    } else {
2339        None
2340    };
2341
2342    // 守护等待
2343    tokio::signal::ctrl_c().await?;
2344    info!("黯然销魂系统: 收到关闭信号");
2345    if let Some(h) = sim { h.abort(); }
2346    info!("黯然销魂系统: 正在排空管道…");
2347    sleep(Duration::from_millis(800)).await;
2348    youming.flush()?;
2349    info!("黯然销魂系统: 关闭完成 — 此情可待成追忆,只是当时已惘然");
2350    Ok(())
2351}
2352
2353// ═══════════════════════════════════════════════════════════════════════════════
2354// § 21  试炼 — 测试套件
2355// ═══════════════════════════════════════════════════════════════════════════════
2356
#[cfg(test)]
mod tests {
    //! Unit tests for the statistics core, the scorer, the background EMA
    //! model, configuration validation, the `LeiLuo` builders, the SQLite
    //! storage backend and the high-order purification helpers.
    //!
    //! Assertion messages are intentionally left in their original language:
    //! they are runtime output, not comments.

    use super::*;
    // NOTE(review): no test in this module calls `tempdir`; the import is kept
    // untouched (the crate-level `unused_imports` allow silences the warning).
    use tempfile::tempdir;

    /// Returns the configuration shared by the tests: only the fields listed
    /// here are overridden, everything else comes from `YanHunConfig::default()`.
    fn test_config() -> YanHunConfig {
        YanHunConfig {
            window_size: 100,
            quantisation_eps: 1e-3,
            threshold_light: 2.5,
            threshold_medium: 4.5,
            threshold_heavy: 8.0,
            ingestion_capacity: 512,
            per_sensor_queue: 64,
            shard_count: 2,
            batch_max_size: 16,
            batch_max_wait_ms: 5,
            max_rounds: 3,
            verification_enabled: false,
            multi_round_concurrency: 2,
            ..YanHunConfig::default()
        }
    }

    // ── Statistics core ──────────────────────────────────────────────────────

    /// Quantising a value to a bucket of width `eps` and mapping it back must
    /// reproduce the value to within half a bucket.
    ///
    /// The arithmetic is done by hand here; the test does not call into any
    /// tracker. NOTE(review): presumably this mirrors the bucketing used by
    /// `OrderStatTracker` — confirm against its implementation.
    #[test]
    fn test_quantise_roundtrip() {
        let eps = 1e-3;
        let v = 12.345678_f64;
        let bucket = (v / eps).round() as i64;
        let restored = (bucket as f64) * eps;
        // Round-to-nearest can be off by at most eps / 2; the tiny extra slack
        // absorbs floating-point error in the divide/multiply pair.
        let tolerance = eps / 2.0 + 1e-12;
        assert!(
            (v - restored).abs() <= tolerance,
            "量化往返误差过大: {} vs {}",
            v,
            restored
        );
    }

    /// Feeding 1..=7 into a 7-slot dual-heap tracker must yield median 4.
    #[test]
    fn test_dual_heap_median() {
        let mut t = DualHeapTracker::new(7);
        for v in 1..=7 {
            let _ = t.insert_and_compute(v as f64);
        }
        assert!((t.current_median() - 4.0).abs() < 1e-9);
    }

    /// A constant sequence must produce a MAD of (numerically) zero.
    #[test]
    fn test_dual_heap_mad_constant_sequence() {
        let mut t = DualHeapTracker::new(11);
        for _ in 0..11 {
            let _ = t.insert_and_compute(10.0);
        }
        let (_, mad) = t.snapshot_stats();
        assert!(mad < 1e-9, "常数序列 MAD 应为 0,实际={}", mad);
    }

    /// Twenty samples cycling through 10.00 / 10.01 / 10.02 followed by one
    /// 10 000 outlier: the reported median must stay near 10 and the MAD must
    /// be non-negative.
    #[test]
    fn test_order_stat_median_robust() {
        let mut t = OrderStatTracker::new(50, 1e-3);
        for v in (0..20).map(|i| 10.0 + (i % 3) as f64 * 0.01) {
            t.insert_and_compute(v);
        }
        let (median, mad) = t.insert_and_compute(10_000.0);
        assert!(
            (median - 10.0).abs() < 2.0,
            "中位数应对异常值不敏感,median={}",
            median
        );
        assert!(mad >= 0.0);
    }

    /// Once the window is full, one more insert must not change its length.
    #[test]
    fn test_order_stat_sliding_window() {
        let cap = 10;
        let mut t = OrderStatTracker::new(cap, 1e-3);
        for v in 0..cap {
            t.insert_and_compute(v as f64);
        }
        assert_eq!(t.window_len(), cap);
        t.insert_and_compute(100.0);
        assert_eq!(t.window_len(), cap, "窗口应维持固定大小");
    }

    /// A single sample inserted into a 512-slot `AutoXiaoHun` must come back
    /// as the median.
    ///
    /// NOTE(review): the test name says the dual-heap backend is selected, but
    /// nothing here observes which backend is in use — confirm or extend.
    #[test]
    fn test_auto_xiaohuun_small_uses_dual_heap() {
        // Bound mutably like the sibling tracker tests, so the call compiles
        // whether `insert_and_compute` borrows `self` mutably or not.
        let mut t = AutoXiaoHun::new(512, 1e-4);
        let (med, _) = t.insert_and_compute(42.0);
        assert!((med - 42.0).abs() < 1e-6);
    }

    /// Same check one slot above `LARGE_WINDOW_THRESHOLD`; the tolerance is
    /// looser (1e-3) than in the small-window test.
    ///
    /// NOTE(review): as above, the backend choice itself is not asserted.
    #[test]
    fn test_auto_xiaohuun_large_uses_order_stat() {
        // Bound mutably for the same reason as in the small-window test.
        let mut t = AutoXiaoHun::new(LARGE_WINDOW_THRESHOLD + 1, 1e-4);
        let (med, _) = t.insert_and_compute(7.0);
        assert!((med - 7.0).abs() < 1e-3);
    }

    // ── ShenShang scorer ─────────────────────────────────────────────────────

    /// Checks three of the four severity levels against the thresholds set in
    /// `test_config()` (median 10.0, MAD 1.0 in every call).
    #[test]
    fn test_shenshang_levels() {
        let scorer = ShenShang::new(test_config());
        assert_eq!(
            scorer.assess(10.0, 10.0, 1.0, 1).level,
            PoSan::Untainted,
            "正常值应为无染"
        );
        // Expected score: 0.6745 * |14.5 - 10| / 1.0 ≈ 3.04, i.e. ≥ 2.5 → LightGrief.
        assert_eq!(
            scorer.assess(14.5, 10.0, 1.0, 1).level,
            PoSan::LightGrief,
            "轻度异常应为轻愁"
        );
        // Expected score ≈ 10.1, i.e. ≥ 8.0 → SoulWithered.
        assert_eq!(
            scorer.assess(25.0, 10.0, 1.0, 1).level,
            PoSan::SoulWithered,
            "重度异常应为销魂"
        );
    }

    /// With MAD = 0 the scorer must still flag a value 40 away from the
    /// median: the score has to exceed 5 and the level must not be `Untainted`.
    #[test]
    fn test_shenshang_zero_mad_fallback() {
        let scorer = ShenShang::new(test_config());
        let order = scorer.assess(50.0, 10.0, 0.0, 1);
        assert!(
            order.score > 5.0,
            "MAD=0 应退化为绝对差值: score={}",
            order.score
        );
        assert_ne!(order.level, PoSan::Untainted);
    }

    /// Pins the `Display` text of every `PoSan` variant.
    #[test]
    fn test_poshan_display() {
        assert_eq!(format!("{}", PoSan::Untainted), "无染");
        assert_eq!(format!("{}", PoSan::LightGrief), "轻愁");
        assert_eq!(format!("{}", PoSan::DeepSorrow), "深恸");
        assert_eq!(format!("{}", PoSan::SoulWithered), "销魂");
    }

    // ── QingMi background model ──────────────────────────────────────────────

    /// After 200 updates with the constant 10.0, the stored value for the
    /// sensor must be within 0.01 of 10.
    #[test]
    fn test_qingmi_converges() {
        let qm = QingMi::new(0.1);
        let sid = "sensor-test".to_string();
        for _ in 0..200 {
            qm.update(&sid, 10.0);
        }
        let v = qm.get(&sid, 0.0);
        assert!((v - 10.0).abs() < 0.01, "EMA 应收敛至 10,实际={}", v);
    }

    // ── Configuration validation ─────────────────────────────────────────────

    /// The default configuration must validate.
    #[test]
    fn test_config_validation_pass() {
        assert!(YanHunConfig::default().validate().is_ok());
    }

    /// A zero window size must be rejected.
    #[test]
    fn test_config_validation_zero_window() {
        let bad = YanHunConfig {
            window_size: 0,
            ..YanHunConfig::default()
        };
        assert!(bad.validate().is_err());
    }

    /// A negative quantisation epsilon must be rejected.
    #[test]
    fn test_config_validation_negative_eps() {
        let bad = YanHunConfig {
            quantisation_eps: -1.0,
            ..YanHunConfig::default()
        };
        assert!(bad.validate().is_err());
    }

    /// A promotion confidence of 1.5 must be rejected.
    #[test]
    fn test_config_validation_bad_confidence() {
        let bad = YanHunConfig {
            promote_confidence: 1.5,
            ..YanHunConfig::default()
        };
        assert!(bad.validate().is_err());
    }

    // ── LeiLuo convenience constructors ──────────────────────────────────────

    /// `LeiLuo::new` must store the id and value and leave the optional
    /// `quality` / `metadata` fields unset.
    #[test]
    fn test_leiluo_new() {
        let r = LeiLuo::new("sensor-a", 42.0);
        assert_eq!(r.sensor_id, "sensor-a");
        assert!((r.value - 42.0).abs() < 1e-9);
        assert!(r.quality.is_none());
        assert!(r.metadata.is_none());
    }

    /// The `with_quality` / `with_metadata` builders must populate their fields.
    #[test]
    fn test_leiluo_with_quality_and_metadata() {
        let r = LeiLuo::new("s1", 1.0)
            .with_quality(0.95)
            .with_metadata(serde_json::json!({"source": "test"}));
        assert!((r.quality.unwrap() - 0.95).abs() < 1e-9);
        assert!(r.metadata.is_some());
    }

    // ── YouMing storage CRUD (SQLite backend) ────────────────────────────────

    /// An audit record written to the in-memory SQLite store must be returned
    /// by `scan_audit` with its `applied_value` intact.
    #[test]
    fn test_sqlite_youming_audit_roundtrip() {
        let ym = SqliteYouMing::open_in_memory().unwrap();
        let rec = QiChuang {
            sensor_id: "s1".into(),
            droplet_id: "d1".into(),
            order: DuanChang {
                level: PoSan::LightGrief,
                score: 3.5,
                reason: "test".into(),
                ts: Utc::now(),
                round: 1,
            },
            original_value: 15.0,
            applied_value: Some(12.0),
            note: Some("clip".into()),
            ts: Utc::now(),
        };
        ym.audit(&rec).unwrap();
        let audits = ym.scan_audit().unwrap();
        assert_eq!(audits.len(), 1);
        assert!((audits[0].applied_value.unwrap() - 12.0).abs() < 1e-9);
    }

    /// A single `quarantine` call must make the record visible through both
    /// `scan_quarantine` and `scan_audit` (dual write).
    #[test]
    fn test_sqlite_youming_quarantine_dual_write() {
        let ym = SqliteYouMing::open_in_memory().unwrap();
        let rec = QiChuang {
            sensor_id: "sq".into(),
            droplet_id: "dq".into(),
            order: DuanChang {
                level: PoSan::SoulWithered,
                score: 15.0,
                reason: "spike".into(),
                ts: Utc::now(),
                round: 1,
            },
            original_value: 500.0,
            applied_value: None,
            note: Some("quarantined".into()),
            ts: Utc::now(),
        };
        ym.quarantine(&rec).unwrap();
        // The quarantine store holds the record ...
        let q = ym.scan_quarantine().unwrap();
        assert_eq!(q.len(), 1, "隔离树应有 1 条记录");
        // ... and so does the audit store.
        let a = ym.scan_audit().unwrap();
        assert_eq!(a.len(), 1, "审计树应也有 1 条记录(双写)");
    }

    /// Compare-and-swap on the record state: the first swap from `Slumber`
    /// succeeds, a second swap that still expects `Slumber` must fail.
    #[test]
    fn test_sqlite_youming_cas() {
        let ym = SqliteYouMing::open_in_memory().unwrap();
        let rec = ChouKu {
            id: "cas-1".into(),
            sensor_id: "s1".into(),
            ts: Utc::now(),
            value: 10.0,
            quality: None,
            score: 0.5,
            poshan: "无染".into(),
            origin_median: None,
            origin_mad: None,
            state: ChouChang::Slumber,
            round: 0,
            metadata: None,
        };
        ym.append_sediment(&rec).unwrap();

        // CAS succeeds: Slumber → Forging(1).
        let ok = ym
            .compare_and_swap_state(&"cas-1".into(), ChouChang::Slumber, ChouChang::Forging(1), 1)
            .unwrap();
        assert!(ok, "CAS 应成功");

        // CAS fails: the state has moved on, so `Slumber` no longer matches.
        let ok2 = ym
            .compare_and_swap_state(&"cas-1".into(), ChouChang::Slumber, ChouChang::Forging(2), 2)
            .unwrap();
        assert!(!ok2, "CAS 应失败(状态已变)");
    }

    // ── High-order purifier helpers (DBSCAN / Huber / Kalman / confidence) ───

    /// Five values near 1.0 plus one far-away 50.0, clustered with eps 0.5 and
    /// min-points 2: at least four points must receive a cluster label (≥ 0).
    #[test]
    fn test_dbscan_1d_basic() {
        let values = vec![1.0, 1.1, 1.2, 50.0, 1.15, 1.05];
        let labels = dbscan_1d(&values, 0.5, 2);
        // The close values are expected to share a cluster; 50.0 is expected
        // to be noise (label -1) or a cluster of its own. Only the clustered
        // count is asserted; the noise count is reported in the message.
        let noise = labels.iter().filter(|&&l| l == -1).count();
        let in_cluster = labels.iter().filter(|&&l| l >= 0).count();
        assert!(
            in_cluster >= 4,
            "多数点应聚为簇,噪声={} 簇内={}",
            noise,
            in_cluster
        );
    }

    /// Fifteen inliers (10.00 ..= 10.14, mean 10.07) plus one 10 000 outlier:
    /// the Huber location estimate must stay within 0.5 of 10.07.
    #[test]
    fn test_huber_robust_location() {
        let mut vals: Vec<f64> = (0..15).map(|i| 10.0 + i as f64 * 0.01).collect();
        vals.push(10_000.0);
        let est = huber_robust_location(&vals, 1.0, 50);
        assert!(
            (est - 10.07).abs() < 0.5,
            "Huber 估计应接近真实中心,est={}",
            est
        );
    }

    /// Smoothing a constant signal of fifty 10.0 samples must end within 0.01
    /// of 10.
    #[test]
    fn test_kalman_convergence() {
        let mut kf = Kalman1D::new(1e-3, 1e-2);
        let signal = vec![10.0; 50];
        let out = kf.smooth_sequence(&signal);
        let last = *out.last().unwrap();
        assert!(
            (last - 10.0).abs() < 0.01,
            "卡尔曼平滑应收敛至真值,last={}",
            last
        );
    }

    /// A tight cluster must score a higher confidence than a spread-out one,
    /// and the asserted bounds are: tight ≤ 1.0, spread ≥ 0.0.
    #[test]
    fn test_confidence_computation() {
        let tight = vec![10.0, 10.01, 9.99, 10.02, 9.98];
        let spread = vec![1.0, 5.0, 10.0, 15.0, 20.0];
        let c_tight = compute_confidence(&tight, 10.0);
        let c_spread = compute_confidence(&spread, 10.2);
        assert!(c_tight > c_spread, "紧凑簇置信度应高于分散簇");
        assert!(c_tight <= 1.0);
        assert!(c_spread >= 0.0);
    }
}
2649
// ═══════════════════════════════════════════════════════════════════════════════
// § 22  Cargo.toml 依赖
//
// [package]
// name    = "yanhunsystem"
// version = "1.0.0"
// edition = "2021"
//
// [dependencies]
// tokio       = { version = "1",   features = ["full"] }
// serde       = { version = "1",   features = ["derive"] }
// serde_json  = "1"
// anyhow      = "1"
// thiserror   = "1"
// log         = "0.4"
// env_logger  = "0.11"
// parking_lot = "0.12"
// chrono      = { version = "0.4", features = ["serde"] }
// sled        = "0.34"
// rusqlite    = { version = "0.31", features = ["bundled"] }
// rand        = "0.8"
// clap        = { version = "4",   features = ["derive"] }
// num_cpus    = "1.16"
// uuid        = { version = "1",   features = ["v4"] }
//
// [dev-dependencies]
// tempfile = "3"
//
// [profile.release]
// opt-level     = 3
// lto           = "thin"
// codegen-units = 4
// ═══════════════════════════════════════════════════════════════════════════════