1use std::collections::{HashMap, VecDeque};
8use std::sync::RwLock;
9use std::sync::atomic::AtomicBool;
10use std::time::{Duration, Instant};
11
12use serde::Serialize;
13
14#[derive(Debug, Clone, Serialize)]
16pub struct CommandTimingStats {
17 pub command: String,
19 pub count: u64,
21 pub min_ms: f64,
23 pub max_ms: f64,
25 pub avg_ms: f64,
27 pub p95_ms: f64,
29 pub total_ms: f64,
31}
32
33const MAX_TIMING_SAMPLES: usize = 1024;
40
41#[derive(Debug, Default)]
46pub(crate) struct TimingSamples {
47 recent: VecDeque<Duration>,
49 count: u64,
51 total: Duration,
53 min: Option<Duration>,
55 max: Option<Duration>,
57}
58
59impl TimingSamples {
60 pub fn record(&mut self, duration: Duration) {
62 self.count += 1;
63 self.total = self.total.saturating_add(duration);
64 self.min = Some(self.min.map_or(duration, |m| m.min(duration)));
65 self.max = Some(self.max.map_or(duration, |m| m.max(duration)));
66 if self.recent.len() == MAX_TIMING_SAMPLES {
67 self.recent.pop_front();
68 }
69 self.recent.push_back(duration);
70 }
71
72 #[must_use]
76 pub fn stats(&self, command: &str) -> CommandTimingStats {
77 if self.count == 0 {
78 return CommandTimingStats {
79 command: command.to_string(),
80 count: 0,
81 min_ms: 0.0,
82 max_ms: 0.0,
83 avg_ms: 0.0,
84 p95_ms: 0.0,
85 total_ms: 0.0,
86 };
87 }
88 let to_ms = |d: Duration| d.as_secs_f64() * 1000.0;
89 let round2 = |v: f64| (v * 100.0).round() / 100.0;
90
91 let total_ms = to_ms(self.total);
92 let avg_ms = total_ms / self.count as f64;
93 let min_ms = self.min.map_or(0.0, to_ms);
94 let max_ms = self.max.map_or(0.0, to_ms);
95
96 let mut sorted: Vec<f64> = self.recent.iter().copied().map(to_ms).collect();
98 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
99 let p95 = if sorted.is_empty() {
100 0.0
101 } else {
102 let idx = ((sorted.len() as f64) * 0.95).ceil() as usize;
103 sorted[idx.min(sorted.len() - 1)]
104 };
105
106 CommandTimingStats {
107 command: command.to_string(),
108 count: self.count,
109 min_ms: round2(min_ms),
110 max_ms: round2(max_ms),
111 avg_ms: round2(avg_ms),
112 p95_ms: round2(p95),
113 total_ms: round2(total_ms),
114 }
115 }
116}
117
118pub struct CommandTimings {
120 inner: RwLock<HashMap<String, TimingSamples>>,
121}
122
123impl CommandTimings {
124 #[must_use]
126 pub fn new() -> Self {
127 Self {
128 inner: RwLock::new(HashMap::new()),
129 }
130 }
131
132 pub fn record(&self, command: &str, duration: Duration) {
134 let mut map = self
135 .inner
136 .write()
137 .unwrap_or_else(std::sync::PoisonError::into_inner);
138 map.entry(command.to_string()).or_default().record(duration);
139 }
140
141 #[must_use]
143 pub fn all_stats(&self) -> Vec<CommandTimingStats> {
144 let map = self
145 .inner
146 .read()
147 .unwrap_or_else(std::sync::PoisonError::into_inner);
148 let mut stats: Vec<CommandTimingStats> =
149 map.iter().map(|(name, s)| s.stats(name)).collect();
150 stats.sort_by(|a, b| {
151 b.total_ms
152 .partial_cmp(&a.total_ms)
153 .unwrap_or(std::cmp::Ordering::Equal)
154 });
155 stats
156 }
157
158 #[must_use]
160 pub fn stats_for(&self, command: &str) -> Option<CommandTimingStats> {
161 let map = self
162 .inner
163 .read()
164 .unwrap_or_else(std::sync::PoisonError::into_inner);
165 map.get(command).map(|s| s.stats(command))
166 }
167
168 pub fn clear(&self) {
170 let mut map = self
171 .inner
172 .write()
173 .unwrap_or_else(std::sync::PoisonError::into_inner);
174 map.clear();
175 }
176}
177
178impl Default for CommandTimings {
179 fn default() -> Self {
180 Self::new()
181 }
182}
183
184#[derive(Debug, Clone, Serialize)]
188pub enum FaultType {
189 Delay {
191 delay_ms: u64,
193 },
194 Error {
196 message: String,
198 },
199 Drop,
201 Corrupt,
203}
204
205#[derive(Debug, Clone, Serialize)]
207pub struct FaultConfig {
208 pub command: String,
210 pub fault_type: FaultType,
212 pub trigger_count: u64,
214 pub max_triggers: u64,
216 #[serde(skip)]
218 pub created_at: Instant,
219}
220
221pub const FAULT_TTL: Duration = Duration::from_secs(900); impl FaultConfig {
226 #[must_use]
229 pub fn should_trigger_at(&self, now: Instant) -> bool {
230 if now.saturating_duration_since(self.created_at) >= FAULT_TTL {
231 return false;
232 }
233 self.max_triggers == 0 || self.trigger_count < self.max_triggers
234 }
235
236 #[must_use]
238 pub fn should_trigger(&self) -> bool {
239 self.should_trigger_at(Instant::now())
240 }
241}
242
243pub struct FaultRegistry {
245 inner: RwLock<HashMap<String, FaultConfig>>,
246}
247
248impl FaultRegistry {
249 #[must_use]
251 pub fn new() -> Self {
252 Self {
253 inner: RwLock::new(HashMap::new()),
254 }
255 }
256
257 pub fn inject(&self, config: FaultConfig) {
259 let mut map = self
260 .inner
261 .write()
262 .unwrap_or_else(std::sync::PoisonError::into_inner);
263 map.insert(config.command.clone(), config);
264 }
265
266 pub fn check_and_trigger(&self, command: &str) -> Option<FaultType> {
269 let mut map = self
270 .inner
271 .write()
272 .unwrap_or_else(std::sync::PoisonError::into_inner);
273 if let Some(config) = map.get_mut(command)
274 && config.should_trigger()
275 {
276 config.trigger_count += 1;
277 return Some(config.fault_type.clone());
278 }
279 None
280 }
281
282 #[must_use]
284 pub fn list(&self) -> Vec<FaultConfig> {
285 let map = self
286 .inner
287 .read()
288 .unwrap_or_else(std::sync::PoisonError::into_inner);
289 map.values().cloned().collect()
290 }
291
292 pub fn clear(&self, command: &str) -> bool {
294 let mut map = self
295 .inner
296 .write()
297 .unwrap_or_else(std::sync::PoisonError::into_inner);
298 map.remove(command).is_some()
299 }
300
301 pub fn clear_all(&self) -> usize {
303 let mut map = self
304 .inner
305 .write()
306 .unwrap_or_else(std::sync::PoisonError::into_inner);
307 let count = map.len();
308 map.clear();
309 count
310 }
311}
312
313impl Default for FaultRegistry {
314 fn default() -> Self {
315 Self::new()
316 }
317}
318
319#[derive(Debug, Clone, Serialize, PartialEq)]
323pub enum JsonShape {
324 Null,
326 Bool,
328 Number,
330 String,
332 Array(Box<Self>),
334 Object(HashMap<String, Self>),
336}
337
338impl JsonShape {
339 #[must_use]
341 pub fn from_value(value: &serde_json::Value) -> Self {
342 match value {
343 serde_json::Value::Null => Self::Null,
344 serde_json::Value::Bool(_) => Self::Bool,
345 serde_json::Value::Number(_) => Self::Number,
346 serde_json::Value::String(_) => Self::String,
347 serde_json::Value::Array(arr) => {
348 let elem = arr.first().map_or(Self::Null, Self::from_value);
349 Self::Array(Box::new(elem))
350 }
351 serde_json::Value::Object(obj) => {
352 let fields: HashMap<String, Self> = obj
353 .iter()
354 .map(|(k, v)| (k.clone(), Self::from_value(v)))
355 .collect();
356 Self::Object(fields)
357 }
358 }
359 }
360
361 #[must_use]
363 pub fn type_name(&self) -> &'static str {
364 match self {
365 Self::Null => "null",
366 Self::Bool => "bool",
367 Self::Number => "number",
368 Self::String => "string",
369 Self::Array(_) => "array",
370 Self::Object(_) => "object",
371 }
372 }
373}
374
375#[derive(Debug, Clone, Serialize)]
377pub struct ContractBaseline {
378 pub command: String,
380 pub args: serde_json::Value,
382 pub shape: JsonShape,
384 pub sample: String,
386 pub recorded_at: String,
388}
389
390#[derive(Debug, Clone, Serialize)]
392pub struct ContractDrift {
393 pub command: String,
395 pub new_fields: Vec<String>,
397 pub removed_fields: Vec<String>,
399 pub type_changes: Vec<TypeChange>,
401 pub shape_matches: bool,
403}
404
405#[derive(Debug, Clone, Serialize)]
407pub struct TypeChange {
408 pub path: String,
410 pub baseline_type: String,
412 pub current_type: String,
414}
415
416#[must_use]
418pub fn diff_shapes(baseline: &JsonShape, current: &JsonShape, prefix: &str) -> ContractDrift {
419 let mut new_fields = Vec::new();
420 let mut removed_fields = Vec::new();
421 let mut type_changes = Vec::new();
422
423 diff_shapes_inner(
424 baseline,
425 current,
426 prefix,
427 &mut new_fields,
428 &mut removed_fields,
429 &mut type_changes,
430 );
431
432 let shape_matches =
433 new_fields.is_empty() && removed_fields.is_empty() && type_changes.is_empty();
434 ContractDrift {
435 command: prefix.to_string(),
436 new_fields,
437 removed_fields,
438 type_changes,
439 shape_matches,
440 }
441}
442
443fn diff_shapes_inner(
444 baseline: &JsonShape,
445 current: &JsonShape,
446 prefix: &str,
447 new_fields: &mut Vec<String>,
448 removed_fields: &mut Vec<String>,
449 type_changes: &mut Vec<TypeChange>,
450) {
451 match (baseline, current) {
452 (JsonShape::Object(b_fields), JsonShape::Object(c_fields)) => {
453 for (key, b_shape) in b_fields {
454 let path = if prefix.is_empty() {
455 key.clone()
456 } else {
457 format!("{prefix}.{key}")
458 };
459 if let Some(c_shape) = c_fields.get(key) {
460 diff_shapes_inner(
461 b_shape,
462 c_shape,
463 &path,
464 new_fields,
465 removed_fields,
466 type_changes,
467 );
468 } else {
469 removed_fields.push(path);
470 }
471 }
472 for key in c_fields.keys() {
473 if !b_fields.contains_key(key) {
474 let path = if prefix.is_empty() {
475 key.clone()
476 } else {
477 format!("{prefix}.{key}")
478 };
479 new_fields.push(path);
480 }
481 }
482 }
483 (JsonShape::Array(b_elem), JsonShape::Array(c_elem)) => {
484 let path = format!("{prefix}[]");
485 diff_shapes_inner(
486 b_elem,
487 c_elem,
488 &path,
489 new_fields,
490 removed_fields,
491 type_changes,
492 );
493 }
494 (b, c) if b.type_name() != c.type_name() => {
495 type_changes.push(TypeChange {
496 path: prefix.to_string(),
497 baseline_type: b.type_name().to_string(),
498 current_type: c.type_name().to_string(),
499 });
500 }
501 _ => {}
502 }
503}
504
505pub struct ContractStore {
507 inner: RwLock<HashMap<String, ContractBaseline>>,
508}
509
510impl ContractStore {
511 #[must_use]
513 pub fn new() -> Self {
514 Self {
515 inner: RwLock::new(HashMap::new()),
516 }
517 }
518
519 pub fn record(&self, baseline: ContractBaseline) {
521 let mut map = self
522 .inner
523 .write()
524 .unwrap_or_else(std::sync::PoisonError::into_inner);
525 map.insert(baseline.command.clone(), baseline);
526 }
527
528 #[must_use]
530 pub fn get(&self, command: &str) -> Option<ContractBaseline> {
531 let map = self
532 .inner
533 .read()
534 .unwrap_or_else(std::sync::PoisonError::into_inner);
535 map.get(command).cloned()
536 }
537
538 #[must_use]
540 pub fn all(&self) -> Vec<ContractBaseline> {
541 let map = self
542 .inner
543 .read()
544 .unwrap_or_else(std::sync::PoisonError::into_inner);
545 map.values().cloned().collect()
546 }
547
548 pub fn clear(&self) -> usize {
550 let mut map = self
551 .inner
552 .write()
553 .unwrap_or_else(std::sync::PoisonError::into_inner);
554 let count = map.len();
555 map.clear();
556 count
557 }
558}
559
560impl Default for ContractStore {
561 fn default() -> Self {
562 Self::new()
563 }
564}
565
566#[derive(Debug, Clone, Serialize)]
570pub struct StartupPhase {
571 pub name: String,
573 pub duration_ms: f64,
575 pub cumulative_ms: f64,
577}
578
579pub struct StartupTimeline {
581 start: Instant,
582 phases: RwLock<Vec<(String, Instant)>>,
583}
584
585impl StartupTimeline {
586 #[must_use]
588 pub fn new() -> Self {
589 Self {
590 start: Instant::now(),
591 phases: RwLock::new(Vec::new()),
592 }
593 }
594
595 pub fn mark(&self, name: &str) {
597 let mut phases = self
598 .phases
599 .write()
600 .unwrap_or_else(std::sync::PoisonError::into_inner);
601 phases.push((name.to_string(), Instant::now()));
602 }
603
604 #[must_use]
606 pub fn report(&self) -> Vec<StartupPhase> {
607 let phases = self
608 .phases
609 .read()
610 .unwrap_or_else(std::sync::PoisonError::into_inner);
611 let mut result = Vec::new();
612 let mut prev = self.start;
613
614 for (name, instant) in phases.iter() {
615 let duration = instant.duration_since(prev);
616 let cumulative = instant.duration_since(self.start);
617 result.push(StartupPhase {
618 name: name.clone(),
619 duration_ms: (duration.as_secs_f64() * 1000.0 * 100.0).round() / 100.0,
620 cumulative_ms: (cumulative.as_secs_f64() * 1000.0 * 100.0).round() / 100.0,
621 });
622 prev = *instant;
623 }
624 result
625 }
626
627 #[must_use]
629 pub fn total_ms(&self) -> f64 {
630 let phases = self
631 .phases
632 .read()
633 .unwrap_or_else(std::sync::PoisonError::into_inner);
634 if let Some((_, last)) = phases.last() {
635 (last.duration_since(self.start).as_secs_f64() * 1000.0 * 100.0).round() / 100.0
636 } else {
637 0.0
638 }
639 }
640}
641
642impl Default for StartupTimeline {
643 fn default() -> Self {
644 Self::new()
645 }
646}
647
648#[derive(Debug, Clone, Serialize)]
652pub struct CapturedTauriEvent {
653 pub name: String,
655 pub payload: String,
657 pub timestamp: String,
659}
660
661const DEFAULT_EVENT_BUS_CAPACITY: usize = 1000;
662
663#[derive(Clone)]
665pub struct EventBusMonitor {
666 inner: std::sync::Arc<RwLock<VecDeque<CapturedTauriEvent>>>,
667 capacity: usize,
668}
669
670impl EventBusMonitor {
671 #[must_use]
673 pub fn new(capacity: usize) -> Self {
674 Self {
675 inner: std::sync::Arc::new(RwLock::new(VecDeque::with_capacity(capacity))),
676 capacity,
677 }
678 }
679
680 pub fn push(&self, event: CapturedTauriEvent) {
682 let mut buf = self
683 .inner
684 .write()
685 .unwrap_or_else(std::sync::PoisonError::into_inner);
686 if buf.len() >= self.capacity {
687 buf.pop_front();
688 }
689 buf.push_back(event);
690 }
691
692 #[must_use]
694 pub fn events(&self) -> Vec<CapturedTauriEvent> {
695 self.inner
696 .read()
697 .unwrap_or_else(std::sync::PoisonError::into_inner)
698 .iter()
699 .cloned()
700 .collect()
701 }
702
703 #[must_use]
705 pub fn len(&self) -> usize {
706 self.inner
707 .read()
708 .unwrap_or_else(std::sync::PoisonError::into_inner)
709 .len()
710 }
711
712 #[must_use]
714 pub fn is_empty(&self) -> bool {
715 self.len() == 0
716 }
717
718 pub fn clear(&self) -> usize {
720 let mut buf = self
721 .inner
722 .write()
723 .unwrap_or_else(std::sync::PoisonError::into_inner);
724 let count = buf.len();
725 buf.clear();
726 count
727 }
728}
729
730impl Default for EventBusMonitor {
731 fn default() -> Self {
732 Self::new(DEFAULT_EVENT_BUS_CAPACITY)
733 }
734}
735
736pub type ProbeFn = dyn Fn() -> serde_json::Value + Send + Sync + 'static;
741
742#[derive(Clone, Default)]
752pub struct AppStateProbes {
753 inner: std::sync::Arc<RwLock<std::collections::BTreeMap<String, std::sync::Arc<ProbeFn>>>>,
754}
755
756impl AppStateProbes {
757 pub fn register(&self, name: impl Into<String>, probe: std::sync::Arc<ProbeFn>) {
759 self.inner
760 .write()
761 .unwrap_or_else(std::sync::PoisonError::into_inner)
762 .insert(name.into(), probe);
763 }
764
765 #[must_use]
767 pub fn names(&self) -> Vec<String> {
768 self.inner
769 .read()
770 .unwrap_or_else(std::sync::PoisonError::into_inner)
771 .keys()
772 .cloned()
773 .collect()
774 }
775
776 #[must_use]
779 pub fn run(&self, name: &str) -> Option<serde_json::Value> {
780 let probe = self
781 .inner
782 .read()
783 .unwrap_or_else(std::sync::PoisonError::into_inner)
784 .get(name)
785 .cloned();
786 probe.map(|p| p())
787 }
788
789 #[must_use]
791 pub fn len(&self) -> usize {
792 self.inner
793 .read()
794 .unwrap_or_else(std::sync::PoisonError::into_inner)
795 .len()
796 }
797
798 #[must_use]
800 pub fn is_empty(&self) -> bool {
801 self.len() == 0
802 }
803}
804
805#[derive(Debug, Clone, Serialize)]
809pub struct TrackedTaskInfo {
810 pub name: String,
812 pub spawned_at: String,
814 pub is_finished: bool,
816 pub uptime_secs: u64,
818}
819
820struct TrackedTaskEntry {
821 name: String,
822 spawned_at: Instant,
823 spawned_at_wall: String,
824 finished: std::sync::Arc<AtomicBool>,
825}
826
827pub struct TaskTracker {
829 tasks: RwLock<Vec<TrackedTaskEntry>>,
830}
831
832impl TaskTracker {
833 #[must_use]
835 pub fn new() -> Self {
836 Self {
837 tasks: RwLock::new(Vec::new()),
838 }
839 }
840
841 pub fn track(&self, name: &str) -> std::sync::Arc<AtomicBool> {
843 let finished = std::sync::Arc::new(AtomicBool::new(false));
844 let entry = TrackedTaskEntry {
845 name: name.to_string(),
846 spawned_at: Instant::now(),
847 spawned_at_wall: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
848 finished: finished.clone(),
849 };
850 self.tasks
851 .write()
852 .unwrap_or_else(std::sync::PoisonError::into_inner)
853 .push(entry);
854 finished
855 }
856
857 #[must_use]
859 pub fn list(&self) -> Vec<TrackedTaskInfo> {
860 let tasks = self
861 .tasks
862 .read()
863 .unwrap_or_else(std::sync::PoisonError::into_inner);
864 tasks
865 .iter()
866 .map(|t| TrackedTaskInfo {
867 name: t.name.clone(),
868 spawned_at: t.spawned_at_wall.clone(),
869 is_finished: t.finished.load(std::sync::atomic::Ordering::Relaxed),
870 uptime_secs: t.spawned_at.elapsed().as_secs(),
871 })
872 .collect()
873 }
874
875 #[must_use]
877 pub fn active_count(&self) -> usize {
878 let tasks = self
879 .tasks
880 .read()
881 .unwrap_or_else(std::sync::PoisonError::into_inner);
882 tasks
883 .iter()
884 .filter(|t| !t.finished.load(std::sync::atomic::Ordering::Relaxed))
885 .count()
886 }
887}
888
889impl Default for TaskTracker {
890 fn default() -> Self {
891 Self::new()
892 }
893}
894
895#[derive(Debug, Clone, Serialize)]
899pub struct ChildProcessInfo {
900 pub pid: u32,
902 pub ppid: u32,
904 pub name: String,
906 pub memory_bytes: Option<u64>,
908}
909
910#[must_use]
917pub fn enumerate_child_processes() -> Vec<ChildProcessInfo> {
918 let my_pid = std::process::id();
919
920 #[cfg(windows)]
921 {
922 enumerate_children_windows(my_pid)
923 }
924
925 #[cfg(target_os = "linux")]
926 {
927 enumerate_children_linux(my_pid)
928 }
929
930 #[cfg(target_os = "macos")]
931 {
932 enumerate_children_macos(my_pid)
933 }
934
935 #[cfg(not(any(windows, target_os = "linux", target_os = "macos")))]
936 {
937 let _ = my_pid;
938 Vec::new()
939 }
940}
941
942#[cfg(windows)]
943#[allow(unsafe_code)]
944fn enumerate_children_windows(parent_pid: u32) -> Vec<ChildProcessInfo> {
945 use windows::Win32::Foundation::CloseHandle;
946 use windows::Win32::System::Diagnostics::ToolHelp::{
947 CreateToolhelp32Snapshot, PROCESSENTRY32, Process32First, Process32Next, TH32CS_SNAPPROCESS,
948 };
949
950 let mut children = Vec::new();
951
952 unsafe {
957 let Ok(snapshot) = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) else {
958 return children;
959 };
960
961 let mut entry: PROCESSENTRY32 = std::mem::zeroed();
962 entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;
963
964 if Process32First(snapshot, &mut entry).is_ok() {
965 loop {
966 if entry.th32ParentProcessID == parent_pid && entry.th32ProcessID != parent_pid {
967 let name_bytes: Vec<u8> = entry
968 .szExeFile
969 .iter()
970 .take_while(|&&b| b != 0)
971 .map(|&b| b as u8)
972 .collect();
973 let name = String::from_utf8_lossy(&name_bytes).to_string();
974
975 let memory_bytes = get_process_memory_windows(entry.th32ProcessID);
976
977 children.push(ChildProcessInfo {
978 pid: entry.th32ProcessID,
979 ppid: entry.th32ParentProcessID,
980 name,
981 memory_bytes,
982 });
983 }
984
985 if Process32Next(snapshot, &mut entry).is_err() {
986 break;
987 }
988 }
989 }
990
991 let _ = CloseHandle(snapshot);
992 }
993
994 children
995}
996
997#[cfg(windows)]
998#[allow(unsafe_code)]
999fn get_process_memory_windows(pid: u32) -> Option<u64> {
1000 use windows::Win32::System::ProcessStatus::{GetProcessMemoryInfo, PROCESS_MEMORY_COUNTERS};
1001 use windows::Win32::System::Threading::{
1002 OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION, PROCESS_VM_READ,
1003 };
1004
1005 unsafe {
1009 let process = OpenProcess(
1010 PROCESS_QUERY_LIMITED_INFORMATION | PROCESS_VM_READ,
1011 false,
1012 pid,
1013 )
1014 .ok()?;
1015
1016 let mut counters: PROCESS_MEMORY_COUNTERS = std::mem::zeroed();
1017 counters.cb = std::mem::size_of::<PROCESS_MEMORY_COUNTERS>() as u32;
1018
1019 if GetProcessMemoryInfo(process, &mut counters, counters.cb).is_ok() {
1020 Some(counters.WorkingSetSize as u64)
1021 } else {
1022 None
1023 }
1024 }
1025}
1026
1027#[cfg(target_os = "linux")]
1028fn enumerate_children_linux(parent_pid: u32) -> Vec<ChildProcessInfo> {
1029 let mut children = Vec::new();
1030 let Ok(entries) = std::fs::read_dir("/proc") else {
1031 return children;
1032 };
1033
1034 for entry in entries.flatten() {
1035 let file_name = entry.file_name();
1036 let Some(pid_str) = file_name.to_str() else {
1037 continue;
1038 };
1039 let Ok(pid) = pid_str.parse::<u32>() else {
1040 continue;
1041 };
1042
1043 let status_path = format!("/proc/{pid}/status");
1044 let Ok(status) = std::fs::read_to_string(&status_path) else {
1045 continue;
1046 };
1047
1048 let mut ppid: Option<u32> = None;
1049 let mut name = String::new();
1050 let mut vm_rss_kb: u64 = 0;
1051
1052 for line in status.lines() {
1053 if let Some(v) = line.strip_prefix("PPid:\t") {
1054 ppid = v.trim().parse().ok();
1055 } else if let Some(v) = line.strip_prefix("Name:\t") {
1056 name = v.trim().to_string();
1057 } else if let Some(v) = line.strip_prefix("VmRSS:") {
1058 vm_rss_kb = v
1059 .split_whitespace()
1060 .next()
1061 .and_then(|n| n.parse().ok())
1062 .unwrap_or(0);
1063 }
1064 }
1065
1066 if ppid == Some(parent_pid) {
1067 children.push(ChildProcessInfo {
1068 pid,
1069 ppid: parent_pid,
1070 name,
1071 memory_bytes: if vm_rss_kb > 0 {
1072 Some(vm_rss_kb * 1024)
1073 } else {
1074 None
1075 },
1076 });
1077 }
1078 }
1079
1080 children
1081}
1082
1083#[cfg(target_os = "macos")]
1084#[allow(unsafe_code)]
1085fn enumerate_children_macos(parent_pid: u32) -> Vec<ChildProcessInfo> {
1086 use std::mem;
1087
1088 unsafe extern "C" {
1089 fn proc_listchildpids(ppid: i32, buffer: *mut i32, buffersize: i32) -> i32;
1090 fn proc_pidinfo(pid: i32, flavor: i32, arg: u64, buffer: *mut u8, buffersize: i32) -> i32;
1091 fn proc_name(pid: i32, buffer: *mut u8, buffersize: u32) -> i32;
1092 }
1093
1094 const PROC_PIDTASKINFO: i32 = 4;
1095
1096 #[repr(C)]
1097 struct ProcTaskInfo {
1098 pti_virtual_size: u64,
1099 pti_resident_size: u64,
1100 pti_total_user: u64,
1101 pti_total_system: u64,
1102 pti_threads_user: u64,
1103 pti_threads_system: u64,
1104 pti_policy: i32,
1105 pti_faults: i32,
1106 pti_pageins: i32,
1107 pti_cow_faults: i32,
1108 pti_messages_sent: i32,
1109 pti_messages_received: i32,
1110 pti_syscalls_mach: i32,
1111 pti_syscalls_unix: i32,
1112 pti_csw: i32,
1113 pti_threadnum: i32,
1114 pti_numrunning: i32,
1115 pti_priority: i32,
1116 }
1117
1118 let mut children = Vec::new();
1119
1120 unsafe {
1124 let ppid = parent_pid as i32;
1125 let mut cap = 256usize;
1132 let (pids, n) = loop {
1133 let mut pids = vec![0i32; cap];
1134 let buf_size = (cap * mem::size_of::<i32>()) as i32;
1135 let actual = proc_listchildpids(ppid, pids.as_mut_ptr(), buf_size);
1136 if actual <= 0 {
1137 return children;
1138 }
1139 let count = actual as usize;
1140 if count < cap || cap >= 65536 {
1143 break (pids, count.min(cap));
1144 }
1145 cap = (count + 16).max(cap * 2);
1146 };
1147 for &pid in &pids[..n] {
1148 if pid <= 0 {
1149 continue;
1150 }
1151
1152 let mut name_buf = [0u8; 256];
1153 let name_len = proc_name(pid, name_buf.as_mut_ptr(), 256);
1154 let name = if name_len > 0 {
1155 String::from_utf8_lossy(&name_buf[..name_len as usize]).to_string()
1156 } else {
1157 String::from("<unknown>")
1158 };
1159
1160 let mut task_info: ProcTaskInfo = mem::zeroed();
1161 let info_size = mem::size_of::<ProcTaskInfo>() as i32;
1162 let ret = proc_pidinfo(
1163 pid,
1164 PROC_PIDTASKINFO,
1165 0,
1166 &mut task_info as *mut _ as *mut u8,
1167 info_size,
1168 );
1169
1170 let memory_bytes = if ret == info_size {
1171 Some(task_info.pti_resident_size)
1172 } else {
1173 None
1174 };
1175
1176 children.push(ChildProcessInfo {
1177 pid: pid as u32,
1178 ppid: parent_pid,
1179 name,
1180 memory_bytes,
1181 });
1182 }
1183 }
1184
1185 children
1186}
1187
1188#[cfg(test)]
1189mod tests {
1190 use super::*;
1191
1192 #[test]
1193 fn app_state_probes_register_run_list() {
1194 let probes = AppStateProbes::default();
1195 assert!(probes.is_empty());
1196
1197 probes.register(
1198 "scoring",
1199 std::sync::Arc::new(|| serde_json::json!({ "pipeline_version": 5 })),
1200 );
1201 probes.register("queue", std::sync::Arc::new(|| serde_json::json!(0)));
1202
1203 assert_eq!(
1205 probes.names(),
1206 vec!["queue".to_string(), "scoring".to_string()]
1207 );
1208 assert_eq!(probes.len(), 2);
1209
1210 let snapshot = probes.run("scoring").expect("probe runs");
1211 assert_eq!(snapshot["pipeline_version"], 5);
1212 assert!(probes.run("missing").is_none());
1213 }
1214
1215 #[test]
1216 fn app_state_probe_reflects_live_state() {
1217 let counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
1220 let probe_counter = std::sync::Arc::clone(&counter);
1221 let probes = AppStateProbes::default();
1222 probes.register(
1223 "counter",
1224 std::sync::Arc::new(move || {
1225 serde_json::json!(probe_counter.load(std::sync::atomic::Ordering::SeqCst))
1226 }),
1227 );
1228
1229 assert_eq!(probes.run("counter").unwrap(), serde_json::json!(0));
1230 counter.store(42, std::sync::atomic::Ordering::SeqCst);
1231 assert_eq!(probes.run("counter").unwrap(), serde_json::json!(42));
1232 }
1233
1234 #[test]
1235 fn event_bus_push_and_read() {
1236 let bus = EventBusMonitor::new(3);
1237 assert!(bus.is_empty());
1238 bus.push(CapturedTauriEvent {
1239 name: "test".to_string(),
1240 payload: "{}".to_string(),
1241 timestamp: "2026-01-01T00:00:00Z".to_string(),
1242 });
1243 assert_eq!(bus.len(), 1);
1244 assert_eq!(bus.events()[0].name, "test");
1245 }
1246
1247 #[test]
1248 fn event_bus_ring_buffer_eviction() {
1249 let bus = EventBusMonitor::new(2);
1250 for i in 0..5 {
1251 bus.push(CapturedTauriEvent {
1252 name: format!("event_{i}"),
1253 payload: String::new(),
1254 timestamp: String::new(),
1255 });
1256 }
1257 assert_eq!(bus.len(), 2);
1258 assert_eq!(bus.events()[0].name, "event_3");
1259 assert_eq!(bus.events()[1].name, "event_4");
1260 }
1261
1262 #[test]
1263 fn event_bus_clear() {
1264 let bus = EventBusMonitor::new(10);
1265 bus.push(CapturedTauriEvent {
1266 name: "a".to_string(),
1267 payload: String::new(),
1268 timestamp: String::new(),
1269 });
1270 assert_eq!(bus.clear(), 1);
1271 assert!(bus.is_empty());
1272 }
1273
1274 #[test]
1275 fn task_tracker_lifecycle() {
1276 let tracker = TaskTracker::new();
1277 let flag = tracker.track("mcp_server");
1278 let tasks = tracker.list();
1279 assert_eq!(tasks.len(), 1);
1280 assert_eq!(tasks[0].name, "mcp_server");
1281 assert!(!tasks[0].is_finished);
1282 assert_eq!(tracker.active_count(), 1);
1283
1284 flag.store(true, std::sync::atomic::Ordering::Relaxed);
1285 let tasks = tracker.list();
1286 assert!(tasks[0].is_finished);
1287 assert_eq!(tracker.active_count(), 0);
1288 }
1289
1290 #[test]
1291 fn timing_samples_basic() {
1292 let mut samples = TimingSamples::default();
1293 samples.record(Duration::from_millis(10));
1294 samples.record(Duration::from_millis(20));
1295 samples.record(Duration::from_millis(30));
1296 let stats = samples.stats("test_cmd");
1297 assert_eq!(stats.count, 3);
1298 assert!((stats.min_ms - 10.0).abs() < 1.0);
1299 assert!((stats.max_ms - 30.0).abs() < 1.0);
1300 assert!((stats.avg_ms - 20.0).abs() < 1.0);
1301 }
1302
1303 #[test]
1304 fn timing_samples_empty() {
1305 let samples = TimingSamples::default();
1306 let stats = samples.stats("empty");
1307 assert_eq!(stats.count, 0);
1308 assert_eq!(stats.min_ms, 0.0);
1309 }
1310
1311 #[test]
1312 fn timing_samples_bounded_but_count_accurate() {
1313 let mut samples = TimingSamples::default();
1319 let n = MAX_TIMING_SAMPLES * 3;
1320 for i in 0..n {
1321 samples.record(Duration::from_millis((i + 1) as u64));
1324 }
1325 assert!(
1326 samples.recent.len() <= MAX_TIMING_SAMPLES,
1327 "recent window must stay bounded, got {}",
1328 samples.recent.len()
1329 );
1330 let stats = samples.stats("soak");
1331 assert_eq!(stats.count, n as u64, "count must reflect full history");
1332 assert!(
1333 (stats.min_ms - 1.0).abs() < 0.5,
1334 "all-time min lost: {}",
1335 stats.min_ms
1336 );
1337 assert!(
1338 (stats.max_ms - n as f64).abs() < 0.5,
1339 "all-time max lost: {}",
1340 stats.max_ms
1341 );
1342 }
1343
1344 #[test]
1345 fn command_timings_thread_safe() {
1346 let timings = CommandTimings::new();
1347 timings.record("cmd_a", Duration::from_millis(5));
1348 timings.record("cmd_a", Duration::from_millis(15));
1349 timings.record("cmd_b", Duration::from_millis(100));
1350
1351 let all = timings.all_stats();
1352 assert_eq!(all.len(), 2);
1353 assert_eq!(all[0].command, "cmd_b");
1354
1355 let a = timings.stats_for("cmd_a").unwrap();
1356 assert_eq!(a.count, 2);
1357 }
1358
1359 #[test]
1360 fn fault_registry_lifecycle() {
1361 let registry = FaultRegistry::new();
1362 registry.inject(FaultConfig {
1363 command: "slow_cmd".to_string(),
1364 fault_type: FaultType::Delay { delay_ms: 500 },
1365 trigger_count: 0,
1366 max_triggers: 2,
1367 created_at: Instant::now(),
1368 });
1369
1370 assert!(registry.check_and_trigger("slow_cmd").is_some());
1371 assert!(registry.check_and_trigger("slow_cmd").is_some());
1372 assert!(registry.check_and_trigger("slow_cmd").is_none());
1373
1374 assert_eq!(registry.list().len(), 1);
1375 assert!(registry.clear("slow_cmd"));
1376 assert_eq!(registry.list().len(), 0);
1377 }
1378
1379 #[test]
1380 fn fault_registry_unlimited() {
1381 let registry = FaultRegistry::new();
1382 registry.inject(FaultConfig {
1383 command: "always_fail".to_string(),
1384 fault_type: FaultType::Error {
1385 message: "injected".to_string(),
1386 },
1387 trigger_count: 0,
1388 max_triggers: 0,
1389 created_at: Instant::now(),
1390 });
1391
1392 for _ in 0..100 {
1393 assert!(registry.check_and_trigger("always_fail").is_some());
1394 }
1395 }
1396
1397 #[test]
1398 fn fault_expires_after_ttl() {
1399 let cfg = FaultConfig {
1400 command: "x".to_string(),
1401 fault_type: FaultType::Error {
1402 message: "e".to_string(),
1403 },
1404 trigger_count: 0,
1405 max_triggers: 0, created_at: Instant::now(),
1407 };
1408 assert!(cfg.should_trigger_at(cfg.created_at));
1410 assert!(!cfg.should_trigger_at(cfg.created_at + FAULT_TTL + Duration::from_secs(1)));
1411 }
1412
1413 #[test]
1414 fn json_shape_extraction() {
1415 let value = serde_json::json!({
1416 "name": "test",
1417 "count": 42,
1418 "active": true,
1419 "items": [{"id": 1}],
1420 "meta": null
1421 });
1422 let shape = JsonShape::from_value(&value);
1423 match &shape {
1424 JsonShape::Object(fields) => {
1425 assert_eq!(fields.len(), 5);
1426 assert_eq!(*fields.get("name").unwrap(), JsonShape::String);
1427 assert_eq!(*fields.get("count").unwrap(), JsonShape::Number);
1428 assert_eq!(*fields.get("active").unwrap(), JsonShape::Bool);
1429 assert_eq!(*fields.get("meta").unwrap(), JsonShape::Null);
1430 }
1431 _ => panic!("expected object"),
1432 }
1433 }
1434
1435 #[test]
1436 fn contract_diff_detects_changes() {
1437 let baseline = serde_json::json!({"name": "old", "count": 1});
1438 let current = serde_json::json!({"name": "new", "count": "not_a_number", "extra": true});
1439
1440 let b_shape = JsonShape::from_value(&baseline);
1441 let c_shape = JsonShape::from_value(¤t);
1442 let drift = diff_shapes(&b_shape, &c_shape, "test_cmd");
1443
1444 assert!(!drift.shape_matches);
1445 assert_eq!(drift.new_fields, vec!["test_cmd.extra"]);
1446 assert_eq!(drift.type_changes.len(), 1);
1447 assert_eq!(drift.type_changes[0].path, "test_cmd.count");
1448 }
1449
1450 #[test]
1451 fn contract_store_crud() {
1452 let store = ContractStore::new();
1453 let baseline = ContractBaseline {
1454 command: "get_user".to_string(),
1455 args: serde_json::json!({}),
1456 shape: JsonShape::Object(HashMap::new()),
1457 sample: "{}".to_string(),
1458 recorded_at: "2026-05-26".to_string(),
1459 };
1460 store.record(baseline);
1461 assert!(store.get("get_user").is_some());
1462 assert_eq!(store.all().len(), 1);
1463 assert_eq!(store.clear(), 1);
1464 assert!(store.get("get_user").is_none());
1465 }
1466
1467 #[test]
1468 fn startup_timeline_records_phases() {
1469 let timeline = StartupTimeline::new();
1470 std::thread::sleep(Duration::from_millis(5));
1471 timeline.mark("phase_1");
1472 std::thread::sleep(Duration::from_millis(5));
1473 timeline.mark("phase_2");
1474
1475 let report = timeline.report();
1476 assert_eq!(report.len(), 2);
1477 assert_eq!(report[0].name, "phase_1");
1478 assert!(report[1].cumulative_ms >= report[0].cumulative_ms);
1479 assert!(timeline.total_ms() > 0.0);
1480 }
1481
1482 #[test]
1483 fn enumerate_child_processes_returns_vec() {
1484 let children = enumerate_child_processes();
1485 for child in &children {
1488 assert_ne!(child.pid, 0, "child PID should be non-zero");
1489 assert_eq!(
1490 child.ppid,
1491 std::process::id(),
1492 "parent PID should match current process"
1493 );
1494 assert!(!child.name.is_empty(), "child name should not be empty");
1495 }
1496 }
1497
1498 #[test]
1499 fn enumerate_child_processes_with_spawned_child() {
1500 let child = std::process::Command::new(if cfg!(windows) { "cmd.exe" } else { "sleep" })
1502 .args(if cfg!(windows) {
1503 &["/c", "timeout /t 10 /nobreak >nul"][..]
1504 } else {
1505 &["10"][..]
1506 })
1507 .spawn();
1508
1509 if let Ok(mut child_proc) = child {
1510 let children = enumerate_child_processes();
1511 assert!(
1512 !children.is_empty(),
1513 "should find at least one child process"
1514 );
1515
1516 let found = children.iter().any(|c| c.pid == child_proc.id());
1517 assert!(
1518 found,
1519 "spawned child (PID {}) should appear in enumeration",
1520 child_proc.id()
1521 );
1522
1523 let _ = child_proc.kill();
1524 let _ = child_proc.wait();
1525 }
1526 }
1527
1528 #[test]
1529 fn child_process_info_serializes() {
1530 let info = ChildProcessInfo {
1531 pid: 1234,
1532 ppid: 5678,
1533 name: "test-sidecar".to_string(),
1534 memory_bytes: Some(1_048_576),
1535 };
1536 let json = serde_json::to_value(&info).unwrap();
1537 assert_eq!(json["pid"], 1234);
1538 assert_eq!(json["ppid"], 5678);
1539 assert_eq!(json["name"], "test-sidecar");
1540 assert_eq!(json["memory_bytes"], 1_048_576);
1541 }
1542
1543 #[test]
1544 fn child_process_info_serializes_no_memory() {
1545 let info = ChildProcessInfo {
1546 pid: 42,
1547 ppid: 1,
1548 name: "zombie".to_string(),
1549 memory_bytes: None,
1550 };
1551 let json = serde_json::to_value(&info).unwrap();
1552 assert!(json["memory_bytes"].is_null());
1553 }
1554}