1use sklears_core::{error::Result as SklResult, prelude::SklearsError};
8use std::collections::{HashMap, VecDeque};
9use std::fmt;
10use std::sync::{Arc, Mutex, RwLock};
11use std::time::{Duration, Instant, SystemTime};
12
13pub struct AdvancedPipelineDebugger {
15 sessions: Arc<RwLock<HashMap<String, DebugSession>>>,
16 global_config: DebugConfig,
17 event_log: Arc<Mutex<VecDeque<DebugEvent>>>,
18 profiler: Arc<Mutex<AdvancedProfiler>>,
19}
20
21impl AdvancedPipelineDebugger {
22 #[must_use]
23 pub fn new(config: DebugConfig) -> Self {
24 Self {
25 sessions: Arc::new(RwLock::new(HashMap::new())),
26 global_config: config,
27 event_log: Arc::new(Mutex::new(VecDeque::new())),
28 profiler: Arc::new(Mutex::new(AdvancedProfiler::new())),
29 }
30 }
31
32 pub fn start_session(
34 &self,
35 session_id: String,
36 pipeline_id: String,
37 ) -> SklResult<DebugSessionHandle> {
38 let session =
39 DebugSession::new(session_id.clone(), pipeline_id, self.global_config.clone());
40
41 {
42 let mut sessions = self.sessions.write().unwrap_or_else(|e| e.into_inner());
43 sessions.insert(session_id.clone(), session);
44 }
45
46 self.log_event(DebugEvent::SessionStarted {
47 session_id: session_id.clone(),
48 timestamp: SystemTime::now(),
49 })?;
50
51 Ok(DebugSessionHandle {
52 session_id,
53 debugger: self.clone(),
54 })
55 }
56
57 #[must_use]
59 pub fn get_session(&self, session_id: &str) -> Option<DebugSessionHandle> {
60 let sessions = self.sessions.read().unwrap_or_else(|e| e.into_inner());
61 if sessions.contains_key(session_id) {
62 Some(DebugSessionHandle {
63 session_id: session_id.to_string(),
64 debugger: self.clone(),
65 })
66 } else {
67 None
68 }
69 }
70
71 #[must_use]
73 pub fn list_sessions(&self) -> Vec<String> {
74 let sessions = self.sessions.read().unwrap_or_else(|e| e.into_inner());
75 sessions.keys().cloned().collect()
76 }
77
78 #[must_use]
80 pub fn get_debug_statistics(&self) -> DebugStatistics {
81 let sessions = self.sessions.read().unwrap_or_else(|e| e.into_inner());
82 let profiler = self.profiler.lock().unwrap_or_else(|e| e.into_inner());
83
84 DebugStatistics {
86 active_sessions: sessions.len(),
87 total_events: self
88 .event_log
89 .lock()
90 .unwrap_or_else(|e| e.into_inner())
91 .len(),
92 memory_usage: profiler.get_memory_usage(),
93 cpu_usage: profiler.get_cpu_usage(),
94 uptime: profiler.get_uptime(),
95 }
96 }
97
98 fn log_event(&self, event: DebugEvent) -> SklResult<()> {
99 let mut log = self.event_log.lock().unwrap_or_else(|e| e.into_inner());
100 log.push_back(event);
101
102 while log.len() > self.global_config.max_event_history {
104 log.pop_front();
105 }
106
107 Ok(())
108 }
109}
110
111impl Clone for AdvancedPipelineDebugger {
112 fn clone(&self) -> Self {
113 Self {
114 sessions: Arc::clone(&self.sessions),
115 global_config: self.global_config.clone(),
116 event_log: Arc::clone(&self.event_log),
117 profiler: Arc::clone(&self.profiler),
118 }
119 }
120}
121
/// Feature switches and limits for the debugger and its sessions.
///
/// In the code shown alongside this type only `max_event_history` is read
/// (to bound the debugger's event log); the boolean flags are carried along
/// with each session.
#[derive(Debug, Clone)]
pub struct DebugConfig {
    /// Step-by-step execution switch.
    pub enable_step_by_step: bool,
    /// Breakpoint switch.
    pub enable_breakpoints: bool,
    /// Profiling switch.
    pub enable_profiling: bool,
    /// Memory-tracking switch.
    pub enable_memory_tracking: bool,
    /// Maximum number of events retained in the event log.
    pub max_event_history: usize,
    /// Automatic state-saving switch.
    pub auto_save_state: bool,
    /// Verbose-logging switch.
    pub verbose_logging: bool,
}

