1#![allow(dead_code, unused_imports, unused_variables, unused_mut)]
43
44use std::cmp::Reverse;
46use std::collections::{BTreeMap, BinaryHeap, HashMap, VecDeque};
47use std::hash::{Hash, Hasher};
48use std::io::{BufWriter, Write};
49use std::path::{Path, PathBuf};
50use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering as AtomicOrd};
51use std::sync::Arc;
52use std::time::{Duration, Instant};
53use tokio::sync::{mpsc, oneshot, Semaphore};
55use tokio::task;
56use tokio::time::sleep;
57use serde::{Deserialize, Serialize};
59use anyhow::{anyhow, Context, Result};
61use thiserror::Error;
62use log::{debug, error, info, warn};
64use chrono::{DateTime, SecondsFormat, Utc};
66use parking_lot::{Mutex, RwLock};
68use rand::Rng;
70use rusqlite::{params, Connection};
72use clap::Parser;
74use num_cpus;
76
77pub type SensorId = String;
82pub type Timestamp = DateTime<Utc>;
83pub type DropletId = String;
84
85const LARGE_WINDOW_THRESHOLD: usize = 65_536;
87const MAD_TO_SIGMA: f64 = 1.4826;
89const MODIFIED_Z_K: f64 = 0.6745;
91const HUBER_EFFICIENCY: f64 = 1.345;
93
94#[derive(Debug, Error)]
95pub enum YanHunError {
96 #[error("泪落摄取失败: {0}")]
97 Ingest(String),
98 #[error("销魂分析失败: {0}")]
99 Analysis(String),
100 #[error("幽冥持久化失败: {0}")]
101 Persistence(String),
102 #[error("信道传输失败: {0}")]
103 Channel(String),
104 #[error("配置失当: {0}")]
105 Config(String),
106 #[error("CAS 竞争失败: {0}")]
107 CasConflict(String),
108 #[error("净化执行失败: {0}")]
109 Purification(String),
110 #[error("IO 错误: {0}")]
111 Io(#[from] std::io::Error),
112}
113
114impl From<rusqlite::Error> for YanHunError {
115 fn from(e: rusqlite::Error) -> Self { YanHunError::Persistence(e.to_string()) }
116}
117
118pub trait TsdbSink: Send + Sync + 'static {
124 fn write_raw(&self, reading: &LeiLuo);
125 fn write_corrected(&self, sensor_id: &str, ts: &Timestamp, value: f64, note: &str);
126}
127
128pub struct NoopTsdb;
129impl TsdbSink for NoopTsdb {
130 fn write_raw(&self, _r: &LeiLuo) {}
131 fn write_corrected(&self, _s: &str, _t: &Timestamp, _v: f64, _n: &str) {}
132}
133
134pub trait MetricsSink: Send + Sync + 'static {
136 fn incr_counter(&self, name: &str, delta: u64);
137 fn set_gauge(&self, name: &str, value: f64);
138 fn record_ms(&self, name: &str, ms: u64);
139}
140
141pub struct NoopMetrics;
142impl MetricsSink for NoopMetrics {
143 fn incr_counter(&self, _n: &str, _d: u64) {}
144 fn set_gauge(&self, _n: &str, _v: f64) {}
145 fn record_ms(&self, _n: &str, _m: u64) {}
146}
147
148#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
153pub enum HuberStrategy {
154 Fixed(f64),
155 AutoMAD,
156}
157
158#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
159pub enum PersistenceBackend { Sled, Sqlite }
160
161#[derive(Debug, Clone, Serialize, Deserialize)]
162pub struct YanHunConfig {
163 pub window_size: usize,
164 pub quantisation_eps: f64,
165 pub threshold_light: f64,
166 pub threshold_medium: f64,
167 pub threshold_heavy: f64,
168 pub ingestion_capacity: usize,
169 pub per_sensor_queue: usize,
170 pub shard_count: usize,
171 pub batch_max_size: usize,
172 pub batch_max_wait_ms: u64,
173 pub qingmi_alpha: f64,
174 pub hunfei_concurrency: usize,
175 pub verification_enabled: bool,
176 pub blend_alpha: f64,
177 pub clip_k: f64,
178 pub huber_strategy: HuberStrategy,
179 pub kalman_q: f64,
180 pub kalman_r: f64,
181 pub max_rounds: u32,
182 pub promote_confidence: f64,
183 pub convergence_delta: f64,
184 pub shadow_verify_delay_ms: u64,
185 pub shadow_max_attempts: u32,
186 pub shadow_min_confidence: f64,
187 pub persistence_backend: PersistenceBackend,
188 pub db_path: String,
189 pub tree_name: String,
190 pub shadow_tree_name: String,
191 pub flush_interval_ms: u64,
192 pub dbscan_min_samples_ratio: f64,
193 pub multi_round_batch_size: usize,
194 pub multi_round_concurrency: usize,
195}
196
197impl Default for YanHunConfig {
198 fn default() -> Self {
199 Self {
200 window_size: 1_024,
201 quantisation_eps: 1e-4,
202 threshold_light: 3.0,
203 threshold_medium: 6.0,
204 threshold_heavy: 10.0,
205 ingestion_capacity: 16_384,
206 per_sensor_queue: 1_024,
207 shard_count: 8,
208 batch_max_size: 256,
209 batch_max_wait_ms: 10,
210 qingmi_alpha: 0.05,
211 hunfei_concurrency: 16,
212 verification_enabled: true,
213 blend_alpha: 0.6,
214 clip_k: 3.0,
215 huber_strategy: HuberStrategy::AutoMAD,
216 kalman_q: 1e-3,
217 kalman_r: 1e-2,
218 max_rounds: 6,
219 promote_confidence: 0.85,
220 convergence_delta: 1e-3,
221 shadow_verify_delay_ms: 500,
222 shadow_max_attempts: 3,
223 shadow_min_confidence: 0.9,
224 persistence_backend: PersistenceBackend::Sqlite,
225 db_path: "./yanhunsystem_db".to_string(),
226 tree_name: "leiluo".to_string(),
227 shadow_tree_name: "anying".to_string(),
228 flush_interval_ms: 500,
229 dbscan_min_samples_ratio: 0.02,
230 multi_round_batch_size: 1024,
231 multi_round_concurrency: num_cpus::get(),
232 }
233 }
234}
235
236impl YanHunConfig {
237 pub fn validate(&self) -> Result<()> {
238 if self.window_size == 0 { anyhow::bail!("window_size 不能为零"); }
239 if self.max_rounds == 0 { anyhow::bail!("max_rounds 不能为零"); }
240 if self.quantisation_eps <= 0.0 { anyhow::bail!("quantisation_eps 必须大于零"); }
241 if self.shard_count == 0 { anyhow::bail!("shard_count 不能为零"); }
242 if !(0.0..=1.0).contains(&self.promote_confidence) {
243 anyhow::bail!("promote_confidence 必须在 0.0–1.0 之间");
244 }
245 if self.convergence_delta <= 0.0 { anyhow::bail!("convergence_delta 必须大于零"); }
246 if self.flush_interval_ms == 0 { anyhow::bail!("flush_interval_ms 不能为零"); }
247 if !(0.0..=1.0).contains(&self.qingmi_alpha) {
248 anyhow::bail!("qingmi_alpha 必须在 0.0–1.0 之间");
249 }
250 if self.multi_round_batch_size == 0 { anyhow::bail!("multi_round_batch_size 不能为零"); }
251 Ok(())
252 }
253
254 pub fn load_from_file(path: &PathBuf) -> Result<Self> {
255 let f = std::fs::File::open(path).context("配置文件打开失败")?;
256 serde_json::from_reader(f).context("配置文件解析失败")
257 }
258
259 pub fn apply_env(mut self) -> Self {
260 macro_rules! env_usize {
261 ($var:expr, $field:expr) => {
262 if let Ok(v) = std::env::var($var) {
263 if let Ok(n) = v.parse::<usize>() { $field = n; }
264 }
265 };
266 }
267 macro_rules! env_f64 {
268 ($var:expr, $field:expr) => {
269 if let Ok(v) = std::env::var($var) {
270 if let Ok(f) = v.parse::<f64>() { $field = f; }
271 }
272 };
273 }
274 macro_rules! env_bool {
275 ($var:expr, $field:expr) => {
276 if let Ok(v) = std::env::var($var) {
277 if let Ok(b) = v.parse::<bool>() { $field = b; }
278 }
279 };
280 }
281 env_usize!("YANHUN_WINDOW_SIZE", self.window_size);
282 env_f64!("YANHUN_THRESHOLD_LIGHT", self.threshold_light);
283 env_f64!("YANHUN_THRESHOLD_MEDIUM", self.threshold_medium);
284 env_f64!("YANHUN_THRESHOLD_HEAVY", self.threshold_heavy);
285 env_usize!("YANHUN_SHARDS", self.shard_count);
286 env_f64!("YANHUN_QINGMI_ALPHA", self.qingmi_alpha);
287 env_bool!("YANHUN_VERIFY", self.verification_enabled);
288 if let Ok(v) = std::env::var("YANHUN_MAX_ROUNDS") {
289 if let Ok(n) = v.parse::<u32>() {
290 self.max_rounds = n;
291 }
292}
293 env_f64!("YANHUN_PROMOTE_CONF", self.promote_confidence);
294 self
295 }
296}
297
298#[derive(Debug, Clone, Serialize, Deserialize, Default)]
300pub struct AppConfig {
301 pub window_size: Option<usize>,
302 pub quantisation_eps: Option<f64>,
303 pub threshold_light: Option<f64>,
304 pub threshold_medium: Option<f64>,
305 pub threshold_heavy: Option<f64>,
306 pub shard_count: Option<usize>,
307 pub batch_max_size: Option<usize>,
308 pub batch_max_wait_ms: Option<u64>,
309 pub qingmi_alpha: Option<f64>,
310 pub hunfei_concurrency: Option<usize>,
311 pub verification_enabled: Option<bool>,
312 pub max_rounds: Option<usize>,
313 pub promote_confidence: Option<f64>,
314}
315
316impl AppConfig {
317 pub fn load(path: &PathBuf) -> Result<Self> {
318 let f = std::fs::File::open(path).context("配置文件打开失败")?;
319 serde_json::from_reader(f).context("配置文件解析失败")
320 }
321
322 pub fn apply_env(mut self) -> Self {
324 macro_rules! env_opt_usize {
325 ($field:ident, $key:expr) => {
326 if let Ok(v) = std::env::var($key) {
327 if let Ok(n) = v.parse() { self.$field = Some(n); }
328 }
329 };
330 }
331 macro_rules! env_opt_f64 {
332 ($field:ident, $key:expr) => {
333 if let Ok(v) = std::env::var($key) {
334 if let Ok(f) = v.parse() { self.$field = Some(f); }
335 }
336 };
337 }
338 env_opt_usize!(window_size, "YANHUN_WINDOW_SIZE");
339 env_opt_usize!(shard_count, "YANHUN_SHARDS");
340 env_opt_usize!(hunfei_concurrency, "YANHUN_MAX_CONCURRENCY");
341 env_opt_f64!(threshold_light, "YANHUN_THRESHOLD_LIGHT");
342 env_opt_f64!(threshold_medium, "YANHUN_THRESHOLD_MEDIUM");
343 env_opt_f64!(threshold_heavy, "YANHUN_THRESHOLD_HEAVY");
344 env_opt_f64!(qingmi_alpha, "YANHUN_QINGMI_ALPHA");
345 self
346 }
347
348 pub fn into_inner_config(self, mut base: YanHunConfig) -> YanHunConfig {
350 macro_rules! apply {
351 ($field:ident) => {
352 if let Some(v) = self.$field { base.$field = v; }
353 };
354 }
355 apply!(window_size);
356 apply!(quantisation_eps);
357 apply!(threshold_light);
358 apply!(threshold_medium);
359 apply!(threshold_heavy);
360 apply!(shard_count);
361 apply!(batch_max_size);
362 apply!(batch_max_wait_ms);
363 apply!(qingmi_alpha);
364 apply!(hunfei_concurrency);
365 apply!(verification_enabled);
366 if let Some(r) = self.max_rounds { base.max_rounds = r as u32; }
367 if let Some(c) = self.promote_confidence { base.promote_confidence = c; }
368 base
369 }
370}
371
372#[derive(Debug, Clone, Serialize, Deserialize)]
378pub struct LeiLuo {
379 pub id: DropletId,
380 pub sensor_id: SensorId,
381 pub ts: Timestamp,
382 pub value: f64,
383 pub quality: Option<f64>,
384 pub metadata: Option<serde_json::Value>,
386}
387
388impl LeiLuo {
389 pub fn new(sensor_id: impl Into<String>, value: f64) -> Self {
391 let sid = sensor_id.into();
392 Self {
393 id: uuid::Uuid::new_v4().to_string(),
394 sensor_id: sid,
395 ts: Utc::now(),
396 value,
397 quality: None,
398 metadata: None,
399 }
400 }
401
402 pub fn with_quality(mut self, q: f64) -> Self {
404 self.quality = Some(q);
405 self
406 }
407
408 pub fn with_metadata(mut self, m: serde_json::Value) -> Self {
410 self.metadata = Some(m);
411 self
412 }
413}
414
415#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
417pub enum PoSan {
418 Untainted,
420 LightGrief,
422 DeepSorrow,
424 SoulWithered,
426}
427
428impl std::fmt::Display for PoSan {
429 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
430 match self {
431 Self::Untainted => write!(f, "无染"),
432 Self::LightGrief => write!(f, "轻愁"),
433 Self::DeepSorrow => write!(f, "深恸"),
434 Self::SoulWithered => write!(f, "销魂"),
435 }
436 }
437}
438
439#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
441pub enum ChouChang {
442 Slumber,
444 Awake,
446 Forging(u32),
448 Luminous(u32),
450 Sealed,
452}
453
454#[derive(Debug, Clone, Serialize, Deserialize)]
456pub struct DuanChang {
457 pub level: PoSan,
458 pub score: f64,
459 pub reason: String,
460 pub ts: Timestamp,
461 pub round: u32,
462}
463
464#[derive(Debug, Clone, Serialize, Deserialize)]
466pub struct QiChuang {
467 pub sensor_id: SensorId,
468 pub droplet_id: DropletId,
469 pub order: DuanChang,
470 pub original_value: f64,
471 pub applied_value: Option<f64>,
472 pub note: Option<String>,
473 pub ts: Timestamp,
474}
475
476#[derive(Debug, Clone, Serialize, Deserialize)]
478pub struct YiLuan {
479 pub sensor_id: SensorId,
480 pub ts: Timestamp,
481 pub corrected_value: f64,
482 pub confidence: f64,
483 pub method: String,
484 pub round: u32,
485 pub provenance: Vec<DropletId>,
486 pub metrics: serde_json::Value,
487}
488
489#[derive(Debug, Clone, Serialize, Deserialize)]
491pub struct ChouKu {
492 pub id: DropletId,
493 pub sensor_id: SensorId,
494 pub ts: Timestamp,
495 pub value: f64,
496 pub quality: Option<f64>,
497 pub score: f64,
498 pub poshan: String,
499 pub origin_median:Option<f64>,
500 pub origin_mad: Option<f64>,
501 pub state: ChouChang,
502 pub round: u32,
503 pub metadata: Option<serde_json::Value>,
504}
505
506pub fn dbscan_1d(values: &[f64], eps: f64, min_samples: usize) -> Vec<i32> {
512 let n = values.len();
513 if n == 0 { return vec![]; }
514
515 let mut labels = vec![-2i32; n];
516 let mut cluster_id = 0i32;
517
518 for i in 0..n {
519 if labels[i] != -2 { continue; }
520 let mut neighbors: Vec<usize> = (0..n)
521 .filter(|&j| (values[j] - values[i]).abs() <= eps)
522 .collect();
523 if neighbors.len() < min_samples { labels[i] = -1; continue; }
524 labels[i] = cluster_id;
525 let mut queue: VecDeque<usize> = VecDeque::from(neighbors);
526 while let Some(idx) = queue.pop_front() {
527 if labels[idx] == -1 { labels[idx] = cluster_id; }
528 if labels[idx] != -2 { continue; }
529 labels[idx] = cluster_id;
530 let nbrs: Vec<usize> = (0..n)
531 .filter(|&j| (values[j] - values[idx]).abs() <= eps)
532 .collect();
533 if nbrs.len() >= min_samples {
534 for &m in &nbrs {
535 if labels[m] == -2 { queue.push_back(m); }
536 }
537 }
538 }
539 cluster_id += 1;
540 }
541
542 for i in 0..n {
543 if labels[i] == -2 { labels[i] = -1; }
544 }
545 labels
546}
547
548pub fn estimate_dbscan_eps(values: &[f64]) -> f64 {
550 if values.len() < 2 { return 1e-6; }
551 let mut sorted = values.to_vec();
552 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
553 let mut diffs: Vec<f64> = (1..sorted.len())
554 .map(|i| (sorted[i] - sorted[i-1]).abs())
555 .collect();
556 if diffs.is_empty() { return 1e-6; }
557 let mid = diffs.len() / 2;
558 diffs.select_nth_unstable_by(mid, f64::total_cmp);
559 diffs[mid].max(1e-6) * 3.0
560}
561
562pub fn huber_robust_location(values: &[f64], delta: f64, max_iter: usize) -> f64 {
564 if values.is_empty() { return 0.0; }
565 let mut v = {
566 let mut tmp = values.to_vec();
567 let mid = tmp.len() / 2;
568 tmp.select_nth_unstable_by(mid, f64::total_cmp);
569 tmp[mid]
570 };
571 for _ in 0..max_iter {
572 let (num, den) = values.iter().fold((0.0f64, 0.0f64), |(n, d), &x| {
573 let r = x - v;
574 let w = if r.abs() <= delta { 1.0 } else { delta / r.abs() };
575 (n + w * x, d + w)
576 });
577 if den == 0.0 { break; }
578 let v_new = num / den;
579 if (v_new - v).abs() < 1e-12 { v = v_new; break; }
580 v = v_new;
581 }
582 v
583}
584
585pub fn estimate_huber_delta(values: &[f64]) -> f64 {
587 if values.is_empty() { return 1.0; }
588 let mut tmp = values.to_vec();
589 let mid = tmp.len() / 2;
590 tmp.select_nth_unstable_by(mid, f64::total_cmp);
591 let median = tmp[mid];
592 let mut devs: Vec<f64> = values.iter().map(|v| (v - median).abs()).collect();
593 let mid = devs.len() / 2;
594 devs.select_nth_unstable_by(mid, f64::total_cmp);
595 let mad = devs[mid];
596 let sigma = if mad > 0.0 { mad * MAD_TO_SIGMA } else { 1e-6 };
597 HUBER_EFFICIENCY * sigma
598}
599
600pub struct Kalman1D {
602 pub x: f64, pub p: f64, pub q: f64, pub r: f64,
603 initialized: bool,
604}
605
606impl Kalman1D {
607 pub fn new(q: f64, r: f64) -> Self {
608 Self { x: 0.0, p: 1.0, q, r, initialized: false }
609 }
610
611 pub fn reset(&mut self) { self.initialized = false; self.x = 0.0; self.p = 1.0; }
612
613 pub fn update(&mut self, z: f64) -> f64 {
614 if !self.initialized { self.x = z; self.p = 1.0; self.initialized = true; return z; }
615 let p_pred = self.p + self.q;
616 let k = p_pred / (p_pred + self.r);
617 self.x = self.x + k * (z - self.x);
618 self.p = (1.0 - k) * p_pred;
619 self.x
620 }
621
622 pub fn smooth_sequence(&mut self, seq: &[f64]) -> Vec<f64> {
623 seq.iter().map(|&m| self.update(m)).collect()
624 }
625}
626
627pub fn compute_confidence(cluster_vals: &[f64], representative: f64) -> f64 {
629 if cluster_vals.is_empty() { return 0.0; }
630 let n = cluster_vals.len() as f64;
631 let var = cluster_vals.iter().map(|&v| { let d = v - representative; d * d }).sum::<f64>() / n;
632 let count_score = (n.ln() / (1.0 + n.ln())).min(1.0);
633 let var_score = 1.0 / (1.0 + var);
634 (0.6 * count_score + 0.4 * var_score).max(0.0).min(1.0)
635}
636
637pub trait XiaoHunTracker: Send + Sync {
642 fn insert_and_compute(&mut self, value: f64) -> (f64, f64);
643 fn window_len(&self) -> usize;
644 fn snapshot_stats(&self) -> (f64, f64);
645 fn is_empty(&self) -> bool { self.window_len() == 0 }
646}
647
648#[derive(Debug, Clone, Copy, PartialEq)]
651struct OrdF64(f64);
652impl Eq for OrdF64 {}
653impl PartialOrd for OrdF64 {
654 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { Some(self.cmp(other)) }
655}
656impl Ord for OrdF64 {
657 fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.0.total_cmp(&other.0) }
658}
659
660pub struct DualHeapTracker {
661 window_size: usize,
662 lower: BinaryHeap<OrdF64>,
663 upper: BinaryHeap<Reverse<OrdF64>>,
664 window: VecDeque<f64>,
665 pending_remove: VecDeque<f64>,
666}
667
668impl DualHeapTracker {
669 pub fn new(window_size: usize) -> Self {
670 Self {
671 window_size,
672 lower: BinaryHeap::new(),
673 upper: BinaryHeap::new(),
674 window: VecDeque::with_capacity(window_size + 16),
675 pending_remove: VecDeque::new(),
676 }
677 }
678
679 fn heap_insert(&mut self, v: f64) {
680 let ov = OrdF64(v);
681 if self.lower.peek().map_or(true, |&x| ov >= x) {
682 self.upper.push(Reverse(ov));
683 } else {
684 self.lower.push(ov);
685 }
686 self.rebalance();
687 }
688
689 fn rebalance(&mut self) {
690 while self.lower.len() > self.upper.len() + 1 {
691 if let Some(v) = self.lower.pop() { self.upper.push(Reverse(v)); }
692 }
693 while self.upper.len() > self.lower.len() + 1 {
694 if let Some(Reverse(v)) = self.upper.pop() { self.lower.push(v); }
695 }
696 }
697
698 fn current_median(&self) -> f64 {
699 match (self.lower.peek(), self.upper.peek()) {
700 (Some(&lo), Some(&Reverse(hi))) => {
701 if self.lower.len() == self.upper.len() { (lo.0 + hi.0) / 2.0 }
702 else if self.lower.len() > self.upper.len() { lo.0 }
703 else { hi.0 }
704 }
705 (Some(&lo), None) => lo.0,
706 (None, Some(&Reverse(hi))) => hi.0,
707 (None, None) => 0.0,
708 }
709 }
710
711 fn lazy_cleanup(&mut self) {
712 loop {
713 match self.pending_remove.front().copied() {
714 None => break,
715 Some(to_remove) => {
716 let or = OrdF64(to_remove);
717 if self.lower.peek() == Some(&or) {
718 self.lower.pop(); self.pending_remove.pop_front(); self.rebalance();
719 } else if self.upper.peek() == Some(&Reverse(or)) {
720 self.upper.pop(); self.pending_remove.pop_front(); self.rebalance();
721 } else { break; }
722 }
723 }
724 }
725 }
726
727 fn compute_mad(&self, median: f64) -> f64 {
728 if self.window.is_empty() { return 0.0; }
729 let mut devs: Vec<f64> = self.window.iter().map(|v| (v - median).abs()).collect();
730 let n = devs.len();
731 let mid = n / 2;
732 devs.select_nth_unstable_by(mid, f64::total_cmp);
733 if n % 2 == 1 { devs[mid] } else {
734 let a = devs[mid];
735 if mid == 0 { return a; }
736 devs.select_nth_unstable_by(mid - 1, f64::total_cmp);
737 (a + devs[mid - 1]) / 2.0
738 }
739 }
740}
741
742impl XiaoHunTracker for DualHeapTracker {
743 fn insert_and_compute(&mut self, value: f64) -> (f64, f64) {
744 if self.window.len() >= self.window_size {
745 if let Some(old) = self.window.pop_front() {
746 self.pending_remove.push_back(old);
747 self.lazy_cleanup();
748 }
749 }
750 self.window.push_back(value);
751 self.heap_insert(value);
752 let median = self.current_median();
753 (median, self.compute_mad(median))
754 }
755
756 fn window_len(&self) -> usize { self.window.len() }
757
758 fn snapshot_stats(&self) -> (f64, f64) {
759 let med = self.current_median();
760 (med, self.compute_mad(med))
761 }
762}
763
764pub struct OrderStatTracker {
767 capacity: usize,
768 eps: f64,
769 map: BTreeMap<i64, usize>,
770 window: Vec<(i64, f64)>,
772 head: usize,
773 len: usize,
774}
775
776impl OrderStatTracker {
777 pub fn new(capacity: usize, eps: f64) -> Self {
778 assert!(capacity > 0, "capacity must be > 0");
779 assert!(eps > 0.0, "eps must be > 0");
780 Self {
781 capacity, eps,
782 map: BTreeMap::new(),
783 window: vec![(0, 0.0); capacity],
784 head: 0,
785 len: 0,
786 }
787 }
788
789 #[inline] fn quantise(&self, v: f64) -> i64 { (v / self.eps).round() as i64 }
790 #[inline] fn dequantise(&self, b: i64) -> f64 { (b as f64) * self.eps }
791
792 fn insert_bucket(&mut self, b: i64) { *self.map.entry(b).or_insert(0) += 1; }
793
794 fn remove_bucket(&mut self, b: i64) {
795 match self.map.get_mut(&b) {
796 Some(cnt) if *cnt > 1 => { *cnt -= 1; }
797 Some(_) => { self.map.remove(&b); }
798 None => {}
799 }
800 }
801
802 fn select_kth(&self, k: usize) -> f64 {
803 let mut acc = 0usize;
804 for (&bucket, &count) in &self.map {
805 acc += count;
806 if acc > k { return self.dequantise(bucket); }
807 }
808 self.map.iter().next_back().map(|(&b, _)| self.dequantise(b)).unwrap_or(0.0)
809 }
810
811 fn median(&self) -> f64 {
812 if self.len == 0 { return 0.0; }
813 let n = self.len;
814 let mid = (n - 1) / 2;
815 if n % 2 == 1 { self.select_kth(mid) }
816 else { (self.select_kth(mid) + self.select_kth(mid + 1)) / 2.0 }
817 }
818
819 fn compute_mad(&self, median: f64) -> f64 {
820 if self.len == 0 { return 0.0; }
821 let mut devs: Vec<f64> = (0..self.len).map(|i| {
822 let idx = (self.head + i) % self.capacity;
823 (self.window[idx].1 - median).abs()
824 }).collect();
825 let n = devs.len();
826 let mid = n / 2;
827 devs.select_nth_unstable_by(mid, f64::total_cmp);
828 if n % 2 == 1 { devs[mid] } else {
829 let a = devs[mid];
830 if mid > 0 { devs.select_nth_unstable_by(mid - 1, f64::total_cmp); (a + devs[mid - 1]) / 2.0 } else { a }
831 }
832 }
833}
834
835impl XiaoHunTracker for OrderStatTracker {
836 fn insert_and_compute(&mut self, value: f64) -> (f64, f64) {
837 let new_bucket = self.quantise(value);
838 if self.len < self.capacity {
839 let idx = (self.head + self.len) % self.capacity;
840 self.window[idx] = (new_bucket, value);
841 self.len += 1;
842 self.insert_bucket(new_bucket);
843 } else {
844 let (old_bucket, _) = self.window[self.head];
845 self.remove_bucket(old_bucket);
846 self.window[self.head] = (new_bucket, value);
847 self.insert_bucket(new_bucket);
848 self.head = (self.head + 1) % self.capacity;
849 }
850 let med = self.median();
851 (med, self.compute_mad(med))
852 }
853
854 fn window_len(&self) -> usize { self.len }
855
856 fn snapshot_stats(&self) -> (f64, f64) {
857 let med = self.median();
858 (med, self.compute_mad(med))
859 }
860}
861
862pub struct AutoXiaoHun {
866 inner: Mutex<Box<dyn XiaoHunTracker>>,
867}
868
869impl AutoXiaoHun {
870 pub fn new(window_size: usize, eps: f64) -> Self {
871 let inner: Box<dyn XiaoHunTracker> = if window_size <= LARGE_WINDOW_THRESHOLD {
872 Box::new(DualHeapTracker::new(window_size))
873 } else {
874 Box::new(OrderStatTracker::new(window_size, eps))
875 };
876 Self { inner: Mutex::new(inner) }
877 }
878
879 pub fn insert_and_compute(&self, value: f64) -> (f64, f64) {
880 self.inner.lock().insert_and_compute(value)
881 }
882
883 pub fn window_len(&self) -> usize { self.inner.lock().window_len() }
884
885 pub fn snapshot_stats(&self) -> (f64, f64) { self.inner.lock().snapshot_stats() }
886
887 pub fn is_empty(&self) -> bool { self.inner.lock().is_empty() }
888}
889
890pub struct ShenShang {
895 config: YanHunConfig,
896}
897
898impl ShenShang {
899 pub fn new(config: YanHunConfig) -> Self { Self { config } }
900
901 pub fn score(&self, value: f64, median: f64, mad: f64) -> f64 {
902 if mad <= f64::EPSILON { (value - median).abs() }
903 else { MODIFIED_Z_K * (value - median).abs() / mad }
904 }
905
906 pub fn assess(&self, value: f64, median: f64, mad: f64, round: u32) -> DuanChang {
907 let score = self.score(value, median, mad);
908 let level = if score >= self.config.threshold_heavy { PoSan::SoulWithered }
909 else if score >= self.config.threshold_medium { PoSan::DeepSorrow }
910 else if score >= self.config.threshold_light { PoSan::LightGrief }
911 else { PoSan::Untainted };
912
913 let reason = match level {
914 PoSan::SoulWithered => format!("score {:.4} ≥ 销魂阈 {:.4}", score, self.config.threshold_heavy),
915 PoSan::DeepSorrow => format!("score {:.4} ≥ 深恸阈 {:.4}", score, self.config.threshold_medium),
916 PoSan::LightGrief => format!("score {:.4} ≥ 轻愁阈 {:.4}", score, self.config.threshold_light),
917 PoSan::Untainted => format!("score {:.4} < 轻愁阈 {:.4}", score, self.config.threshold_light),
918 };
919
920 DuanChang { level, score, reason, ts: Utc::now(), round }
921 }
922
923 pub fn mad_to_sigma(&self) -> f64 { MAD_TO_SIGMA }
924}
925
926pub struct QingMi {
931 alpha: f64,
932 values: RwLock<HashMap<SensorId, f64>>,
933}
934
935impl QingMi {
936 pub fn new(alpha: f64) -> Self {
937 assert!((0.0..=1.0).contains(&alpha), "alpha must be in (0, 1]");
938 Self { alpha, values: RwLock::new(HashMap::new()) }
939 }
940
941 pub fn update(&self, sensor_id: &SensorId, sample: f64) -> f64 {
942 let mut map = self.values.write();
943 let bg = map.entry(sensor_id.clone()).or_insert(sample);
944 *bg = self.alpha * sample + (1.0 - self.alpha) * *bg;
945 *bg
946 }
947
948 pub fn get(&self, sensor_id: &SensorId, fallback: f64) -> f64 {
949 self.values.read().get(sensor_id).copied().unwrap_or(fallback)
950 }
951}
952
953pub trait YouMingStore: Send + Sync + 'static {
958 fn append_sediment(&self, rec: &ChouKu) -> Result<()>;
959 fn batch_append_sediment(&self, recs: &[ChouKu]) -> Result<()>;
960 fn pull_candidates(&self, sensor_id: Option<&str>, limit: usize) -> Result<Vec<ChouKu>>;
961 fn get_by_id(&self, id: &DropletId) -> Result<Option<ChouKu>>;
962 fn batch_get_by_ids(&self, ids: &[DropletId]) -> Result<Vec<ChouKu>>;
963 fn mark_forging(&self, ids: &[DropletId], round: u32) -> Result<()>;
964 fn mark_luminous(&self, results: &[YiLuan]) -> Result<()>;
965 fn seal(&self, ids: &[DropletId]) -> Result<()>;
966 fn scan_prefix(&self, prefix: &str) -> Result<Vec<ChouKu>>;
967 fn flush(&self) -> Result<()>;
968 fn apply_shadow(&self, res: &YiLuan) -> Result<()>;
969 fn promote_shadow(&self, res: &YiLuan) -> Result<()>;
970 fn export_shadow(&self, path: &PathBuf) -> Result<()>;
971 fn audit(&self, rec: &QiChuang) -> Result<()>;
972 fn quarantine(&self, rec: &QiChuang) -> Result<()>;
973 fn scan_audit(&self) -> Result<Vec<QiChuang>>;
974 fn scan_quarantine(&self) -> Result<Vec<QiChuang>>;
975 fn export_audit_jsonl(&self, path: &PathBuf) -> Result<usize>;
976 fn compare_and_swap_state(
978 &self,
979 id: &DropletId,
980 old_state: ChouChang,
981 new_state: ChouChang,
982 new_round: u32,
983 ) -> Result<bool>;
984}
985
986pub struct SqliteYouMing {
989 conn: Mutex<Connection>,
990}
991
992impl SqliteYouMing {
993 pub fn open(path: &str) -> Result<Self> {
994 let conn = Connection::open(path).context("打开 SQLite 数据库失败")?;
995 Self::init_schema(&conn)?;
996 Ok(Self { conn: Mutex::new(conn) })
997 }
998
999 pub fn open_in_memory() -> Result<Self> {
1000 let conn = Connection::open_in_memory().context("打开内存数据库失败")?;
1001 Self::init_schema(&conn)?;
1002 Ok(Self { conn: Mutex::new(conn) })
1003 }
1004
1005 fn init_schema(conn: &Connection) -> Result<()> {
1006 conn.execute_batch(
1007 "PRAGMA journal_mode = WAL;
1008 PRAGMA synchronous = NORMAL;
1009 CREATE TABLE IF NOT EXISTS sediment (id TEXT PRIMARY KEY, val TEXT NOT NULL);
1010 CREATE TABLE IF NOT EXISTS shadow (key TEXT PRIMARY KEY, val TEXT NOT NULL);
1011 CREATE INDEX IF NOT EXISTS idx_shadow_key ON shadow(key);"
1012 ).context("SQLite 建表失败")?;
1013 Ok(())
1014 }
1015
1016 fn make_key(ts: &Timestamp, id: &str) -> String {
1017 format!("{}::{}", ts.to_rfc3339_opts(SecondsFormat::Nanos, true), id)
1018 }
1019
1020 fn get_by_id_inner(&self, conn: &Connection, id: &str) -> Result<Option<ChouKu>> {
1021 let mut stmt = conn.prepare_cached("SELECT val FROM sediment WHERE id = ?1")
1022 .context("SQLite 准备语句失败")?;
1023 let mut rows = stmt.query(params![id]).context("SQLite 查询失败")?;
1024 match rows.next().context("SQLite 读取行失败")? {
1025 Some(row) => {
1026 let val: String = row.get(0).context("SQLite 读取列失败")?;
1027 Ok(Some(serde_json::from_str(&val).context("反序列化愁苦记录失败")?))
1028 }
1029 None => Ok(None),
1030 }
1031 }
1032}
1033
1034impl YouMingStore for SqliteYouMing {
1035 fn append_sediment(&self, rec: &ChouKu) -> Result<()> {
1036 let conn = self.conn.lock();
1037 let val = serde_json::to_string(rec).context("序列化愁苦记录失败")?;
1038 conn.execute("INSERT OR REPLACE INTO sediment (id, val) VALUES (?1, ?2)", params![rec.id, val])
1039 .context("SQLite 插入失败")?;
1040 Ok(())
1041 }
1042
1043 fn batch_append_sediment(&self, recs: &[ChouKu]) -> Result<()> {
1044 let conn = self.conn.lock();
1045 let tx = conn.unchecked_transaction()?;
1046 for rec in recs {
1047 let val = serde_json::to_string(rec).context("序列化愁苦记录失败")?;
1048 tx.execute("INSERT OR REPLACE INTO sediment (id, val) VALUES (?1, ?2)",
1049 params![rec.id, val]).context("SQLite 批量插入失败")?;
1050 }
1051 tx.commit()?;
1052 Ok(())
1053 }
1054
1055 fn pull_candidates(&self, sensor_id: Option<&str>, limit: usize) -> Result<Vec<ChouKu>> {
1056 let conn = self.conn.lock();
1057 let mut stmt = conn.prepare("SELECT val FROM sediment LIMIT ?1").context("准备失败")?;
1058 let rows = stmt.query_map(params![limit as i64], |r| r.get::<_, String>(0))?;
1059 let mut out = Vec::new();
1060 for row in rows {
1061 let val: String = row?;
1062 if let Ok(rec) = serde_json::from_str::<ChouKu>(&val) {
1063 match rec.state {
1064 ChouChang::Slumber | ChouChang::Awake | ChouChang::Forging(_) => {
1065 if sensor_id.map_or(true, |s| rec.sensor_id == s) { out.push(rec); }
1066 }
1067 _ => {}
1068 }
1069 }
1070 if out.len() >= limit { break; }
1071 }
1072 Ok(out)
1073 }
1074
1075 fn get_by_id(&self, id: &DropletId) -> Result<Option<ChouKu>> {
1076 let conn = self.conn.lock();
1077 self.get_by_id_inner(&conn, id)
1078 }
1079
1080 fn batch_get_by_ids(&self, ids: &[DropletId]) -> Result<Vec<ChouKu>> {
1081 let mut out = Vec::with_capacity(ids.len());
1082 for id in ids {
1083 if let Some(rec) = self.get_by_id(id)? { out.push(rec); }
1084 }
1085 Ok(out)
1086 }
1087
1088 fn mark_forging(&self, ids: &[DropletId], round: u32) -> Result<()> {
1089 let conn = self.conn.lock();
1090 let tx = conn.unchecked_transaction()?;
1091 for id in ids {
1092 if let Some(mut rec) = self.get_by_id_inner(&conn, id)? {
1093 rec.state = ChouChang::Forging(round);
1094 rec.round = round;
1095 let val = serde_json::to_string(&rec).context("序列化失败")?;
1096 tx.execute("UPDATE sediment SET val = ?1 WHERE id = ?2", params![val, id])?;
1097 }
1098 }
1099 tx.commit()?;
1100 Ok(())
1101 }
1102
1103 fn mark_luminous(&self, results: &[YiLuan]) -> Result<()> {
1104 let conn = self.conn.lock();
1105 let tx = conn.unchecked_transaction()?;
1106 for res in results {
1107 for pid in &res.provenance {
1108 if let Some(mut rec) = self.get_by_id_inner(&conn, pid)? {
1109 rec.state = ChouChang::Luminous(res.round);
1110 let mut meta = rec.metadata.clone().unwrap_or_else(|| serde_json::json!({}));
1111 meta["净化方法"] = serde_json::json!(res.method);
1112 meta["净化置信度"] = serde_json::json!(res.confidence);
1113 meta["净化时间"] = serde_json::json!(res.ts.to_rfc3339());
1114 meta["净化轮次"] = serde_json::json!(res.round);
1115 rec.metadata = Some(meta);
1116 let val = serde_json::to_string(&rec).context("序列化失败")?;
1117 tx.execute("UPDATE sediment SET val = ?1 WHERE id = ?2", params![val, pid])?;
1118 }
1119 }
1120 }
1121 tx.commit()?;
1122 Ok(())
1123 }
1124
1125 fn seal(&self, ids: &[DropletId]) -> Result<()> {
1126 let conn = self.conn.lock();
1127 let tx = conn.unchecked_transaction()?;
1128 for id in ids {
1129 if let Some(mut rec) = self.get_by_id_inner(&conn, id)? {
1130 rec.state = ChouChang::Sealed;
1131 let val = serde_json::to_string(&rec).context("序列化失败")?;
1132 tx.execute("UPDATE sediment SET val = ?1 WHERE id = ?2", params![val, id])?;
1133 }
1134 }
1135 tx.commit()?;
1136 Ok(())
1137 }
1138
1139 fn scan_prefix(&self, _prefix: &str) -> Result<Vec<ChouKu>> {
1140 let conn = self.conn.lock();
1141 let mut stmt = conn.prepare("SELECT val FROM sediment").context("准备失败")?;
1142 let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1143 let mut out = Vec::new();
1144 for row in rows {
1145 if let Ok(rec) = serde_json::from_str::<ChouKu>(&row?) { out.push(rec); }
1146 }
1147 Ok(out)
1148 }
1149
1150 fn flush(&self) -> Result<()> { Ok(()) }
1151
1152 fn apply_shadow(&self, res: &YiLuan) -> Result<()> {
1153 let conn = self.conn.lock();
1154 let key = Self::make_key(&res.ts, &res.sensor_id);
1155 let val = serde_json::to_string(res).context("序列化意乱失败")?;
1156 conn.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![key, val])?;
1157 Ok(())
1158 }
1159
1160 fn promote_shadow(&self, res: &YiLuan) -> Result<()> {
1161 let conn = self.conn.lock();
1162 let key = format!("promoted::{}", Self::make_key(&res.ts, &res.sensor_id));
1163 let val = serde_json::to_string(res).context("序列化意乱失败")?;
1164 conn.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![key, val])?;
1165 Ok(())
1166 }
1167
1168 fn export_shadow(&self, path: &PathBuf) -> Result<()> {
1169 let conn = self.conn.lock();
1170 let mut stmt = conn.prepare("SELECT val FROM shadow").context("准备失败")?;
1171 let mut file = std::fs::File::create(path).context("创建导出文件失败")?;
1172 let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1173 for row in rows {
1174 let val: String = row?;
1175 file.write_all(val.as_bytes())?;
1176 file.write_all(b"\n")?;
1177 }
1178 Ok(())
1179 }
1180
1181 fn audit(&self, rec: &QiChuang) -> Result<()> {
1182 let conn = self.conn.lock();
1183 let key = format!("audit::{}", Self::make_key(&rec.ts, &rec.droplet_id));
1184 let val = serde_json::to_string(rec).context("序列化凄怆失败")?;
1185 conn.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![key, val])?;
1186 Ok(())
1187 }
1188
1189 fn quarantine(&self, rec: &QiChuang) -> Result<()> {
1190 let conn = self.conn.lock();
1191 let ts_key = Self::make_key(&rec.ts, &rec.droplet_id);
1192 let qkey = format!("quarantine::{}", ts_key);
1194 let val = serde_json::to_string(rec).context("序列化凄怆失败")?;
1195 conn.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![qkey, val.clone()])?;
1196 let akey = format!("audit::{}", ts_key);
1198 conn.execute("INSERT OR REPLACE INTO shadow (key, val) VALUES (?1, ?2)", params![akey, val])?;
1199 Ok(())
1200 }
1201
1202 fn scan_audit(&self) -> Result<Vec<QiChuang>> {
1203 let conn = self.conn.lock();
1204 let mut stmt = conn.prepare("SELECT val FROM shadow WHERE key LIKE 'audit::%'")
1205 .context("准备失败")?;
1206 let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1207 let mut out = Vec::new();
1208 for row in rows {
1209 if let Ok(rec) = serde_json::from_str::<QiChuang>(&row?) { out.push(rec); }
1210 }
1211 Ok(out)
1212 }
1213
1214 fn scan_quarantine(&self) -> Result<Vec<QiChuang>> {
1215 let conn = self.conn.lock();
1216 let mut stmt = conn.prepare("SELECT val FROM shadow WHERE key LIKE 'quarantine::%'")
1217 .context("准备失败")?;
1218 let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
1219 let mut out = Vec::new();
1220 for row in rows {
1221 if let Ok(rec) = serde_json::from_str::<QiChuang>(&row?) { out.push(rec); }
1222 }
1223 Ok(out)
1224 }
1225
1226 fn export_audit_jsonl(&self, path: &PathBuf) -> Result<usize> {
1227 let records = self.scan_audit()?;
1228 let mut f = BufWriter::new(std::fs::File::create(path).context("创建导出文件失败")?);
1229 let n = records.len();
1230 for rec in &records {
1231 let line = serde_json::to_string(rec).context("序列化失败")?;
1232 f.write_all(line.as_bytes())?;
1233 f.write_all(b"\n")?;
1234 }
1235 f.flush()?;
1236 Ok(n)
1237 }
1238
1239 fn compare_and_swap_state(
1240 &self, id: &DropletId, old_state: ChouChang, new_state: ChouChang, new_round: u32,
1241 ) -> Result<bool> {
1242 let conn = self.conn.lock();
1243 let tx = conn.unchecked_transaction()?;
1244 match self.get_by_id_inner(&conn, id)? {
1245 None => { tx.commit()?; return Ok(false); }
1246 Some(mut rec) => {
1247 if rec.state != old_state { tx.commit()?; return Ok(false); }
1248 rec.state = new_state;
1249 rec.round = new_round;
1250 let val = serde_json::to_string(&rec)?;
1251 tx.execute("UPDATE sediment SET val = ?1 WHERE id = ?2", params![val, id])?;
1252 tx.commit()?;
1253 Ok(true)
1254 }
1255 }
1256 }
1257}
1258
1259pub struct LeiLuoProcessor {
1264 sensor_id: SensorId,
1265 tracker: Arc<AutoXiaoHun>,
1266 scorer: ShenShang,
1267 hunfei_tx: mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
1268 tsdb: Arc<dyn TsdbSink>,
1269 metrics: Arc<dyn MetricsSink>,
1270 seq: AtomicU64,
1271}
1272
1273impl LeiLuoProcessor {
1274 pub fn new(
1275 sensor_id: SensorId,
1276 config: YanHunConfig,
1277 hunfei_tx: mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
1278 tsdb: Arc<dyn TsdbSink>,
1279 metrics: Arc<dyn MetricsSink>,
1280 ) -> Self {
1281 let tracker = Arc::new(AutoXiaoHun::new(config.window_size, config.quantisation_eps));
1282 let scorer = ShenShang::new(config);
1283 Self { sensor_id, tracker, scorer, hunfei_tx, tsdb, metrics, seq: AtomicU64::new(0) }
1284 }
1285
1286 pub async fn process(&self, reading: LeiLuo) -> Result<()> {
1287 let seq = self.seq.fetch_add(1, AtomicOrd::Relaxed);
1288 let (median, mad) = self.tracker.insert_and_compute(reading.value);
1289 let order = self.scorer.assess(reading.value, median, mad, 1);
1290
1291 debug!(
1292 "黯然销魂 sensor={} seq={} value={:.6} median={:.6} mad={:.6} score={:.4} level={}",
1293 self.sensor_id, seq, reading.value, median, mad, order.score, order.level
1294 );
1295
1296 self.metrics.set_gauge(&format!("yanhun.score.{}", self.sensor_id), order.score);
1297
1298 if order.level != PoSan::Untainted {
1299 self.metrics.incr_counter("yanhun.remediation.emitted", 1);
1300 let msg = (self.sensor_id.clone(), order, reading);
1302 if self.hunfei_tx.try_send(msg.clone()).is_err() {
1303 if let Err(e) = self.hunfei_tx.send(msg).await {
1304 return Err(YanHunError::Channel(format!("hunfei send: {}", e)).into());
1305 }
1306 }
1307 }
1308 Ok(())
1309 }
1310
1311 pub fn tracker(&self) -> &Arc<AutoXiaoHun> { &self.tracker }
1312 pub fn snapshot_stats(&self) -> (f64, f64) { self.tracker.snapshot_stats() }
1313}
1314
1315pub struct HunFei {
1320 youming: Arc<dyn YouMingStore>,
1321 qingmi: Arc<QingMi>,
1322 processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
1323 config: YanHunConfig,
1324 hunfei_rx: RwLock<Option<mpsc::Receiver<(SensorId, DuanChang, LeiLuo)>>>,
1325 concurrency:Arc<Semaphore>,
1326 running: Arc<AtomicBool>,
1327 tsdb: Arc<dyn TsdbSink>,
1328 metrics: Arc<dyn MetricsSink>,
1329}
1330
1331impl HunFei {
1332 pub fn new(
1333 youming: Arc<dyn YouMingStore>,
1334 qingmi: Arc<QingMi>,
1335 processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
1336 config: YanHunConfig,
1337 hunfei_rx: mpsc::Receiver<(SensorId, DuanChang, LeiLuo)>,
1338 tsdb: Arc<dyn TsdbSink>,
1339 metrics: Arc<dyn MetricsSink>,
1340 ) -> Self {
1341 let concurrency = Arc::new(Semaphore::new(config.hunfei_concurrency.max(1)));
1342 Self {
1343 youming, qingmi, processors,
1344 config: config.clone(),
1345 hunfei_rx: RwLock::new(Some(hunfei_rx)),
1346 concurrency, running: Arc::new(AtomicBool::new(false)),
1347 tsdb, metrics,
1348 }
1349 }
1350
1351 pub fn start(self: Arc<Self>) {
1352 if self.running.swap(true, AtomicOrd::SeqCst) { return; }
1353 let this = self.clone();
1354 tokio::spawn(async move {
1355 let mut rx = match this.hunfei_rx.write().take() {
1356 Some(r) => r,
1357 None => { error!("魂飞: hunfei_rx 已被取走"); return; }
1358 };
1359 info!("魂飞: 净化执行器启动 (并发度={})", this.config.hunfei_concurrency);
1360 while let Some((sid, order, reading)) = rx.recv().await {
1361 let permit = match this.concurrency.clone().acquire_owned().await {
1362 Ok(p) => p,
1363 Err(_) => { warn!("魂飞: 信号量已关闭"); break; }
1364 };
1365 let t = this.clone();
1366 tokio::spawn(async move {
1367 let _p = permit;
1368 if let Err(e) = t.execute(&sid, order, reading).await {
1369 error!("魂飞: 执行错误: {:?}", e);
1370 }
1371 });
1372 }
1373 info!("魂飞: 净化执行器退出");
1374 this.running.store(false, AtomicOrd::SeqCst);
1375 });
1376 }
1377
1378 async fn execute(&self, sid: &SensorId, order: DuanChang, reading: LeiLuo) -> Result<()> {
1379 match order.level {
1380 PoSan::LightGrief => self.apply_light(sid, order, reading).await,
1381 PoSan::DeepSorrow => self.apply_medium(sid, order, reading).await,
1382 PoSan::SoulWithered => self.apply_heavy(sid, order, reading).await,
1383 PoSan::Untainted => Ok(()),
1384 }
1385 }
1386
1387 async fn apply_light(&self, sid: &SensorId, order: DuanChang, reading: LeiLuo) -> Result<()> {
1389 let (median, mad) = self.snapshot_median_mad(sid, &reading);
1390 let sigma = if mad > f64::EPSILON { mad * MAD_TO_SIGMA } else { 0.0 };
1391 let k = self.config.clip_k;
1392 let clipped = if sigma > f64::EPSILON {
1393 reading.value.max(median - k * sigma).min(median + k * sigma)
1394 } else { median };
1395
1396 let note = format!("裁剪至 [{:.6}, {:.6}] σ={:.6}", median - k*sigma, median + k*sigma, sigma);
1397 self.tsdb.write_corrected(sid, &reading.ts, clipped, ¬e);
1398 self.metrics.incr_counter("yanhun.light", 1);
1399
1400 let rec = QiChuang {
1401 sensor_id: sid.clone(),
1402 droplet_id: reading.id.clone(),
1403 order: order.clone(),
1404 original_value: reading.value,
1405 applied_value: Some(clipped),
1406 note: Some(note),
1407 ts: Utc::now(),
1408 };
1409 self.youming.audit(&rec)?;
1410
1411 self.feed_back(sid, clipped, &reading).await;
1413
1414 if self.config.verification_enabled { self.spawn_verify(sid.clone(), clipped).await; }
1415 Ok(())
1416 }
1417
1418 async fn apply_medium(&self, sid: &SensorId, order: DuanChang, reading: LeiLuo) -> Result<()> {
1420 let bg = self.qingmi.get(sid, reading.value);
1421 let alpha = self.config.blend_alpha;
1422 let blended = alpha * bg + (1.0 - alpha) * reading.value;
1423 self.qingmi.update(sid, blended);
1424
1425 let note = format!("背景混合 bg={:.6} α={:.3}", bg, alpha);
1426 self.tsdb.write_corrected(sid, &reading.ts, blended, ¬e);
1427 self.metrics.incr_counter("yanhun.medium", 1);
1428
1429 let rec = QiChuang {
1430 sensor_id: sid.clone(),
1431 droplet_id: reading.id.clone(),
1432 order: order.clone(),
1433 original_value: reading.value,
1434 applied_value: Some(blended),
1435 note: Some(note),
1436 ts: Utc::now(),
1437 };
1438 self.youming.audit(&rec)?;
1439 self.feed_back(sid, blended, &reading).await;
1440
1441 if self.config.verification_enabled { self.spawn_verify(sid.clone(), blended).await; }
1442 Ok(())
1443 }
1444
1445 async fn apply_heavy(&self, sid: &SensorId, order: DuanChang, reading: LeiLuo) -> Result<()> {
1447 let rec = QiChuang {
1448 sensor_id: sid.clone(),
1449 droplet_id: reading.id.clone(),
1450 order: order.clone(),
1451 original_value: reading.value,
1452 applied_value: None,
1453 note: Some("销魂隔离: 已阻断主链路".into()),
1454 ts: Utc::now(),
1455 };
1456 self.youming.quarantine(&rec)?;
1458 self.metrics.incr_counter("yanhun.heavy", 1);
1459 warn!("销魂: sensor={} score={:.4} 已隔离入幽冥", sid, order.score);
1460 Ok(())
1461 }
1462
1463 fn snapshot_median_mad(&self, sid: &SensorId, reading: &LeiLuo) -> (f64, f64) {
1464 let map = self.processors.read();
1465 if let Some(proc) = map.get(sid) { proc.snapshot_stats() }
1466 else { (reading.value, 0.0) }
1467 }
1468
1469 async fn feed_back(&self, sid: &SensorId, value: f64, original: &LeiLuo) {
1470 let proc = { self.processors.read().get(sid).cloned() };
1471 if let Some(p) = proc {
1472 let corrected = LeiLuo { value, ts: Utc::now(), ..original.clone() };
1473 if let Err(e) = p.process(corrected).await {
1474 warn!("魂飞: 反馈失败 {}: {:?}", sid, e);
1475 }
1476 }
1477 }
1478
1479 async fn spawn_verify(&self, sid: SensorId, applied: f64) {
1480 let qingmi = self.qingmi.clone();
1481 let delay = self.config.shadow_verify_delay_ms;
1482 tokio::spawn(async move {
1483 sleep(Duration::from_millis(delay)).await;
1484 let bg = qingmi.get(&sid, applied);
1485 let delta = (applied - bg).abs();
1486 if delta < 1e-3 {
1487 debug!("验证: sensor={} applied≈bg → 有效", sid);
1488 } else {
1489 info!("验证: sensor={} Δ={:.6} applied={:.6} bg={:.6}", sid, delta, applied, bg);
1490 }
1491 });
1492 }
1493}
1494
1495struct ShardEntry {
1500 tx: mpsc::Sender<LeiLuo>,
1501 tracker: Arc<AutoXiaoHun>,
1502}
1503
1504struct JingShenShard {
1505 entries: RwLock<HashMap<SensorId, ShardEntry>>,
1506}
1507
1508impl JingShenShard {
1509 fn new() -> Self { Self { entries: RwLock::new(HashMap::new()) } }
1510
1511 fn get_or_create(
1512 &self,
1513 sid: &SensorId,
1514 config: &YanHunConfig,
1515 hunfei_tx: &mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
1516 processors: &Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
1517 trackers: &Arc<RwLock<HashMap<SensorId, Arc<AutoXiaoHun>>>>,
1518 tsdb: &Arc<dyn TsdbSink>,
1519 metrics: &Arc<dyn MetricsSink>,
1520 ) -> mpsc::Sender<LeiLuo> {
1521 {
1523 let map = self.entries.read();
1524 if let Some(e) = map.get(sid) { return e.tx.clone(); }
1525 }
1526 let mut map = self.entries.write();
1528 if let Some(e) = map.get(sid) { return e.tx.clone(); }
1529
1530 let (tx, rx) = mpsc::channel::<LeiLuo>(config.per_sensor_queue);
1531 let proc = Arc::new(LeiLuoProcessor::new(
1532 sid.clone(), config.clone(), hunfei_tx.clone(), tsdb.clone(), metrics.clone(),
1533 ));
1534 let tracker_arc = proc.tracker().clone();
1535
1536 processors.write().insert(sid.clone(), proc.clone());
1538 trackers.write().insert(sid.clone(), tracker_arc.clone());
1539
1540 let batch_max = config.batch_max_size;
1542 let batch_wait = config.batch_max_wait_ms;
1543 let tsdb_c = tsdb.clone();
1544 let proc_c = proc.clone();
1545
1546 tokio::spawn(async move {
1547 let mut batch: Vec<LeiLuo> = Vec::with_capacity(batch_max);
1548 let mut last_flush = Instant::now();
1549 let mut inbound = rx;
1550 let wait = Duration::from_millis(batch_wait);
1551
1552 loop {
1553 tokio::select! {
1554 maybe = inbound.recv() => {
1555 match maybe {
1556 Some(r) => {
1557 tsdb_c.write_raw(&r);
1558 batch.push(r);
1559 if batch.len() >= batch_max {
1560 for item in batch.drain(..) {
1561 if let Err(e) = proc_c.process(item).await {
1562 error!("惊神: worker 处理失败: {:?}", e);
1563 }
1564 }
1565 last_flush = Instant::now();
1566 }
1567 }
1568 None => {
1569 for item in batch.drain(..) {
1570 let _ = proc_c.process(item).await;
1571 }
1572 break;
1573 }
1574 }
1575 }
1576 _ = sleep(wait) => {
1577 if !batch.is_empty() && last_flush.elapsed() >= wait {
1578 for item in batch.drain(..) {
1579 if let Err(e) = proc_c.process(item).await {
1580 error!("惊神: worker 批次处理失败: {:?}", e);
1581 }
1582 }
1583 last_flush = Instant::now();
1584 }
1585 }
1586 }
1587 }
1588 debug!("惊神: sensor worker {} 退出", proc_c.sensor_id);
1589 });
1590
1591 map.insert(sid.clone(), ShardEntry { tx: tx.clone(), tracker: tracker_arc });
1592 tx
1593 }
1594}
1595
1596pub struct JingShen {
1597 shards: Vec<Arc<JingShenShard>>,
1598 shard_count: usize,
1599 config: YanHunConfig,
1600 hunfei_tx: mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
1601 processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
1602 trackers: Arc<RwLock<HashMap<SensorId, Arc<AutoXiaoHun>>>>,
1604 tsdb: Arc<dyn TsdbSink>,
1605 metrics: Arc<dyn MetricsSink>,
1606}
1607
1608impl JingShen {
1609 pub fn new(
1610 config: YanHunConfig,
1611 hunfei_tx: mpsc::Sender<(SensorId, DuanChang, LeiLuo)>,
1612 processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
1613 tsdb: Arc<dyn TsdbSink>,
1614 metrics: Arc<dyn MetricsSink>,
1615 ) -> Self {
1616 let shard_count = config.shard_count.max(1);
1617 let shards = (0..shard_count).map(|_| Arc::new(JingShenShard::new())).collect();
1618 Self {
1619 shards, shard_count, config, hunfei_tx, processors,
1620 trackers: Arc::new(RwLock::new(HashMap::new())),
1621 tsdb, metrics,
1622 }
1623 }
1624
1625 fn shard_index(&self, sensor_id: &str) -> usize {
1627 let mut h: u64 = 0xcbf29ce484222325;
1628 for b in sensor_id.bytes() { h ^= b as u64; h = h.wrapping_mul(0x100000001b3); }
1629 (h as usize) % self.shard_count
1630 }
1631
1632 pub async fn submit(&self, reading: LeiLuo) -> Result<()> {
1633 let idx = self.shard_index(&reading.sensor_id);
1634 let sender = self.shards[idx].get_or_create(
1635 &reading.sensor_id, &self.config, &self.hunfei_tx,
1636 &self.processors, &self.trackers, &self.tsdb, &self.metrics,
1637 );
1638 sender.send(reading).await
1639 .map_err(|e| YanHunError::Channel(format!("惊神 submit: {}", e)).into())
1640 }
1641
1642 pub fn trackers(&self) -> Arc<RwLock<HashMap<SensorId, Arc<AutoXiaoHun>>>> {
1643 self.trackers.clone()
1644 }
1645}
1646
1647pub struct BeiQie {
1652 config: YanHunConfig,
1653 metrics: Arc<dyn MetricsSink>,
1654}
1655
1656impl BeiQie {
1657 pub fn new(config: YanHunConfig, metrics: Arc<dyn MetricsSink>) -> Self {
1658 Self { config, metrics }
1659 }
1660
1661 pub async fn verify_and_promote(&self, res: YiLuan, youming: Arc<dyn YouMingStore>) -> Result<()> {
1662 if res.confidence < self.config.shadow_min_confidence {
1663 self.metrics.incr_counter("beiqie.skipped_low_conf", 1);
1664 return Ok(());
1665 }
1666 youming.apply_shadow(&res).context("施加暗影失败")?;
1667 self.metrics.incr_counter("beiqie.applied", 1);
1668 sleep(Duration::from_millis(self.config.shadow_verify_delay_ms)).await;
1669
1670 let roll: f64 = rand::thread_rng().gen();
1671 if roll <= res.confidence {
1672 youming.promote_shadow(&res).context("晋升暗影失败")?;
1673 youming.seal(&res.provenance)?;
1674 self.metrics.incr_counter("beiqie.promoted", 1);
1675 } else {
1676 self.metrics.incr_counter("beiqie.rejected", 1);
1677 }
1678 Ok(())
1679 }
1680}
1681
1682pub struct AiHui {
1687 youming: Arc<dyn YouMingStore>,
1688 config: YanHunConfig,
1689}
1690
1691impl AiHui {
1692 pub fn new(youming: Arc<dyn YouMingStore>, config: YanHunConfig) -> Self {
1693 Self { youming, config }
1694 }
1695
1696 pub fn process_batch(&self, batch: Vec<ChouKu>) -> Result<Vec<YiLuan>> {
1697 if batch.is_empty() { return Ok(vec![]); }
1698
1699 let mut by_sensor: HashMap<SensorId, Vec<ChouKu>> = HashMap::new();
1700 for rec in batch { by_sensor.entry(rec.sensor_id.clone()).or_default().push(rec); }
1701
1702 let mut out = Vec::new();
1703 for (sid, mut recs) in by_sensor {
1704 recs.sort_by_key(|r| r.ts);
1705 let values: Vec<f64> = recs.iter().map(|r| r.value).collect();
1706
1707 let eps = estimate_dbscan_eps(&values);
1709 let min_samples = ((values.len() as f64) * self.config.dbscan_min_samples_ratio)
1710 .ceil() as usize;
1711 let min_samples = min_samples.max(1);
1712 let labels = dbscan_1d(&values, eps, min_samples);
1713
1714 let mut clusters: HashMap<i32, Vec<(usize, f64)>> = HashMap::new();
1715 for (i, &lab) in labels.iter().enumerate() {
1716 clusters.entry(lab).or_default().push((i, values[i]));
1717 }
1718
1719 for (lab, members) in clusters {
1720 let vals: Vec<f64> = members.iter().map(|(_, v)| *v).collect();
1721
1722 if lab == -1 {
1723 if vals.len() >= 3 {
1725 let mut tmp = vals.clone();
1726 let mid = tmp.len() / 2;
1727 tmp.select_nth_unstable_by(mid, f64::total_cmp);
1728 let rep = tmp[mid];
1729 let conf = compute_confidence(&vals, rep);
1730 if conf >= self.config.promote_confidence {
1731 let prov: Vec<DropletId> =
1732 members.iter().map(|(idx, _)| recs[*idx].id.clone()).collect();
1733 out.push(YiLuan {
1734 sensor_id: sid.clone(),
1735 ts: Utc::now(),
1736 corrected_value: rep,
1737 confidence: conf,
1738 method: "DBSCAN_Noise_Median".into(),
1739 round: 1,
1740 provenance: prov,
1741 metrics: serde_json::json!({"eps": eps, "cluster": lab, "size": vals.len()}),
1742 });
1743 }
1744 }
1745 continue;
1746 }
1747
1748 let delta = match self.config.huber_strategy {
1750 HuberStrategy::Fixed(d) => d,
1751 HuberStrategy::AutoMAD => estimate_huber_delta(&vals),
1752 };
1753 let huber_est = huber_robust_location(&vals, delta, 50);
1754
1755 let mut kf = Kalman1D::new(self.config.kalman_q, self.config.kalman_r);
1756 let smoothed = kf.smooth_sequence(&vals);
1757 let mut tmp = smoothed;
1758 let mid = tmp.len() / 2;
1759 tmp.select_nth_unstable_by(mid, f64::total_cmp);
1760 let kalman_est = tmp[mid];
1761
1762 let combined = 0.5 * huber_est + 0.5 * kalman_est;
1763 let conf = compute_confidence(&vals, combined);
1764 let prov: Vec<DropletId> =
1765 members.iter().map(|(idx, _)| recs[*idx].id.clone()).collect();
1766
1767 out.push(YiLuan {
1768 sensor_id: sid.clone(),
1769 ts: Utc::now(),
1770 corrected_value: combined,
1771 confidence: conf,
1772 method: "DBSCAN_Huber_Kalman_Fusion".into(),
1773 round: 1,
1774 provenance: prov,
1775 metrics: serde_json::json!({
1776 "eps": eps, "min_samples": min_samples,
1777 "cluster_size": vals.len(),
1778 "huber_delta": delta,
1779 "huber_est": huber_est,
1780 "kalman_est": kalman_est,
1781 "combined": combined,
1782 }),
1783 });
1784 }
1785 }
1786 Ok(out)
1787 }
1788}
1789
1790pub struct XinLuan {
1795 youming: Arc<dyn YouMingStore>,
1796 config: YanHunConfig,
1797 worker_tx: mpsc::Sender<Vec<ChouKu>>,
1798 metrics: Arc<dyn MetricsSink>,
1799 beiqie: Arc<BeiQie>,
1800}
1801
1802impl XinLuan {
1803 pub fn new(
1804 youming: Arc<dyn YouMingStore>,
1805 config: YanHunConfig,
1806 worker_tx: mpsc::Sender<Vec<ChouKu>>,
1807 metrics: Arc<dyn MetricsSink>,
1808 beiqie: Arc<BeiQie>,
1809 ) -> Self {
1810 Self { youming, config, worker_tx, metrics, beiqie }
1811 }
1812
1813 pub async fn run(&self) -> Result<()> {
1814 info!("心乱: 多轮净化调度器启动");
1815 loop {
1816 let batch = self.youming.pull_candidates(None, self.config.multi_round_batch_size)?;
1817 if batch.is_empty() { sleep(Duration::from_millis(200)).await; continue; }
1818
1819 let shards = self.shard_batch(batch);
1820 for shard_batch in shards {
1821 let mut to_dispatch = Vec::new();
1822 for rec in shard_batch {
1823 if self.youming.compare_and_swap_state(&rec.id, ChouChang::Slumber, ChouChang::Forging(1), 1)? {
1824 if let Some(latest) = self.youming.get_by_id(&rec.id)? {
1825 to_dispatch.push(latest);
1826 }
1827 } else {
1828 self.metrics.incr_counter("xinluan.cas_conflict", 1);
1829 }
1830 }
1831 if !to_dispatch.is_empty() {
1832 if let Err(e) = self.worker_tx.send(to_dispatch).await {
1833 error!("心乱: 分发至肠断池失败: {:?}", e);
1834 } else {
1835 self.metrics.incr_counter("xinluan.dispatched", 1);
1836 }
1837 }
1838 }
1839 sleep(Duration::from_millis(10)).await;
1840 }
1841 }
1842
1843 fn shard_batch(&self, batch: Vec<ChouKu>) -> Vec<Vec<ChouKu>> {
1844 let n = self.config.multi_round_concurrency.max(1);
1845 let mut shards: Vec<Vec<ChouKu>> = vec![Vec::new(); n];
1846 for rec in batch {
1847 let mut h: u64 = 0xcbf29ce484222325;
1848 for b in rec.sensor_id.bytes() { h ^= b as u64; h = h.wrapping_mul(0x100000001b3); }
1849 shards[(h as usize) % n].push(rec);
1850 }
1851 shards.into_iter().filter(|v| !v.is_empty()).collect()
1852 }
1853}
1854
1855pub struct ChangDuan {
1860 worker_rx: mpsc::Receiver<Vec<ChouKu>>,
1861 youming: Arc<dyn YouMingStore>,
1862 config: YanHunConfig,
1863 metrics: Arc<dyn MetricsSink>,
1864 beiqie: Arc<BeiQie>,
1865 sem: Arc<Semaphore>,
1866}
1867
1868impl ChangDuan {
1869 pub fn new(
1870 worker_rx: mpsc::Receiver<Vec<ChouKu>>,
1871 youming: Arc<dyn YouMingStore>,
1872 config: YanHunConfig,
1873 metrics: Arc<dyn MetricsSink>,
1874 beiqie: Arc<BeiQie>,
1875 ) -> Self {
1876 let concurrency = config.multi_round_concurrency.max(1);
1877 Self { worker_rx, youming, config, metrics, beiqie, sem: Arc::new(Semaphore::new(concurrency)) }
1878 }
1879
1880 pub async fn run(mut self) {
1881 info!("肠断: 并行 Worker 池启动,并发度={}", self.config.multi_round_concurrency);
1882 while let Some(batch) = self.worker_rx.recv().await {
1883 let permit = match self.sem.clone().acquire_owned().await {
1884 Ok(p) => p,
1885 Err(_) => { warn!("肠断: 信号量已关闭"); break; }
1886 };
1887 let youming = self.youming.clone();
1888 let cfg = self.config.clone();
1889 let metrics = self.metrics.clone();
1890 let beiqie = self.beiqie.clone();
1891 task::spawn(async move {
1892 let _p = permit;
1893 if let Err(e) = Self::multi_round_purify(youming, cfg, batch, metrics, beiqie).await {
1894 error!("肠断: 批次净化失败: {:?}", e);
1895 }
1896 });
1897 }
1898 info!("肠断: Worker 池退出");
1899 }
1900
1901 async fn multi_round_purify(
1902 youming: Arc<dyn YouMingStore>,
1903 cfg: YanHunConfig,
1904 initial_batch: Vec<ChouKu>,
1905 metrics: Arc<dyn MetricsSink>,
1906 beiqie: Arc<BeiQie>,
1907 ) -> Result<()> {
1908 let mut current = initial_batch;
1909 let mut round = 1u32;
1910 let mut last_conf = 0.0f64;
1911
1912 loop {
1913 let start = Instant::now();
1914 let purifier = AiHui::new(youming.clone(), cfg.clone());
1915 let results = purifier.process_batch(current.clone())?;
1916
1917 metrics.incr_counter("yanhun.round_executed", 1);
1918 let avg_conf = if results.is_empty() { 0.0 }
1919 else { results.iter().map(|r| r.confidence).sum::<f64>() / results.len() as f64 };
1920 metrics.set_gauge("yanhun.avg_confidence", avg_conf);
1921
1922 if !results.is_empty() {
1923 youming.mark_luminous(&results)?;
1924 for res in results {
1925 let beiqie_c = beiqie.clone();
1926 let ym_c = youming.clone();
1927 task::spawn(async move {
1928 if let Err(e) = beiqie_c.verify_and_promote(res, ym_c).await {
1929 warn!("悲切验证/晋升失败: {:?}", e);
1930 }
1931 });
1932 }
1933 }
1934
1935 if (avg_conf - last_conf).abs() < cfg.convergence_delta {
1936 metrics.incr_counter("yanhun.converged", 1);
1937 break;
1938 }
1939 last_conf = avg_conf;
1940 round += 1;
1941 if round > cfg.max_rounds { metrics.incr_counter("yanhun.max_rounds_reached", 1); break; }
1942
1943 let ids: Vec<DropletId> = current.iter().map(|r| r.id.clone()).collect();
1944 let next = youming.batch_get_by_ids(&ids)?;
1945 if next.is_empty() { break; }
1946 current = next;
1947
1948 let elapsed = start.elapsed();
1949 if elapsed < Duration::from_millis(50) { sleep(Duration::from_millis(10)).await; }
1950 }
1951 Ok(())
1952 }
1953}
1954
1955pub struct YanHun {
1960 jingshen: Arc<JingShen>,
1961 hunfei: Arc<HunFei>,
1962 youming: Arc<dyn YouMingStore>,
1963 qingmi: Arc<QingMi>,
1964 processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>>,
1965 ingestion_tx:mpsc::Sender<LeiLuo>,
1966 config: YanHunConfig,
1967 tsdb: Arc<dyn TsdbSink>,
1968 metrics: Arc<dyn MetricsSink>,
1969 beiqie: Arc<BeiQie>,
1970 xinluan: Option<Arc<XinLuan>>,
1971 multi_tx: Option<mpsc::Sender<Vec<ChouKu>>>,
1972}
1973
1974impl YanHun {
1975 pub fn initialize(config: YanHunConfig, youming: Arc<dyn YouMingStore>) -> Self {
1977 let processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>> =
1978 Arc::new(RwLock::new(HashMap::new()));
1979
1980 let tsdb: Arc<dyn TsdbSink> = Arc::new(NoopTsdb);
1981 let metrics: Arc<dyn MetricsSink> = Arc::new(NoopMetrics);
1982
1983 let (hunfei_tx, hunfei_rx) = mpsc::channel::<(SensorId, DuanChang, LeiLuo)>(config.ingestion_capacity);
1984 let (ingest_tx, mut ingest_rx) = mpsc::channel::<LeiLuo>(config.ingestion_capacity);
1985
1986 let qingmi = Arc::new(QingMi::new(config.qingmi_alpha));
1987 let beiqie = Arc::new(BeiQie::new(config.clone(), metrics.clone()));
1988
1989 let jingshen = Arc::new(JingShen::new(
1990 config.clone(), hunfei_tx.clone(), processors.clone(), tsdb.clone(), metrics.clone(),
1991 ));
1992
1993 let hunfei = Arc::new(HunFei::new(
1994 youming.clone(), qingmi.clone(), processors.clone(), config.clone(),
1995 hunfei_rx, tsdb.clone(), metrics.clone(),
1996 ));
1997
1998 {
2000 let js = jingshen.clone();
2001 tokio::spawn(async move {
2002 while let Some(r) = ingest_rx.recv().await {
2003 if let Err(e) = js.submit(r).await {
2004 error!("黯然: 摄入路由错误: {:?}", e);
2005 }
2006 }
2007 info!("黯然: 摄入调度器退出");
2008 });
2009 }
2010
2011 Self {
2012 jingshen, hunfei, youming, qingmi, processors,
2013 ingestion_tx: ingest_tx, config, tsdb, metrics, beiqie,
2014 xinluan: None, multi_tx: None,
2015 }
2016 }
2017
2018 pub fn initialize_with_sinks(
2020 config: YanHunConfig,
2021 youming: Arc<dyn YouMingStore>,
2022 tsdb: Arc<dyn TsdbSink>,
2023 metrics: Arc<dyn MetricsSink>,
2024 ) -> Self {
2025 let processors: Arc<RwLock<HashMap<SensorId, Arc<LeiLuoProcessor>>>> =
2026 Arc::new(RwLock::new(HashMap::new()));
2027
2028 let (hunfei_tx, hunfei_rx) = mpsc::channel::<(SensorId, DuanChang, LeiLuo)>(config.ingestion_capacity);
2029 let (ingest_tx, mut ingest_rx) = mpsc::channel::<LeiLuo>(config.ingestion_capacity);
2030
2031 let qingmi = Arc::new(QingMi::new(config.qingmi_alpha));
2032 let beiqie = Arc::new(BeiQie::new(config.clone(), metrics.clone()));
2033
2034 let jingshen = Arc::new(JingShen::new(
2035 config.clone(), hunfei_tx.clone(), processors.clone(), tsdb.clone(), metrics.clone(),
2036 ));
2037 let hunfei = Arc::new(HunFei::new(
2038 youming.clone(), qingmi.clone(), processors.clone(), config.clone(),
2039 hunfei_rx, tsdb.clone(), metrics.clone(),
2040 ));
2041
2042 {
2043 let js = jingshen.clone();
2044 tokio::spawn(async move {
2045 while let Some(r) = ingest_rx.recv().await {
2046 if let Err(e) = js.submit(r).await {
2047 error!("黯然: 摄入路由错误: {:?}", e);
2048 }
2049 }
2050 });
2051 }
2052
2053 Self {
2054 jingshen, hunfei, youming, qingmi, processors,
2055 ingestion_tx: ingest_tx, config, tsdb, metrics, beiqie,
2056 xinluan: None, multi_tx: None,
2057 }
2058 }
2059
2060 pub fn start(&self) {
2062 self.hunfei.clone().start();
2063 info!("黯然销魂系统: 启动完成");
2064 }
2065
2066 pub fn start_multi_round(&mut self) {
2068 let (tx, rx) = mpsc::channel::<Vec<ChouKu>>(self.config.shard_count * 4);
2069 let xinluan = Arc::new(XinLuan::new(
2070 self.youming.clone(), self.config.clone(), tx.clone(),
2071 self.metrics.clone(), self.beiqie.clone(),
2072 ));
2073 let changduan = ChangDuan::new(
2074 rx, self.youming.clone(), self.config.clone(),
2075 self.metrics.clone(), self.beiqie.clone(),
2076 );
2077 let xl = xinluan.clone();
2078 tokio::spawn(async move { if let Err(e) = xl.run().await { error!("心乱运行错误: {:?}", e); } });
2079 tokio::spawn(async move { changduan.run().await; });
2080 self.xinluan = Some(xinluan);
2081 self.multi_tx = Some(tx);
2082 info!("黯然销魂系统: 多轮迭代净化已启动");
2083 }
2084
2085 pub async fn ingest(&self, reading: LeiLuo) -> Result<()> {
2087 self.ingestion_tx.send(reading).await
2088 .map_err(|e| anyhow!("黯然: ingest 失败: {}", e))
2089 }
2090
2091 pub async fn ingest_batch(&self, readings: Vec<LeiLuo>) -> Result<()> {
2093 for r in readings { self.ingest(r).await?; }
2094 Ok(())
2095 }
2096
2097 pub async fn run_daemon(self) -> Result<()> {
2099 info!("黯然销魂系统: 守护模式运行中 — 按 Ctrl+C 停止");
2100 tokio::signal::ctrl_c().await.context("ctrl-c 监听失败")?;
2101 info!("黯然销魂系统: 收到关闭信号,正在排空管道…");
2102 sleep(Duration::from_millis(800)).await;
2103 self.youming.flush()?;
2104 info!("黯然销魂系统: 关闭完成 — 黯然销魂,万念俱灰");
2105 Ok(())
2106 }
2107
2108 pub fn export_audit(&self, path: &PathBuf) -> Result<usize> {
2110 self.youming.export_audit_jsonl(path)
2111 }
2112
2113 pub async fn replay_quarantine(&self) -> Result<usize> {
2115 let records = self.youming.scan_quarantine()?;
2116 let count = records.len();
2117 for rec in records {
2118 let reading = LeiLuo {
2119 id: rec.droplet_id,
2120 sensor_id: rec.sensor_id,
2121 ts: rec.ts,
2122 value: rec.original_value,
2123 quality: None,
2124 metadata: None,
2125 };
2126 self.ingest(reading).await?;
2127 sleep(Duration::from_millis(5)).await;
2128 }
2129 info!("回放: 重新注入 {} 条隔离记录", count);
2130 Ok(count)
2131 }
2132
2133 pub fn youming(&self) -> &Arc<dyn YouMingStore> { &self.youming }
2134 pub fn config(&self) -> &YanHunConfig { &self.config }
2135}
2136
2137#[derive(Parser, Debug)]
2142#[command(
2143 name = "yanhun",
2144 about = "黯然销魂系统 v1.0 — 工业级具身智能传感器数据流净化引擎",
2145 version = "1.0.0"
2146)]
2147pub struct Cli {
2148 #[arg(short, long)]
2150 pub config: Option<PathBuf>,
2151
2152 #[arg(short = 'd', long, default_value = "./yanhunsystem_db")]
2154 pub db_path: String,
2155
2156 #[arg(long, default_value = "sled")]
2158 pub backend: String,
2159
2160 #[arg(short = 't', long, default_value = "leiluo")]
2162 pub tree_name: String,
2163
2164 #[arg(long, default_value = "anying")]
2166 pub shadow_tree_name: String,
2167
2168 #[arg(short = 's', long)]
2170 pub shards: Option<usize>,
2171
2172 #[arg(short = 'w', long)]
2174 pub window_size: Option<usize>,
2175
2176 #[arg(long)]
2178 pub threshold_light: Option<f64>,
2179
2180 #[arg(long)]
2182 pub threshold_heavy: Option<f64>,
2183
2184 #[arg(long, default_value_t = true)]
2186 pub enable_multi_round: bool,
2187
2188 #[arg(long, default_value_t = false)]
2190 pub simulate: bool,
2191
2192 #[arg(long, default_value_t = 50)]
2194 pub sim_interval_ms: u64,
2195
2196 #[arg(long)]
2198 pub export: Option<PathBuf>,
2199
2200 #[arg(long, default_value_t = false)]
2202 pub replay: bool,
2203
2204 #[arg(long)]
2206 pub export_shadow: Option<PathBuf>,
2207}
2208
2209pub fn start_simulation(system: Arc<YanHun>, interval_ms: u64) -> tokio::task::JoinHandle<()> {
2214 tokio::spawn(async move {
2215 let sensors = ["kinoko-1", "suimei-imu", "byakuya-prox", "yoru-wheel", "tsukikage-gps"];
2216 let mut id_ctr = AtomicU64::new(0);
2217 loop {
2218 for &sid in &sensors {
2219 let v = if rand::thread_rng().gen::<f64>() < 0.995 {
2220 10.0 + (rand::thread_rng().gen::<f64>() - 0.5) * 0.5
2221 } else {
2222 200.0 + rand::thread_rng().gen::<f64>() * 50.0
2223 };
2224 let seq = id_ctr.fetch_add(1, AtomicOrd::Relaxed);
2225 let r = LeiLuo {
2226 id: format!("sim-{}-{}", sid, seq),
2227 sensor_id: sid.to_string(),
2228 ts: Utc::now(),
2229 value: v,
2230 quality: Some(1.0),
2231 metadata: None,
2232 };
2233 if let Err(e) = system.ingest(r).await {
2234 error!("模拟器: 提交错误: {:?}", e);
2235 }
2236 }
2237 sleep(Duration::from_millis(interval_ms)).await;
2238 }
2239 })
2240}
2241
2242#[tokio::main(flavor = "multi_thread")]
2247async fn main() -> Result<()> {
2248 env_logger::Builder::from_env(
2249 env_logger::Env::default().default_filter_or("info")
2250 ).format_timestamp_millis().init();
2251
2252 let cli = Cli::parse();
2253
2254 let mut app_cfg = if let Some(ref path) = cli.config {
2256 AppConfig::load(path).unwrap_or_else(|e| {
2257 warn!("配置文件加载失败 ({:?}),使用默认配置", e);
2258 AppConfig::default()
2259 })
2260 } else {
2261 AppConfig::default()
2262 };
2263
2264 app_cfg = app_cfg.apply_env();
2265
2266 if let Some(v) = cli.shards { app_cfg.shard_count = Some(v); }
2268 if let Some(v) = cli.window_size { app_cfg.window_size = Some(v); }
2269 if let Some(v) = cli.threshold_light { app_cfg.threshold_light = Some(v); }
2270 if let Some(v) = cli.threshold_heavy { app_cfg.threshold_heavy = Some(v); }
2271
2272 let base_cfg = YanHunConfig {
2273 db_path: cli.db_path.clone(),
2274 tree_name: cli.tree_name.clone(),
2275 shadow_tree_name: cli.shadow_tree_name.clone(),
2276 persistence_backend: PersistenceBackend::Sqlite,
2277 ..YanHunConfig::default()
2278 };
2279
2280 let final_cfg = app_cfg.into_inner_config(base_cfg);
2281 final_cfg.validate().context("配置校验失败")?;
2282
2283 let youming: Arc<dyn YouMingStore> = if final_cfg.db_path == ":memory:" {
2285 Arc::new(SqliteYouMing::open_in_memory()?)
2286 } else {
2287 Arc::new(SqliteYouMing::open(&final_cfg.db_path)?)
2288 };
2289
2290 if let Some(ref out) = cli.export {
2292 info!("导出: 审计记录 → {:?}", out);
2293 let n = youming.export_audit_jsonl(out)?;
2294 info!("导出完成: {} 条记录", n);
2295 return Ok(());
2296 }
2297 if let Some(ref out) = cli.export_shadow {
2298 info!("导出: 暗影记录 → {:?}", out);
2299 youming.export_shadow(out)?;
2300 info!("暗影导出完成");
2301 return Ok(());
2302 }
2303
2304 let mut system = YanHun::initialize(final_cfg.clone(), youming.clone());
2306 system.start();
2307 if cli.enable_multi_round { system.start_multi_round(); }
2308
2309 info!(
2311 "黯然销魂系统上线 | shards={} window={} eps={} backend={:?} \
2312 thresholds=[{:.1}/{:.1}/{:.1}] multi_round={} verify={}",
2313 final_cfg.shard_count,
2314 final_cfg.window_size,
2315 final_cfg.quantisation_eps,
2316 final_cfg.persistence_backend,
2317 final_cfg.threshold_light,
2318 final_cfg.threshold_medium,
2319 final_cfg.threshold_heavy,
2320 cli.enable_multi_round,
2321 final_cfg.verification_enabled,
2322 );
2323
2324 if cli.replay {
2326 info!("回放: 重新注入隔离记录");
2327 let n = system.replay_quarantine().await?;
2328 info!("回放完成: {} 条", n);
2329 return Ok(());
2330 }
2331
2332 let system = Arc::new(system);
2333
2334 let sim = if cli.simulate {
2336 info!("模拟器: 启动合成数据流 ({}ms 间隔)", cli.sim_interval_ms);
2337 Some(start_simulation(system.clone(), cli.sim_interval_ms))
2338 } else {
2339 None
2340 };
2341
2342 tokio::signal::ctrl_c().await?;
2344 info!("黯然销魂系统: 收到关闭信号");
2345 if let Some(h) = sim { h.abort(); }
2346 info!("黯然销魂系统: 正在排空管道…");
2347 sleep(Duration::from_millis(800)).await;
2348 youming.flush()?;
2349 info!("黯然销魂系统: 关闭完成 — 此情可待成追忆,只是当时已惘然");
2350 Ok(())
2351}
2352
2353#[cfg(test)]
2358mod tests {
2359 use super::*;
2360 use tempfile::tempdir;
2361
2362 fn test_config() -> YanHunConfig {
2363 YanHunConfig {
2364 window_size: 100,
2365 quantisation_eps: 1e-3,
2366 threshold_light: 2.5,
2367 threshold_medium: 4.5,
2368 threshold_heavy: 8.0,
2369 ingestion_capacity: 512,
2370 per_sensor_queue: 64,
2371 shard_count: 2,
2372 batch_max_size: 16,
2373 batch_max_wait_ms: 5,
2374 max_rounds: 3,
2375 verification_enabled: false,
2376 multi_round_concurrency: 2,
2377 ..YanHunConfig::default()
2378 }
2379 }
2380
2381 #[test]
2384 fn test_quantise_roundtrip() {
2385 let eps = 1e-3;
2386 let v = 12.345678_f64;
2387 let mut t = OrderStatTracker::new(50, eps);
2388 let b = (v / eps).round() as i64;
2389 let v2 = (b as f64) * eps;
2390 assert!((v - v2).abs() <= eps, "量化往返误差过大: {} vs {}", v, v2);
2391 }
2392
2393 #[test]
2394 fn test_dual_heap_median() {
2395 let mut t = DualHeapTracker::new(7);
2396 for v in 1..=7 { let _ = t.insert_and_compute(v as f64); }
2397 assert!((t.current_median() - 4.0).abs() < 1e-9);
2398 }
2399
2400 #[test]
2401 fn test_dual_heap_mad_constant_sequence() {
2402 let mut t = DualHeapTracker::new(11);
2403 for _ in 0..11 { let _ = t.insert_and_compute(10.0); }
2404 let (_, mad) = t.snapshot_stats();
2405 assert!(mad < 1e-9, "常数序列 MAD 应为 0,实际={}", mad);
2406 }
2407
2408 #[test]
2409 fn test_order_stat_median_robust() {
2410 let mut t = OrderStatTracker::new(50, 1e-3);
2411 for v in (0..20).map(|i| 10.0 + (i % 3) as f64 * 0.01) {
2412 t.insert_and_compute(v);
2413 }
2414 let (median, mad) = t.insert_and_compute(10_000.0);
2415 assert!((median - 10.0).abs() < 2.0, "中位数应对异常值不敏感,median={}", median);
2416 assert!(mad >= 0.0);
2417 }
2418
2419 #[test]
2420 fn test_order_stat_sliding_window() {
2421 let cap = 10;
2422 let mut t = OrderStatTracker::new(cap, 1e-3);
2423 for v in 0..cap { t.insert_and_compute(v as f64); }
2424 assert_eq!(t.window_len(), cap);
2425 t.insert_and_compute(100.0);
2426 assert_eq!(t.window_len(), cap, "窗口应维持固定大小");
2427 }
2428
2429 #[test]
2430 fn test_auto_xiaohuun_small_uses_dual_heap() {
2431 let t = AutoXiaoHun::new(512, 1e-4);
2432 let (med, _) = t.insert_and_compute(42.0);
2433 assert!((med - 42.0).abs() < 1e-6);
2434 }
2435
2436 #[test]
2437 fn test_auto_xiaohuun_large_uses_order_stat() {
2438 let t = AutoXiaoHun::new(LARGE_WINDOW_THRESHOLD + 1, 1e-4);
2439 let (med, _) = t.insert_and_compute(7.0);
2440 assert!((med - 7.0).abs() < 1e-3);
2441 }
2442
2443 #[test]
2446 fn test_shenshang_levels() {
2447 let scorer = ShenShang::new(test_config());
2448 assert_eq!(scorer.assess(10.0, 10.0, 1.0, 1).level, PoSan::Untainted, "正常值应为无染");
2449 assert_eq!(scorer.assess(14.5, 10.0, 1.0, 1).level, PoSan::LightGrief, "轻度异常应为轻愁");
2451 assert_eq!(scorer.assess(25.0, 10.0, 1.0, 1).level, PoSan::SoulWithered,"重度异常应为销魂");
2453 }
2454
2455 #[test]
2456 fn test_shenshang_zero_mad_fallback() {
2457 let scorer = ShenShang::new(test_config());
2458 let order = scorer.assess(50.0, 10.0, 0.0, 1);
2459 assert!(order.score > 5.0, "MAD=0 应退化为绝对差值: score={}", order.score);
2460 assert_ne!(order.level, PoSan::Untainted);
2461 }
2462
2463 #[test]
2464 fn test_poshan_display() {
2465 assert_eq!(format!("{}", PoSan::Untainted), "无染");
2466 assert_eq!(format!("{}", PoSan::LightGrief), "轻愁");
2467 assert_eq!(format!("{}", PoSan::DeepSorrow), "深恸");
2468 assert_eq!(format!("{}", PoSan::SoulWithered), "销魂");
2469 }
2470
2471 #[test]
2474 fn test_qingmi_converges() {
2475 let qm = QingMi::new(0.1);
2476 let sid = "sensor-test".to_string();
2477 for _ in 0..200 { qm.update(&sid, 10.0); }
2478 let v = qm.get(&sid, 0.0);
2479 assert!((v - 10.0).abs() < 0.01, "EMA 应收敛至 10,实际={}", v);
2480 }
2481
2482 #[test]
2485 fn test_config_validation_pass() {
2486 assert!(YanHunConfig::default().validate().is_ok());
2487 }
2488
2489 #[test]
2490 fn test_config_validation_zero_window() {
2491 let bad = YanHunConfig { window_size: 0, ..YanHunConfig::default() };
2492 assert!(bad.validate().is_err());
2493 }
2494
2495 #[test]
2496 fn test_config_validation_negative_eps() {
2497 let bad = YanHunConfig { quantisation_eps: -1.0, ..YanHunConfig::default() };
2498 assert!(bad.validate().is_err());
2499 }
2500
2501 #[test]
2502 fn test_config_validation_bad_confidence() {
2503 let bad = YanHunConfig { promote_confidence: 1.5, ..YanHunConfig::default() };
2504 assert!(bad.validate().is_err());
2505 }
2506
2507 #[test]
2510 fn test_leiluo_new() {
2511 let r = LeiLuo::new("sensor-a", 42.0);
2512 assert_eq!(r.sensor_id, "sensor-a");
2513 assert!((r.value - 42.0).abs() < 1e-9);
2514 assert!(r.quality.is_none());
2515 assert!(r.metadata.is_none());
2516 }
2517
2518 #[test]
2519 fn test_leiluo_with_quality_and_metadata() {
2520 let r = LeiLuo::new("s1", 1.0)
2521 .with_quality(0.95)
2522 .with_metadata(serde_json::json!({"source": "test"}));
2523 assert!((r.quality.unwrap() - 0.95).abs() < 1e-9);
2524 assert!(r.metadata.is_some());
2525 }
2526
2527 #[test]
2530 fn test_sqlite_youming_audit_roundtrip() {
2531 let ym = SqliteYouMing::open_in_memory().unwrap();
2532 let rec = QiChuang {
2533 sensor_id: "s1".into(),
2534 droplet_id: "d1".into(),
2535 order: DuanChang {
2536 level: PoSan::LightGrief,
2537 score: 3.5,
2538 reason: "test".into(),
2539 ts: Utc::now(),
2540 round: 1,
2541 },
2542 original_value: 15.0,
2543 applied_value: Some(12.0),
2544 note: Some("clip".into()),
2545 ts: Utc::now(),
2546 };
2547 ym.audit(&rec).unwrap();
2548 let audits = ym.scan_audit().unwrap();
2549 assert_eq!(audits.len(), 1);
2550 assert!((audits[0].applied_value.unwrap() - 12.0).abs() < 1e-9);
2551 }
2552
2553 #[test]
2554 fn test_sqlite_youming_quarantine_dual_write() {
2555 let ym = SqliteYouMing::open_in_memory().unwrap();
2556 let rec = QiChuang {
2557 sensor_id: "sq".into(),
2558 droplet_id: "dq".into(),
2559 order: DuanChang {
2560 level: PoSan::SoulWithered,
2561 score: 15.0,
2562 reason: "spike".into(),
2563 ts: Utc::now(),
2564 round: 1,
2565 },
2566 original_value: 500.0,
2567 applied_value: None,
2568 note: Some("quarantined".into()),
2569 ts: Utc::now(),
2570 };
2571 ym.quarantine(&rec).unwrap();
2572 let q = ym.scan_quarantine().unwrap();
2574 assert_eq!(q.len(), 1, "隔离树应有 1 条记录");
2575 let a = ym.scan_audit().unwrap();
2577 assert_eq!(a.len(), 1, "审计树应也有 1 条记录(双写)");
2578 }
2579
2580 #[test]
2581 fn test_sqlite_youming_cas() {
2582 let ym = SqliteYouMing::open_in_memory().unwrap();
2583 let rec = ChouKu {
2584 id: "cas-1".into(),
2585 sensor_id: "s1".into(),
2586 ts: Utc::now(),
2587 value: 10.0,
2588 quality: None,
2589 score: 0.5,
2590 poshan: "无染".into(),
2591 origin_median:None,
2592 origin_mad: None,
2593 state: ChouChang::Slumber,
2594 round: 0,
2595 metadata: None,
2596 };
2597 ym.append_sediment(&rec).unwrap();
2598
2599 let ok = ym.compare_and_swap_state(&"cas-1".into(), ChouChang::Slumber, ChouChang::Forging(1), 1).unwrap();
2601 assert!(ok, "CAS 应成功");
2602
2603 let ok2 = ym.compare_and_swap_state(&"cas-1".into(), ChouChang::Slumber, ChouChang::Forging(2), 2).unwrap();
2605 assert!(!ok2, "CAS 应失败(状态已变)");
2606 }
2607
2608 #[test]
2611 fn test_dbscan_1d_basic() {
2612 let values = vec![1.0, 1.1, 1.2, 50.0, 1.15, 1.05];
2613 let labels = dbscan_1d(&values, 0.5, 2);
2614 let noise = labels.iter().filter(|&&l| l == -1).count();
2616 let in_cluster = labels.iter().filter(|&&l| l >= 0).count();
2617 assert!(in_cluster >= 4, "多数点应聚为簇,噪声={} 簇内={}", noise, in_cluster);
2618 }
2619
2620 #[test]
2621 fn test_huber_robust_location() {
2622 let mut vals: Vec<f64> = (0..15).map(|i| 10.0 + i as f64 * 0.01).collect();
2624 vals.push(10_000.0);
2625 let est = huber_robust_location(&vals, 1.0, 50);
2626 assert!((est - 10.07).abs() < 0.5, "Huber 估计应接近真实中心,est={}", est);
2627 }
2628
2629 #[test]
2630 fn test_kalman_convergence() {
2631 let mut kf = Kalman1D::new(1e-3, 1e-2);
2632 let signal = vec![10.0; 50];
2633 let out = kf.smooth_sequence(&signal);
2634 let last = *out.last().unwrap();
2635 assert!((last - 10.0).abs() < 0.01, "卡尔曼平滑应收敛至真值,last={}", last);
2636 }
2637
2638 #[test]
2639 fn test_confidence_computation() {
2640 let tight = vec![10.0, 10.01, 9.99, 10.02, 9.98];
2641 let spread = vec![1.0, 5.0, 10.0, 15.0, 20.0];
2642 let c_tight = compute_confidence(&tight, 10.0);
2643 let c_spread = compute_confidence(&spread, 10.2);
2644 assert!(c_tight > c_spread, "紧凑簇置信度应高于分散簇");
2645 assert!(c_tight <= 1.0);
2646 assert!(c_spread >= 0.0);
2647 }
2648}
2649
2650