impl Default for DebugConfig {
    /// Enables every feature except verbose logging and keeps up to 10000 events.
    fn default() -> Self {
        const DEFAULT_EVENT_HISTORY: usize = 10_000;

        Self {
            enable_step_by_step: true,
            enable_breakpoints: true,
            enable_profiling: true,
            enable_memory_tracking: true,
            max_event_history: DEFAULT_EVENT_HISTORY,
            auto_save_state: true,
            verbose_logging: false,
        }
    }
}
147
148pub struct DebugSession {
150 pub id: String,
151 pub pipeline_id: String,
152 pub config: DebugConfig,
153 pub state: DebugSessionState,
154 pub breakpoints: Vec<Breakpoint>,
155 pub watch_expressions: Vec<WatchExpression>,
156 pub execution_history: Vec<ExecutionStep>,
157 pub current_step: usize,
158 pub variable_inspector: VariableInspector,
159 pub call_stack: Vec<CallStackFrame>,
160}
161
162impl DebugSession {
163 #[must_use]
164 pub fn new(id: String, pipeline_id: String, config: DebugConfig) -> Self {
165 Self {
166 id,
167 pipeline_id,
168 config,
169 state: DebugSessionState::Ready,
170 breakpoints: Vec::new(),
171 watch_expressions: Vec::new(),
172 execution_history: Vec::new(),
173 current_step: 0,
174 variable_inspector: VariableInspector::new(),
175 call_stack: Vec::new(),
176 }
177 }
178
179 pub fn add_breakpoint(&mut self, breakpoint: Breakpoint) {
181 self.breakpoints.push(breakpoint);
182 }
183
184 pub fn add_watch_expression(&mut self, expression: WatchExpression) {
186 self.watch_expressions.push(expression);
187 }
188
189 pub fn step_next(&mut self) -> SklResult<StepResult> {
191 if self.current_step < self.execution_history.len() {
192 let step = &self.execution_history[self.current_step];
193 self.current_step += 1;
194
195 if self.should_break_at_step(step) {
197 self.state = DebugSessionState::Paused;
198 return Ok(StepResult::BreakpointHit(step.clone()));
199 }
200
201 self.state = DebugSessionState::Stepping;
202 Ok(StepResult::Completed(step.clone()))
203 } else {
204 self.state = DebugSessionState::Finished;
205 Ok(StepResult::ExecutionComplete)
206 }
207 }
208
209 pub fn continue_execution(&mut self) -> SklResult<StepResult> {
211 self.state = DebugSessionState::Running;
212
213 while self.current_step < self.execution_history.len() {
214 let step = &self.execution_history[self.current_step];
215 self.current_step += 1;
216
217 if self.should_break_at_step(step) {
218 self.state = DebugSessionState::Paused;
219 return Ok(StepResult::BreakpointHit(step.clone()));
220 }
221 }
222
223 self.state = DebugSessionState::Finished;
224 Ok(StepResult::ExecutionComplete)
225 }
226
227 #[must_use]
229 pub fn evaluate_watch_expressions(&self) -> Vec<WatchResult> {
230 self.watch_expressions
231 .iter()
232 .map(|expr| self.evaluate_expression(expr))
233 .collect()
234 }
235
236 #[must_use]
238 pub fn get_variable_values(&self) -> HashMap<String, VariableValue> {
239 self.variable_inspector.get_all_variables()
240 }
241
242 #[must_use]
244 pub fn get_call_stack(&self) -> &[CallStackFrame] {
245 &self.call_stack
246 }
247
248 fn should_break_at_step(&self, step: &ExecutionStep) -> bool {
249 self.breakpoints.iter().any(|bp| bp.matches_step(step))
250 }
251
252 fn evaluate_expression(&self, expression: &WatchExpression) -> WatchResult {
253 WatchResult {
256 expression: expression.clone(),
257 value: format!("Evaluated: {}", expression.expression),
258 error: None,
259 timestamp: SystemTime::now(),
260 }
261 }
262}
263
264pub struct DebugSessionHandle {
266 session_id: String,
267 debugger: AdvancedPipelineDebugger,
268}
269
270impl DebugSessionHandle {
271 pub fn add_breakpoint(&self, breakpoint: Breakpoint) -> SklResult<()> {
273 let mut sessions = self
274 .debugger
275 .sessions
276 .write()
277 .unwrap_or_else(|e| e.into_inner());
278 if let Some(session) = sessions.get_mut(&self.session_id) {
279 session.add_breakpoint(breakpoint);
280 Ok(())
281 } else {
282 Err(SklearsError::InvalidState("Session not found".to_string()))
283 }
284 }
285
286 pub fn step_next(&self) -> SklResult<StepResult> {
288 let mut sessions = self
289 .debugger
290 .sessions
291 .write()
292 .unwrap_or_else(|e| e.into_inner());
293 if let Some(session) = sessions.get_mut(&self.session_id) {
294 session.step_next()
295 } else {
296 Err(SklearsError::InvalidState("Session not found".to_string()))
297 }
298 }
299
300 pub fn continue_execution(&self) -> SklResult<StepResult> {
302 let mut sessions = self
303 .debugger
304 .sessions
305 .write()
306 .unwrap_or_else(|e| e.into_inner());
307 if let Some(session) = sessions.get_mut(&self.session_id) {
308 session.continue_execution()
309 } else {
310 Err(SklearsError::InvalidState("Session not found".to_string()))
311 }
312 }
313
314 #[must_use]
316 pub fn get_state(&self) -> Option<DebugSessionState> {
317 let sessions = self
318 .debugger
319 .sessions
320 .read()
321 .unwrap_or_else(|e| e.into_inner());
322 sessions.get(&self.session_id).map(|s| s.state.clone())
323 }
324
325 #[must_use]
327 pub fn get_execution_history(&self) -> Vec<ExecutionStep> {
328 let sessions = self
329 .debugger
330 .sessions
331 .read()
332 .unwrap_or_else(|e| e.into_inner());
333 sessions
334 .get(&self.session_id)
335 .map(|s| s.execution_history.clone())
336 .unwrap_or_default()
337 }
338}
339
/// Lifecycle state of a debug session.
#[derive(Debug, Clone, PartialEq)]
pub enum DebugSessionState {
    /// Created (or reset for replay) and not yet executing.
    Ready,
    /// Continuing without pausing between steps.
    Running,
    /// A single step was executed without hitting a breakpoint.
    Stepping,
    /// Stopped because a breakpoint matched.
    Paused,
    /// The recorded history has been exhausted.
    Finished,
    /// Failure state carrying a message.
    Error(String),
}
356
/// One recorded step of a pipeline execution.
#[derive(Debug, Clone)]
pub struct ExecutionStep {
    /// Identifier of the step.
    pub step_id: String,
    /// Name of the component that ran.
    pub component: String,
    /// Name of the operation that was performed.
    pub operation: String,
    /// Two-dimensional shape of the input, when known.
    pub input_shape: Option<(usize, usize)>,
    /// Two-dimensional shape of the output, when known.
    pub output_shape: Option<(usize, usize)>,
    /// How long the step took.
    pub duration: Duration,
    /// Signed change in memory, reported elsewhere in this file as bytes.
    pub memory_delta: i64,
    /// Wall-clock time associated with the step.
    pub timestamp: SystemTime,
    /// Free-form key/value annotations.
    pub metadata: HashMap<String, String>,
}
370
371#[derive(Debug, Clone)]
373pub struct Breakpoint {
374 pub id: String,
375 pub condition: BreakpointCondition,
376 pub enabled: bool,
377 pub hit_count: usize,
378 pub hit_limit: Option<usize>,
379}
380
381impl Breakpoint {
382 #[must_use]
383 pub fn new(id: String, condition: BreakpointCondition) -> Self {
384 Self {
385 id,
386 condition,
387 enabled: true,
388 hit_count: 0,
389 hit_limit: None,
390 }
391 }
392
393 #[must_use]
394 pub fn matches_step(&self, step: &ExecutionStep) -> bool {
395 if !self.enabled {
396 return false;
397 }
398
399 if let Some(limit) = self.hit_limit {
400 if self.hit_count >= limit {
401 return false;
402 }
403 }
404
405 self.condition.matches(step)
406 }
407}
408
409#[derive(Debug, Clone)]
411pub enum BreakpointCondition {
412 ComponentName(String),
414 OperationType(String),
416 StepId(String),
418 MemoryThreshold(i64),
420 DurationThreshold(Duration),
422 Custom(String), }
425
426impl BreakpointCondition {
427 #[must_use]
428 pub fn matches(&self, step: &ExecutionStep) -> bool {
429 match self {
430 BreakpointCondition::ComponentName(name) => step.component.contains(name),
431 BreakpointCondition::OperationType(op) => step.operation.contains(op),
432 BreakpointCondition::StepId(id) => step.step_id == *id,
433 BreakpointCondition::MemoryThreshold(threshold) => {
434 step.memory_delta.abs() >= *threshold
435 }
436 BreakpointCondition::DurationThreshold(threshold) => step.duration >= *threshold,
437 BreakpointCondition::Custom(_expr) => {
438 false
440 }
441 }
442 }
443}
444
/// An expression the debugger evaluates on request.
#[derive(Debug, Clone)]
pub struct WatchExpression {
    /// Identifier of the watch.
    pub id: String,
    /// Expression text.
    pub expression: String,
    /// Human-readable description.
    pub description: String,
    /// Enabled flag carried with the watch.
    pub enabled: bool,
}
453
454#[derive(Debug, Clone)]
456pub struct WatchResult {
457 pub expression: WatchExpression,
458 pub value: String,
459 pub error: Option<String>,
460 pub timestamp: SystemTime,
461}
462
463#[derive(Debug, Clone)]
465pub enum StepResult {
466 Completed(ExecutionStep),
468 BreakpointHit(ExecutionStep),
470 ExecutionComplete,
472 Error(String),
474}
475
476pub struct VariableInspector {
478 variables: HashMap<String, VariableValue>,
479 history: VecDeque<VariableSnapshot>,
480}
481
482impl Default for VariableInspector {
483 fn default() -> Self {
484 Self::new()
485 }
486}
487
488impl VariableInspector {
489 #[must_use]
490 pub fn new() -> Self {
491 Self {
492 variables: HashMap::new(),
493 history: VecDeque::new(),
494 }
495 }
496
497 pub fn set_variable(&mut self, name: String, value: VariableValue) {
498 self.variables.insert(name, value);
499 }
500
501 #[must_use]
502 pub fn get_variable(&self, name: &str) -> Option<&VariableValue> {
503 self.variables.get(name)
504 }
505
506 #[must_use]
507 pub fn get_all_variables(&self) -> HashMap<String, VariableValue> {
508 self.variables.clone()
509 }
510
511 pub fn take_snapshot(&mut self) -> String {
512 let snapshot = VariableSnapshot {
513 timestamp: SystemTime::now(),
514 variables: self.variables.clone(),
515 };
516
517 let snapshot_id = format!("snapshot_{}", self.history.len());
518 self.history.push_back(snapshot);
519
520 while self.history.len() > 100 {
522 self.history.pop_front();
523 }
524
525 snapshot_id
526 }
527}
528
/// A value the variable inspector can hold.
#[derive(Debug, Clone)]
pub enum VariableValue {
    /// Single floating-point number.
    Scalar(f64),
    /// One-dimensional array of numbers.
    Array1D(Vec<f64>),
    /// Two-dimensional array stored as a shape plus a flat data buffer.
    Array2D {
        /// The two dimensions of the array.
        shape: (usize, usize),
        /// The array's elements.
        data: Vec<f64>,
    },
    /// Text value.
    String(String),
    /// Boolean value.
    Boolean(bool),
    /// Textual representation of some other object.
    Object(String),
}

impl fmt::Display for VariableValue {
    /// Writes a compact rendering: scalars and booleans as-is, arrays as their
    /// size only, strings in double quotes, objects as their stored text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Scalar(number) => write!(f, "{}", number),
            Self::Array1D(items) => write!(f, "Array1D[{}]", items.len()),
            Self::Array2D {
                shape: (first, second),
                ..
            } => write!(f, "Array2D[{}x{}]", first, second),
            Self::String(text) => write!(f, "\"{}\"", text),
            Self::Boolean(flag) => write!(f, "{}", flag),
            Self::Object(repr) => f.write_str(repr),
        }
    }
}
561
562#[derive(Debug, Clone)]
564struct VariableSnapshot {
565 timestamp: SystemTime,
566 variables: HashMap<String, VariableValue>,
567}
568
569#[derive(Debug, Clone)]
571pub struct CallStackFrame {
572 pub function_name: String,
573 pub component: String,
574 pub file: Option<String>,
575 pub line: Option<usize>,
576 pub local_variables: HashMap<String, VariableValue>,
577}
578
579#[derive(Debug, Clone)]
581pub enum DebugEvent {
582 SessionStarted {
584 session_id: String,
585 timestamp: SystemTime,
586 },
587 SessionEnded {
589 session_id: String,
590 timestamp: SystemTime,
591 },
592 BreakpointHit {
594 session_id: String,
595 breakpoint_id: String,
596 step: ExecutionStep,
597 },
598 StepExecuted {
600 session_id: String,
601 step: ExecutionStep,
602 },
603 Error {
605 session_id: String,
606 error: String,
607 timestamp: SystemTime,
608 },
609}
610
611pub struct AdvancedProfiler {
613 start_time: Instant,
614 memory_samples: VecDeque<MemorySample>,
615 cpu_samples: VecDeque<CpuSample>,
616}
617
618impl Default for AdvancedProfiler {
619 fn default() -> Self {
620 Self::new()
621 }
622}
623
624impl AdvancedProfiler {
625 #[must_use]
626 pub fn new() -> Self {
627 Self {
628 start_time: Instant::now(),
629 memory_samples: VecDeque::new(),
630 cpu_samples: VecDeque::new(),
631 }
632 }
633
634 pub fn sample_memory(&mut self) {
635 let sample = MemorySample {
636 timestamp: Instant::now(),
637 used_bytes: Self::get_current_memory_usage(),
638 peak_bytes: Self::get_peak_memory_usage(),
639 };
640
641 self.memory_samples.push_back(sample);
642
643 while self.memory_samples.len() > 3600 {
645 self.memory_samples.pop_front();
646 }
647 }
648
649 #[must_use]
650 pub fn get_memory_usage(&self) -> usize {
651 Self::get_current_memory_usage()
652 }
653
654 #[must_use]
655 pub fn get_cpu_usage(&self) -> f64 {
656 0.0
658 }
659
660 #[must_use]
661 pub fn get_uptime(&self) -> Duration {
662 self.start_time.elapsed()
663 }
664
665 fn get_current_memory_usage() -> usize {
666 0
668 }
669
670 fn get_peak_memory_usage() -> usize {
671 0
673 }
674}
675
/// One memory reading taken by the profiler.
#[derive(Debug, Clone)]
pub struct MemorySample {
    /// When the sample was taken.
    pub timestamp: Instant,
    /// Memory in use at that moment, in bytes.
    pub used_bytes: usize,
    /// Peak memory at that moment, in bytes.
    pub peak_bytes: usize,
}
683
/// One CPU reading taken by the profiler.
#[derive(Debug, Clone)]
pub struct CpuSample {
    /// When the sample was taken.
    pub timestamp: Instant,
    /// CPU usage at that moment, as a percentage.
    pub usage_percent: f64,
}
690
/// Point-in-time summary reported by the debugger.
#[derive(Debug, Clone)]
pub struct DebugStatistics {
    /// Number of sessions currently stored.
    pub active_sessions: usize,
    /// Number of events currently held in the event log.
    pub total_events: usize,
    /// Memory usage reported by the profiler, in bytes.
    pub memory_usage: usize,
    /// CPU usage reported by the profiler, as a percentage.
    pub cpu_usage: f64,
    /// Time since the profiler was created.
    pub uptime: Duration,
}
700
701pub mod interactive {
703 use super::{
704 AdvancedPipelineDebugger, Breakpoint, BreakpointCondition, DebugSessionState, Duration,
705 ExecutionStep, HashMap, SklResult, SklearsError, StepResult, SystemTime, VecDeque,
706 };
707
708 pub struct DebugConsole {
710 debugger: AdvancedPipelineDebugger,
711 current_session: Option<String>,
712 command_history: VecDeque<String>,
713 }
714
715 impl DebugConsole {
716 #[must_use]
717 pub fn new(debugger: AdvancedPipelineDebugger) -> Self {
718 Self {
719 debugger,
720 current_session: None,
721 command_history: VecDeque::new(),
722 }
723 }
724
725 pub fn execute_command(&mut self, command: &str) -> SklResult<String> {
727 self.command_history.push_back(command.to_string());
728
729 let parts: Vec<&str> = command.split_whitespace().collect();
730 if parts.is_empty() {
731 return Ok(String::new());
732 }
733
734 match parts[0] {
735 "help" => Ok(self.show_help()),
736 "sessions" => Ok(self.list_sessions()),
737 "use" => self.use_session(parts.get(1).copied()),
738 "breakpoint" => self.add_breakpoint(&parts[1..]),
739 "watch" => self.add_watch(&parts[1..]),
740 "step" => self.step_execution(),
741 "continue" => self.continue_execution(),
742 "vars" => self.show_variables(),
743 "stack" => self.show_call_stack(),
744 "stats" => Ok(self.show_statistics()),
745 "visualize" => self.visualize_pipeline(),
746 "replay" => self.replay_execution(),
747 "profile" => self.show_profiling_data(),
748 "timeline" => self.show_execution_timeline(),
749 _ => Ok(format!("Unknown command: {}", parts[0])),
750 }
751 }
752
753 fn show_help(&self) -> String {
754 r"Available commands:
755 help - Show this help message
756 sessions - List all debug sessions
757 use <session_id> - Switch to a session
758 breakpoint <condition> - Add a breakpoint
759 watch <expression> - Add a watch expression
760 step - Step to next execution point
761 continue - Continue execution
762 vars - Show current variables
763 stack - Show call stack
764 stats - Show debug statistics
765 visualize - Visualize current pipeline structure
766 replay - Replay pipeline execution
767 profile - Show profiling data
768 timeline - Show execution timeline"
769 .to_string()
770 }
771
772 fn list_sessions(&self) -> String {
773 let sessions = self.debugger.list_sessions();
774 if sessions.is_empty() {
775 "No active sessions".to_string()
776 } else {
777 format!("Active sessions: {}", sessions.join(", "))
778 }
779 }
780
781 fn use_session(&mut self, session_id: Option<&str>) -> SklResult<String> {
782 match session_id {
783 Some(id) => {
784 if self.debugger.get_session(id).is_some() {
785 self.current_session = Some(id.to_string());
786 Ok(format!("Switched to session: {id}"))
787 } else {
788 Ok(format!("Session not found: {id}"))
789 }
790 }
791 None => Ok("Usage: use <session_id>".to_string()),
792 }
793 }
794
795 fn add_breakpoint(&self, args: &[&str]) -> SklResult<String> {
796 if let Some(session_id) = &self.current_session {
797 if let Some(handle) = self.debugger.get_session(session_id) {
798 let condition = if args.is_empty() {
799 BreakpointCondition::ComponentName("*".to_string())
800 } else {
801 BreakpointCondition::ComponentName(args.join(" "))
802 };
803
804 let breakpoint = Breakpoint::new(
805 format!("bp_{}", chrono::Utc::now().timestamp()),
806 condition,
807 );
808
809 handle.add_breakpoint(breakpoint)?;
810 Ok("Breakpoint added".to_string())
811 } else {
812 Ok("Session not found".to_string())
813 }
814 } else {
815 Ok("No active session. Use 'use <session_id>' first.".to_string())
816 }
817 }
818
819 fn add_watch(&self, args: &[&str]) -> SklResult<String> {
820 if args.is_empty() {
821 return Ok("Usage: watch <expression>".to_string());
822 }
823
824 Ok(format!("Watch added: {}", args.join(" ")))
826 }
827
828 fn step_execution(&self) -> SklResult<String> {
829 if let Some(session_id) = &self.current_session {
830 if let Some(handle) = self.debugger.get_session(session_id) {
831 match handle.step_next()? {
832 StepResult::Completed(step) => {
833 Ok(format!("Stepped: {} -> {}", step.component, step.operation))
834 }
835 StepResult::BreakpointHit(step) => Ok(format!(
836 "Breakpoint hit at: {} -> {}",
837 step.component, step.operation
838 )),
839 StepResult::ExecutionComplete => Ok("Execution completed".to_string()),
840 StepResult::Error(err) => Ok(format!("Error: {err}")),
841 }
842 } else {
843 Ok("Session not found".to_string())
844 }
845 } else {
846 Ok("No active session".to_string())
847 }
848 }
849
850 fn continue_execution(&self) -> SklResult<String> {
851 if let Some(session_id) = &self.current_session {
852 if let Some(handle) = self.debugger.get_session(session_id) {
853 match handle.continue_execution()? {
854 StepResult::BreakpointHit(step) => Ok(format!(
855 "Breakpoint hit at: {} -> {}",
856 step.component, step.operation
857 )),
858 StepResult::ExecutionComplete => Ok("Execution completed".to_string()),
859 StepResult::Error(err) => Ok(format!("Error: {err}")),
860 _ => Ok("Continued execution".to_string()),
861 }
862 } else {
863 Ok("Session not found".to_string())
864 }
865 } else {
866 Ok("No active session".to_string())
867 }
868 }
869
870 fn show_variables(&self) -> SklResult<String> {
871 if let Some(session_id) = &self.current_session {
872 let sessions = self
873 .debugger
874 .sessions
875 .read()
876 .unwrap_or_else(|e| e.into_inner());
877 if let Some(session) = sessions.get(session_id) {
878 let variables = session.get_variable_values();
879 if variables.is_empty() {
880 Ok("No variables in current scope".to_string())
881 } else {
882 let mut output = String::from("Variables:\n");
883 for (name, value) in variables {
884 output.push_str(&format!(" {name} = {value}\n"));
885 }
886 Ok(output)
887 }
888 } else {
889 Ok("Session not found".to_string())
890 }
891 } else {
892 Ok("No active session".to_string())
893 }
894 }
895
896 fn show_call_stack(&self) -> SklResult<String> {
897 if let Some(session_id) = &self.current_session {
898 let sessions = self
899 .debugger
900 .sessions
901 .read()
902 .unwrap_or_else(|e| e.into_inner());
903 if let Some(session) = sessions.get(session_id) {
904 let stack = session.get_call_stack();
905 if stack.is_empty() {
906 Ok("Empty call stack".to_string())
907 } else {
908 let mut output = String::from("Call Stack:\n");
909 for (i, frame) in stack.iter().enumerate() {
910 output.push_str(&format!(
911 " #{}: {} in {} ({}:{})\n",
912 i,
913 frame.function_name,
914 frame.component,
915 frame.file.as_ref().unwrap_or(&"unknown".to_string()),
916 frame.line.unwrap_or(0)
917 ));
918 }
919 Ok(output)
920 }
921 } else {
922 Ok("Session not found".to_string())
923 }
924 } else {
925 Ok("No active session".to_string())
926 }
927 }
928
929 fn show_statistics(&self) -> String {
930 let stats = self.debugger.get_debug_statistics();
931 format!(
932 "Statistics:\n Active sessions: {}\n Total events: {}\n Memory usage: {} bytes\n CPU usage: {:.1}%\n Uptime: {:.2}s",
933 stats.active_sessions,
934 stats.total_events,
935 stats.memory_usage,
936 stats.cpu_usage,
937 stats.uptime.as_secs_f64()
938 )
939 }
940
941 fn visualize_pipeline(&self) -> SklResult<String> {
943 if let Some(session_id) = &self.current_session {
944 let sessions = self
945 .debugger
946 .sessions
947 .read()
948 .unwrap_or_else(|e| e.into_inner());
949 if let Some(session) = sessions.get(session_id) {
950 let mut visualization = String::from("Pipeline Visualization:\n");
951
952 if session.execution_history.is_empty() {
954 visualization.push_str(" No execution history available\n");
955 } else {
956 visualization.push_str(" ┌─ Start\n");
957 for (i, step) in session.execution_history.iter().enumerate() {
958 let marker = if i == session.current_step.saturating_sub(1) {
959 "►"
960 } else {
961 " "
962 };
963
964 visualization.push_str(&format!(
965 " │{} Step {}: {} -> {}\n",
966 marker,
967 i + 1,
968 step.component,
969 step.operation
970 ));
971 }
972 visualization.push_str(" └─ End\n");
973 }
974
975 Ok(visualization)
976 } else {
977 Ok("Session not found".to_string())
978 }
979 } else {
980 Ok("No active session".to_string())
981 }
982 }
983
984 fn replay_execution(&self) -> SklResult<String> {
986 if let Some(session_id) = &self.current_session {
987 let mut sessions = self
988 .debugger
989 .sessions
990 .write()
991 .unwrap_or_else(|e| e.into_inner());
992 if let Some(session) = sessions.get_mut(session_id) {
993 session.current_step = 0;
994 session.state = DebugSessionState::Ready;
995 Ok(
996 "Execution replay started. Use 'step' or 'continue' to proceed."
997 .to_string(),
998 )
999 } else {
1000 Ok("Session not found".to_string())
1001 }
1002 } else {
1003 Ok("No active session".to_string())
1004 }
1005 }
1006
1007 fn show_profiling_data(&self) -> SklResult<String> {
1009 if let Some(session_id) = &self.current_session {
1010 let sessions = self
1011 .debugger
1012 .sessions
1013 .read()
1014 .unwrap_or_else(|e| e.into_inner());
1015 if let Some(session) = sessions.get(session_id) {
1016 let mut output = String::from("Profiling Data:\n");
1017
1018 if session.execution_history.is_empty() {
1019 output.push_str(" No profiling data available\n");
1020 return Ok(output);
1021 }
1022
1023 let total_duration: Duration = session
1025 .execution_history
1026 .iter()
1027 .map(|step| step.duration)
1028 .sum();
1029
1030 let total_memory_delta: i64 = session
1031 .execution_history
1032 .iter()
1033 .map(|step| step.memory_delta)
1034 .sum();
1035
1036 output.push_str(&format!(
1037 " Total Duration: {:.2}ms\n",
1038 total_duration.as_millis()
1039 ));
1040 output.push_str(&format!(
1041 " Total Memory Delta: {total_memory_delta} bytes\n"
1042 ));
1043 output.push_str(&format!(
1044 " Average Step Duration: {:.2}ms\n",
1045 total_duration.as_millis() as f64 / session.execution_history.len() as f64
1046 ));
1047
1048 let mut sorted_steps = session.execution_history.clone();
1050 sorted_steps.sort_by(|a, b| b.duration.cmp(&a.duration));
1051
1052 output.push_str("\n Slowest Steps:\n");
1053 for (i, step) in sorted_steps.iter().take(5).enumerate() {
1054 output.push_str(&format!(
1055 " {}. {} -> {} ({:.2}ms)\n",
1056 i + 1,
1057 step.component,
1058 step.operation,
1059 step.duration.as_millis()
1060 ));
1061 }
1062
1063 Ok(output)
1064 } else {
1065 Ok("Session not found".to_string())
1066 }
1067 } else {
1068 Ok("No active session".to_string())
1069 }
1070 }
1071
1072 fn show_execution_timeline(&self) -> SklResult<String> {
1074 if let Some(session_id) = &self.current_session {
1075 let sessions = self
1076 .debugger
1077 .sessions
1078 .read()
1079 .unwrap_or_else(|e| e.into_inner());
1080 if let Some(session) = sessions.get(session_id) {
1081 let mut output = String::from("Execution Timeline:\n");
1082
1083 if session.execution_history.is_empty() {
1084 output.push_str(" No execution history available\n");
1085 return Ok(output);
1086 }
1087
1088 let start_time = session
1090 .execution_history
1091 .first()
1092 .map(|s| s.timestamp)
1093 .unwrap_or_else(std::time::SystemTime::now);
1094
1095 for (i, step) in session.execution_history.iter().enumerate() {
1096 let elapsed = step
1097 .timestamp
1098 .duration_since(start_time)
1099 .unwrap_or(Duration::ZERO);
1100
1101 let marker = if i == session.current_step.saturating_sub(1) {
1102 "►"
1103 } else {
1104 "•"
1105 };
1106
1107 output.push_str(&format!(
1108 " {:>8.2}ms {} {} -> {} ({:.1}ms, {} bytes)\n",
1109 elapsed.as_millis(),
1110 marker,
1111 step.component,
1112 step.operation,
1113 step.duration.as_millis(),
1114 step.memory_delta
1115 ));
1116 }
1117
1118 Ok(output)
1119 } else {
1120 Ok("Session not found".to_string())
1121 }
1122 } else {
1123 Ok("No active session".to_string())
1124 }
1125 }
1126 }
1127
1128 pub struct AdvancedBreakpointCondition {
1130 pub condition_type: AdvancedConditionType,
1131 pub parameters: HashMap<String, String>,
1132 }
1133
1134 #[derive(Debug, Clone)]
1135 pub enum AdvancedConditionType {
1136 MemoryThreshold { threshold_mb: f64 },
1138 DurationThreshold { threshold_ms: f64 },
1140 DataShape { expected_shape: (usize, usize) },
1142 ValuePattern { pattern: String },
1144 ErrorCondition { error_type: String },
1146 PerformanceDegradation { baseline_ratio: f64 },
1148 Custom { expression: String },
1150 }
1151
1152 impl AdvancedBreakpointCondition {
1153 #[must_use]
1154 pub fn matches(&self, step: &ExecutionStep) -> bool {
1155 match &self.condition_type {
1156 AdvancedConditionType::MemoryThreshold { threshold_mb } => {
1157 (step.memory_delta.abs() as f64 / 1024.0 / 1024.0) >= *threshold_mb
1158 }
1159 AdvancedConditionType::DurationThreshold { threshold_ms } => {
1160 step.duration.as_millis() as f64 >= *threshold_ms
1161 }
1162 AdvancedConditionType::DataShape { expected_shape } => {
1163 step.input_shape == Some(*expected_shape)
1164 || step.output_shape == Some(*expected_shape)
1165 }
1166 AdvancedConditionType::ValuePattern { pattern } => {
1167 step.metadata.values().any(|v| v.contains(pattern))
1168 }
1169 AdvancedConditionType::ErrorCondition { error_type } => {
1170 step.metadata.get("error_type") == Some(error_type)
1171 }
1172 AdvancedConditionType::PerformanceDegradation { baseline_ratio } => {
1173 false }
1176 AdvancedConditionType::Custom {
1177 expression: _expression,
1178 } => {
1179 false }
1182 }
1183 }
1184 }
1185
1186 pub struct PipelineReplayManager {
1188 recorded_executions: HashMap<String, Vec<ExecutionStep>>,
1189 current_replay: Option<String>,
1190 replay_position: usize,
1191 }
1192
1193 impl PipelineReplayManager {
1194 #[must_use]
1195 pub fn new() -> Self {
1196 Self {
1197 recorded_executions: HashMap::new(),
1198 current_replay: None,
1199 replay_position: 0,
1200 }
1201 }
1202
1203 pub fn record_execution(&mut self, pipeline_id: String, steps: Vec<ExecutionStep>) {
1205 self.recorded_executions.insert(pipeline_id, steps);
1206 }
1207
1208 pub fn start_replay(&mut self, pipeline_id: &str) -> SklResult<()> {
1210 if self.recorded_executions.contains_key(pipeline_id) {
1211 self.current_replay = Some(pipeline_id.to_string());
1212 self.replay_position = 0;
1213 Ok(())
1214 } else {
1215 Err(SklearsError::InvalidState(format!(
1216 "No recording found for pipeline: {pipeline_id}"
1217 )))
1218 }
1219 }
1220
1221 pub fn next_replay_step(&mut self) -> Option<&ExecutionStep> {
1223 if let Some(pipeline_id) = &self.current_replay {
1224 if let Some(steps) = self.recorded_executions.get(pipeline_id) {
1225 if self.replay_position < steps.len() {
1226 let step = &steps[self.replay_position];
1227 self.replay_position += 1;
1228 return Some(step);
1229 }
1230 }
1231 }
1232 None
1233 }
1234
1235 pub fn seek_replay(&mut self, position: usize) -> SklResult<()> {
1237 if let Some(pipeline_id) = &self.current_replay {
1238 if let Some(steps) = self.recorded_executions.get(pipeline_id) {
1239 if position < steps.len() {
1240 self.replay_position = position;
1241 return Ok(());
1242 }
1243 }
1244 }
1245 Err(SklearsError::InvalidState(
1246 "Invalid replay position".to_string(),
1247 ))
1248 }
1249
1250 #[must_use]
1252 pub fn get_replay_stats(&self) -> Option<ReplayStatistics> {
1253 if let Some(pipeline_id) = &self.current_replay {
1254 if let Some(steps) = self.recorded_executions.get(pipeline_id) {
1255 return Some(ReplayStatistics {
1256 total_steps: steps.len(),
1257 current_position: self.replay_position,
1258 total_duration: steps.iter().map(|s| s.duration).sum(),
1259 total_memory_usage: steps.iter().map(|s| s.memory_delta).sum(),
1260 });
1261 }
1262 }
1263 None
1264 }
1265 }
1266
1267 #[derive(Debug, Clone)]
1269 pub struct ReplayStatistics {
1270 pub total_steps: usize,
1271 pub current_position: usize,
1272 pub total_duration: Duration,
1273 pub total_memory_usage: i64,
1274 }
1275
1276 impl Default for PipelineReplayManager {
1277 fn default() -> Self {
1278 Self::new()
1279 }
1280 }
1281
1282 pub struct BottleneckDetector {
1284 execution_profiles: HashMap<String, Vec<ExecutionStep>>,
1285 performance_baselines: HashMap<String, PerformanceBaseline>,
1286 }
1287
1288 impl BottleneckDetector {
1289 #[must_use]
1290 pub fn new() -> Self {
1291 Self {
1292 execution_profiles: HashMap::new(),
1293 performance_baselines: HashMap::new(),
1294 }
1295 }
1296
1297 pub fn add_profile(&mut self, pipeline_id: String, steps: Vec<ExecutionStep>) {
1299 self.execution_profiles.insert(pipeline_id, steps);
1300 }
1301
1302 #[must_use]
1304 pub fn detect_bottlenecks(&self, pipeline_id: &str) -> Vec<BottleneckReport> {
1305 let mut bottlenecks = Vec::new();
1306
1307 if let Some(steps) = self.execution_profiles.get(pipeline_id) {
1308 let total_duration: Duration = steps.iter().map(|s| s.duration).sum();
1310 let average_duration = total_duration / steps.len() as u32;
1311
1312 for step in steps {
1313 if step.duration > average_duration * 3 {
1314 bottlenecks.push(BottleneckReport {
1315 bottleneck_type: BottleneckType::Performance,
1316 component: step.component.clone(),
1317 severity: if step.duration > average_duration * 5 {
1318 BottleneckSeverity::Critical
1319 } else {
1320 BottleneckSeverity::Major
1321 },
1322 description: format!(
1323 "Step duration ({:.2}ms) significantly exceeds average ({:.2}ms)",
1324 step.duration.as_millis(),
1325 average_duration.as_millis()
1326 ),
1327 suggested_actions: vec![
1328 "Profile individual operations within this component".to_string(),
1329 "Consider algorithm optimization".to_string(),
1330 "Check for inefficient data structures".to_string(),
1331 ],
1332 });
1333 }
1334 }
1335
1336 for step in steps {
1338 if step.memory_delta.abs() > 100 * 1024 * 1024 {
1339 bottlenecks.push(BottleneckReport {
1341 bottleneck_type: BottleneckType::Memory,
1342 component: step.component.clone(),
1343 severity: if step.memory_delta.abs() > 500 * 1024 * 1024 {
1344 BottleneckSeverity::Critical
1345 } else {
1346 BottleneckSeverity::Major
1347 },
1348 description: format!(
1349 "Large memory allocation/deallocation: {} MB",
1350 step.memory_delta / 1024 / 1024
1351 ),
1352 suggested_actions: vec![
1353 "Review memory allocation patterns".to_string(),
1354 "Consider streaming or chunked processing".to_string(),
1355 "Implement memory pooling".to_string(),
1356 ],
1357 });
1358 }
1359 }
1360 }
1361
1362 bottlenecks
1363 }
1364
1365 pub fn set_baseline(&mut self, pipeline_id: String, baseline: PerformanceBaseline) {
1367 self.performance_baselines.insert(pipeline_id, baseline);
1368 }
1369 }
1370
1371 #[derive(Debug, Clone)]
1373 pub struct BottleneckReport {
1374 pub bottleneck_type: BottleneckType,
1375 pub component: String,
1376 pub severity: BottleneckSeverity,
1377 pub description: String,
1378 pub suggested_actions: Vec<String>,
1379 }
1380
/// Category of a detected bottleneck.
#[derive(Debug, Clone, PartialEq)]
pub enum BottleneckType {
    /// A step took much longer than the average step.
    Performance,
    /// A step allocated or released an unusually large amount of memory.
    Memory,
    /// Input/output related bottleneck.
    IO,
    /// Synchronization related bottleneck.
    Synchronization,
}
1393
/// Severity of a detected bottleneck.
///
/// Variants are declared from least to most severe, so the derived `PartialOrd`
/// orders them `Minor < Major < Critical`.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
pub enum BottleneckSeverity {
    /// Least severe level.
    Minor,
    /// Intermediate level.
    Major,
    /// Most severe level.
    Critical,
}
1404
/// Reference performance figures for a pipeline, stored via
/// `BottleneckDetector::set_baseline`.
#[derive(Debug, Clone)]
pub struct PerformanceBaseline {
    /// Average duration recorded for the baseline.
    pub average_duration: Duration,
    /// Average memory usage recorded for the baseline.
    pub average_memory_usage: i64,
    /// Number of steps the baseline covers.
    pub step_count: usize,
    /// Wall-clock time at which the baseline was recorded.
    pub recorded_at: SystemTime,
}
1413
1414 impl Default for BottleneckDetector {
1415 fn default() -> Self {
1416 Self::new()
1417 }
1418 }
1419}
1420
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;

    /// Starting a session makes it visible through both listing and lookup.
    #[test]
    fn test_debug_session_creation() {
        let debugger = AdvancedPipelineDebugger::new(DebugConfig::default());

        // The handle is not inspected, but it stays bound until the end of the test.
        let _handle = debugger
            .start_session(String::from("test_session"), String::from("test_pipeline"))
            .expect("operation should succeed");

        let session_ids = debugger.list_sessions();
        assert_eq!(session_ids.len(), 1);

        let looked_up = debugger.get_session("test_session");
        assert!(looked_up.is_some());
    }

    /// A freshly created breakpoint keeps its id, is enabled, and has never been hit.
    #[test]
    fn test_breakpoint_creation() {
        let condition = BreakpointCondition::ComponentName(String::from("TestComponent"));
        let breakpoint = Breakpoint::new(String::from("test_bp"), condition);

        assert_eq!(breakpoint.id, "test_bp");
        assert!(breakpoint.enabled);
        assert_eq!(breakpoint.hit_count, 0);
    }

    /// A watch expression exposes the fields it was built with.
    #[test]
    fn test_watch_expression() {
        let watch = WatchExpression {
            id: String::from("test_watch"),
            expression: String::from("x + y"),
            description: String::from("Sum of x and y"),
            enabled: true,
        };

        assert_eq!(watch.expression, "x + y");
        assert!(watch.enabled);
    }

    /// A scalar stored in the inspector can be read back unchanged.
    #[test]
    fn test_variable_inspector() {
        let mut inspector = VariableInspector::new();
        inspector.set_variable(String::from("test_var"), VariableValue::Scalar(42.0));

        let stored = inspector
            .get_variable("test_var")
            .expect("operation should succeed");

        if let VariableValue::Scalar(scalar) = stored {
            assert_eq!(*scalar, 42.0);
        } else {
            panic!("Expected scalar value");
        }
    }
}