1use sklears_core::{error::Result as SklResult, prelude::SklearsError};
8use std::collections::{HashMap, VecDeque};
9use std::fmt;
10use std::sync::{Arc, Mutex, RwLock};
11use std::time::{Duration, Instant, SystemTime};
12
13pub struct AdvancedPipelineDebugger {
15 sessions: Arc<RwLock<HashMap<String, DebugSession>>>,
16 global_config: DebugConfig,
17 event_log: Arc<Mutex<VecDeque<DebugEvent>>>,
18 profiler: Arc<Mutex<AdvancedProfiler>>,
19}
20
21impl AdvancedPipelineDebugger {
22 #[must_use]
23 pub fn new(config: DebugConfig) -> Self {
24 Self {
25 sessions: Arc::new(RwLock::new(HashMap::new())),
26 global_config: config,
27 event_log: Arc::new(Mutex::new(VecDeque::new())),
28 profiler: Arc::new(Mutex::new(AdvancedProfiler::new())),
29 }
30 }
31
32 pub fn start_session(
34 &self,
35 session_id: String,
36 pipeline_id: String,
37 ) -> SklResult<DebugSessionHandle> {
38 let session =
39 DebugSession::new(session_id.clone(), pipeline_id, self.global_config.clone());
40
41 {
42 let mut sessions = self.sessions.write().unwrap();
43 sessions.insert(session_id.clone(), session);
44 }
45
46 self.log_event(DebugEvent::SessionStarted {
47 session_id: session_id.clone(),
48 timestamp: SystemTime::now(),
49 })?;
50
51 Ok(DebugSessionHandle {
52 session_id,
53 debugger: self.clone(),
54 })
55 }
56
57 #[must_use]
59 pub fn get_session(&self, session_id: &str) -> Option<DebugSessionHandle> {
60 let sessions = self.sessions.read().unwrap();
61 if sessions.contains_key(session_id) {
62 Some(DebugSessionHandle {
63 session_id: session_id.to_string(),
64 debugger: self.clone(),
65 })
66 } else {
67 None
68 }
69 }
70
71 #[must_use]
73 pub fn list_sessions(&self) -> Vec<String> {
74 let sessions = self.sessions.read().unwrap();
75 sessions.keys().cloned().collect()
76 }
77
78 #[must_use]
80 pub fn get_debug_statistics(&self) -> DebugStatistics {
81 let sessions = self.sessions.read().unwrap();
82 let profiler = self.profiler.lock().unwrap();
83
84 DebugStatistics {
86 active_sessions: sessions.len(),
87 total_events: self.event_log.lock().unwrap().len(),
88 memory_usage: profiler.get_memory_usage(),
89 cpu_usage: profiler.get_cpu_usage(),
90 uptime: profiler.get_uptime(),
91 }
92 }
93
94 fn log_event(&self, event: DebugEvent) -> SklResult<()> {
95 let mut log = self.event_log.lock().unwrap();
96 log.push_back(event);
97
98 while log.len() > self.global_config.max_event_history {
100 log.pop_front();
101 }
102
103 Ok(())
104 }
105}
106
107impl Clone for AdvancedPipelineDebugger {
108 fn clone(&self) -> Self {
109 Self {
110 sessions: Arc::clone(&self.sessions),
111 global_config: self.global_config.clone(),
112 event_log: Arc::clone(&self.event_log),
113 profiler: Arc::clone(&self.profiler),
114 }
115 }
116}
117
118#[derive(Debug, Clone)]
120pub struct DebugConfig {
121 pub enable_step_by_step: bool,
122 pub enable_breakpoints: bool,
123 pub enable_profiling: bool,
124 pub enable_memory_tracking: bool,
125 pub max_event_history: usize,
126 pub auto_save_state: bool,
127 pub verbose_logging: bool,
128}
129
130impl Default for DebugConfig {
131 fn default() -> Self {
132 Self {
133 enable_step_by_step: true,
134 enable_breakpoints: true,
135 enable_profiling: true,
136 enable_memory_tracking: true,
137 max_event_history: 10000,
138 auto_save_state: true,
139 verbose_logging: false,
140 }
141 }
142}
143
144pub struct DebugSession {
146 pub id: String,
147 pub pipeline_id: String,
148 pub config: DebugConfig,
149 pub state: DebugSessionState,
150 pub breakpoints: Vec<Breakpoint>,
151 pub watch_expressions: Vec<WatchExpression>,
152 pub execution_history: Vec<ExecutionStep>,
153 pub current_step: usize,
154 pub variable_inspector: VariableInspector,
155 pub call_stack: Vec<CallStackFrame>,
156}
157
158impl DebugSession {
159 #[must_use]
160 pub fn new(id: String, pipeline_id: String, config: DebugConfig) -> Self {
161 Self {
162 id,
163 pipeline_id,
164 config,
165 state: DebugSessionState::Ready,
166 breakpoints: Vec::new(),
167 watch_expressions: Vec::new(),
168 execution_history: Vec::new(),
169 current_step: 0,
170 variable_inspector: VariableInspector::new(),
171 call_stack: Vec::new(),
172 }
173 }
174
175 pub fn add_breakpoint(&mut self, breakpoint: Breakpoint) {
177 self.breakpoints.push(breakpoint);
178 }
179
180 pub fn add_watch_expression(&mut self, expression: WatchExpression) {
182 self.watch_expressions.push(expression);
183 }
184
185 pub fn step_next(&mut self) -> SklResult<StepResult> {
187 if self.current_step < self.execution_history.len() {
188 let step = &self.execution_history[self.current_step];
189 self.current_step += 1;
190
191 if self.should_break_at_step(step) {
193 self.state = DebugSessionState::Paused;
194 return Ok(StepResult::BreakpointHit(step.clone()));
195 }
196
197 self.state = DebugSessionState::Stepping;
198 Ok(StepResult::Completed(step.clone()))
199 } else {
200 self.state = DebugSessionState::Finished;
201 Ok(StepResult::ExecutionComplete)
202 }
203 }
204
205 pub fn continue_execution(&mut self) -> SklResult<StepResult> {
207 self.state = DebugSessionState::Running;
208
209 while self.current_step < self.execution_history.len() {
210 let step = &self.execution_history[self.current_step];
211 self.current_step += 1;
212
213 if self.should_break_at_step(step) {
214 self.state = DebugSessionState::Paused;
215 return Ok(StepResult::BreakpointHit(step.clone()));
216 }
217 }
218
219 self.state = DebugSessionState::Finished;
220 Ok(StepResult::ExecutionComplete)
221 }
222
223 #[must_use]
225 pub fn evaluate_watch_expressions(&self) -> Vec<WatchResult> {
226 self.watch_expressions
227 .iter()
228 .map(|expr| self.evaluate_expression(expr))
229 .collect()
230 }
231
232 #[must_use]
234 pub fn get_variable_values(&self) -> HashMap<String, VariableValue> {
235 self.variable_inspector.get_all_variables()
236 }
237
238 #[must_use]
240 pub fn get_call_stack(&self) -> &[CallStackFrame] {
241 &self.call_stack
242 }
243
244 fn should_break_at_step(&self, step: &ExecutionStep) -> bool {
245 self.breakpoints.iter().any(|bp| bp.matches_step(step))
246 }
247
248 fn evaluate_expression(&self, expression: &WatchExpression) -> WatchResult {
249 WatchResult {
252 expression: expression.clone(),
253 value: format!("Evaluated: {}", expression.expression),
254 error: None,
255 timestamp: SystemTime::now(),
256 }
257 }
258}
259
260pub struct DebugSessionHandle {
262 session_id: String,
263 debugger: AdvancedPipelineDebugger,
264}
265
266impl DebugSessionHandle {
267 pub fn add_breakpoint(&self, breakpoint: Breakpoint) -> SklResult<()> {
269 let mut sessions = self.debugger.sessions.write().unwrap();
270 if let Some(session) = sessions.get_mut(&self.session_id) {
271 session.add_breakpoint(breakpoint);
272 Ok(())
273 } else {
274 Err(SklearsError::InvalidState("Session not found".to_string()))
275 }
276 }
277
278 pub fn step_next(&self) -> SklResult<StepResult> {
280 let mut sessions = self.debugger.sessions.write().unwrap();
281 if let Some(session) = sessions.get_mut(&self.session_id) {
282 session.step_next()
283 } else {
284 Err(SklearsError::InvalidState("Session not found".to_string()))
285 }
286 }
287
288 pub fn continue_execution(&self) -> SklResult<StepResult> {
290 let mut sessions = self.debugger.sessions.write().unwrap();
291 if let Some(session) = sessions.get_mut(&self.session_id) {
292 session.continue_execution()
293 } else {
294 Err(SklearsError::InvalidState("Session not found".to_string()))
295 }
296 }
297
298 #[must_use]
300 pub fn get_state(&self) -> Option<DebugSessionState> {
301 let sessions = self.debugger.sessions.read().unwrap();
302 sessions.get(&self.session_id).map(|s| s.state.clone())
303 }
304
305 #[must_use]
307 pub fn get_execution_history(&self) -> Vec<ExecutionStep> {
308 let sessions = self.debugger.sessions.read().unwrap();
309 sessions
310 .get(&self.session_id)
311 .map(|s| s.execution_history.clone())
312 .unwrap_or_default()
313 }
314}
315
316#[derive(Debug, Clone, PartialEq)]
318pub enum DebugSessionState {
319 Ready,
321 Running,
323 Stepping,
325 Paused,
327 Finished,
329 Error(String),
331}
332
333#[derive(Debug, Clone)]
335pub struct ExecutionStep {
336 pub step_id: String,
337 pub component: String,
338 pub operation: String,
339 pub input_shape: Option<(usize, usize)>,
340 pub output_shape: Option<(usize, usize)>,
341 pub duration: Duration,
342 pub memory_delta: i64,
343 pub timestamp: SystemTime,
344 pub metadata: HashMap<String, String>,
345}
346
347#[derive(Debug, Clone)]
349pub struct Breakpoint {
350 pub id: String,
351 pub condition: BreakpointCondition,
352 pub enabled: bool,
353 pub hit_count: usize,
354 pub hit_limit: Option<usize>,
355}
356
357impl Breakpoint {
358 #[must_use]
359 pub fn new(id: String, condition: BreakpointCondition) -> Self {
360 Self {
361 id,
362 condition,
363 enabled: true,
364 hit_count: 0,
365 hit_limit: None,
366 }
367 }
368
369 #[must_use]
370 pub fn matches_step(&self, step: &ExecutionStep) -> bool {
371 if !self.enabled {
372 return false;
373 }
374
375 if let Some(limit) = self.hit_limit {
376 if self.hit_count >= limit {
377 return false;
378 }
379 }
380
381 self.condition.matches(step)
382 }
383}
384
385#[derive(Debug, Clone)]
387pub enum BreakpointCondition {
388 ComponentName(String),
390 OperationType(String),
392 StepId(String),
394 MemoryThreshold(i64),
396 DurationThreshold(Duration),
398 Custom(String), }
401
402impl BreakpointCondition {
403 #[must_use]
404 pub fn matches(&self, step: &ExecutionStep) -> bool {
405 match self {
406 BreakpointCondition::ComponentName(name) => step.component.contains(name),
407 BreakpointCondition::OperationType(op) => step.operation.contains(op),
408 BreakpointCondition::StepId(id) => step.step_id == *id,
409 BreakpointCondition::MemoryThreshold(threshold) => {
410 step.memory_delta.abs() >= *threshold
411 }
412 BreakpointCondition::DurationThreshold(threshold) => step.duration >= *threshold,
413 BreakpointCondition::Custom(_expr) => {
414 false
416 }
417 }
418 }
419}
420
421#[derive(Debug, Clone)]
423pub struct WatchExpression {
424 pub id: String,
425 pub expression: String,
426 pub description: String,
427 pub enabled: bool,
428}
429
430#[derive(Debug, Clone)]
432pub struct WatchResult {
433 pub expression: WatchExpression,
434 pub value: String,
435 pub error: Option<String>,
436 pub timestamp: SystemTime,
437}
438
439#[derive(Debug, Clone)]
441pub enum StepResult {
442 Completed(ExecutionStep),
444 BreakpointHit(ExecutionStep),
446 ExecutionComplete,
448 Error(String),
450}
451
452pub struct VariableInspector {
454 variables: HashMap<String, VariableValue>,
455 history: VecDeque<VariableSnapshot>,
456}
457
458impl Default for VariableInspector {
459 fn default() -> Self {
460 Self::new()
461 }
462}
463
464impl VariableInspector {
465 #[must_use]
466 pub fn new() -> Self {
467 Self {
468 variables: HashMap::new(),
469 history: VecDeque::new(),
470 }
471 }
472
473 pub fn set_variable(&mut self, name: String, value: VariableValue) {
474 self.variables.insert(name, value);
475 }
476
477 #[must_use]
478 pub fn get_variable(&self, name: &str) -> Option<&VariableValue> {
479 self.variables.get(name)
480 }
481
482 #[must_use]
483 pub fn get_all_variables(&self) -> HashMap<String, VariableValue> {
484 self.variables.clone()
485 }
486
487 pub fn take_snapshot(&mut self) -> String {
488 let snapshot = VariableSnapshot {
489 timestamp: SystemTime::now(),
490 variables: self.variables.clone(),
491 };
492
493 let snapshot_id = format!("snapshot_{}", self.history.len());
494 self.history.push_back(snapshot);
495
496 while self.history.len() > 100 {
498 self.history.pop_front();
499 }
500
501 snapshot_id
502 }
503}
504
505#[derive(Debug, Clone)]
507pub enum VariableValue {
508 Scalar(f64),
510 Array1D(Vec<f64>),
512 Array2D {
514 shape: (usize, usize),
515 data: Vec<f64>,
516 },
517 String(String),
519 Boolean(bool),
521 Object(String), }
524
525impl fmt::Display for VariableValue {
526 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
527 match self {
528 VariableValue::Scalar(v) => write!(f, "{v}"),
529 VariableValue::Array1D(v) => write!(f, "Array1D[{}]", v.len()),
530 VariableValue::Array2D { shape, .. } => write!(f, "Array2D[{}x{}]", shape.0, shape.1),
531 VariableValue::String(s) => write!(f, "\"{s}\""),
532 VariableValue::Boolean(b) => write!(f, "{b}"),
533 VariableValue::Object(repr) => write!(f, "{repr}"),
534 }
535 }
536}
537
538#[derive(Debug, Clone)]
540struct VariableSnapshot {
541 timestamp: SystemTime,
542 variables: HashMap<String, VariableValue>,
543}
544
545#[derive(Debug, Clone)]
547pub struct CallStackFrame {
548 pub function_name: String,
549 pub component: String,
550 pub file: Option<String>,
551 pub line: Option<usize>,
552 pub local_variables: HashMap<String, VariableValue>,
553}
554
555#[derive(Debug, Clone)]
557pub enum DebugEvent {
558 SessionStarted {
560 session_id: String,
561 timestamp: SystemTime,
562 },
563 SessionEnded {
565 session_id: String,
566 timestamp: SystemTime,
567 },
568 BreakpointHit {
570 session_id: String,
571 breakpoint_id: String,
572 step: ExecutionStep,
573 },
574 StepExecuted {
576 session_id: String,
577 step: ExecutionStep,
578 },
579 Error {
581 session_id: String,
582 error: String,
583 timestamp: SystemTime,
584 },
585}
586
587pub struct AdvancedProfiler {
589 start_time: Instant,
590 memory_samples: VecDeque<MemorySample>,
591 cpu_samples: VecDeque<CpuSample>,
592}
593
594impl Default for AdvancedProfiler {
595 fn default() -> Self {
596 Self::new()
597 }
598}
599
600impl AdvancedProfiler {
601 #[must_use]
602 pub fn new() -> Self {
603 Self {
604 start_time: Instant::now(),
605 memory_samples: VecDeque::new(),
606 cpu_samples: VecDeque::new(),
607 }
608 }
609
610 pub fn sample_memory(&mut self) {
611 let sample = MemorySample {
612 timestamp: Instant::now(),
613 used_bytes: Self::get_current_memory_usage(),
614 peak_bytes: Self::get_peak_memory_usage(),
615 };
616
617 self.memory_samples.push_back(sample);
618
619 while self.memory_samples.len() > 3600 {
621 self.memory_samples.pop_front();
622 }
623 }
624
625 #[must_use]
626 pub fn get_memory_usage(&self) -> usize {
627 Self::get_current_memory_usage()
628 }
629
630 #[must_use]
631 pub fn get_cpu_usage(&self) -> f64 {
632 0.0
634 }
635
636 #[must_use]
637 pub fn get_uptime(&self) -> Duration {
638 self.start_time.elapsed()
639 }
640
641 fn get_current_memory_usage() -> usize {
642 0
644 }
645
646 fn get_peak_memory_usage() -> usize {
647 0
649 }
650}
651
652#[derive(Debug, Clone)]
654pub struct MemorySample {
655 pub timestamp: Instant,
656 pub used_bytes: usize,
657 pub peak_bytes: usize,
658}
659
660#[derive(Debug, Clone)]
662pub struct CpuSample {
663 pub timestamp: Instant,
664 pub usage_percent: f64,
665}
666
667#[derive(Debug, Clone)]
669pub struct DebugStatistics {
670 pub active_sessions: usize,
671 pub total_events: usize,
672 pub memory_usage: usize,
673 pub cpu_usage: f64,
674 pub uptime: Duration,
675}
676
677pub mod interactive {
679 use super::{
680 AdvancedPipelineDebugger, Breakpoint, BreakpointCondition, DebugSessionState, Duration,
681 ExecutionStep, HashMap, SklResult, SklearsError, StepResult, SystemTime, VecDeque,
682 };
683
684 pub struct DebugConsole {
686 debugger: AdvancedPipelineDebugger,
687 current_session: Option<String>,
688 command_history: VecDeque<String>,
689 }
690
691 impl DebugConsole {
692 #[must_use]
693 pub fn new(debugger: AdvancedPipelineDebugger) -> Self {
694 Self {
695 debugger,
696 current_session: None,
697 command_history: VecDeque::new(),
698 }
699 }
700
701 pub fn execute_command(&mut self, command: &str) -> SklResult<String> {
703 self.command_history.push_back(command.to_string());
704
705 let parts: Vec<&str> = command.split_whitespace().collect();
706 if parts.is_empty() {
707 return Ok(String::new());
708 }
709
710 match parts[0] {
711 "help" => Ok(self.show_help()),
712 "sessions" => Ok(self.list_sessions()),
713 "use" => self.use_session(parts.get(1).copied()),
714 "breakpoint" => self.add_breakpoint(&parts[1..]),
715 "watch" => self.add_watch(&parts[1..]),
716 "step" => self.step_execution(),
717 "continue" => self.continue_execution(),
718 "vars" => self.show_variables(),
719 "stack" => self.show_call_stack(),
720 "stats" => Ok(self.show_statistics()),
721 "visualize" => self.visualize_pipeline(),
722 "replay" => self.replay_execution(),
723 "profile" => self.show_profiling_data(),
724 "timeline" => self.show_execution_timeline(),
725 _ => Ok(format!("Unknown command: {}", parts[0])),
726 }
727 }
728
729 fn show_help(&self) -> String {
730 r"Available commands:
731 help - Show this help message
732 sessions - List all debug sessions
733 use <session_id> - Switch to a session
734 breakpoint <condition> - Add a breakpoint
735 watch <expression> - Add a watch expression
736 step - Step to next execution point
737 continue - Continue execution
738 vars - Show current variables
739 stack - Show call stack
740 stats - Show debug statistics
741 visualize - Visualize current pipeline structure
742 replay - Replay pipeline execution
743 profile - Show profiling data
744 timeline - Show execution timeline"
745 .to_string()
746 }
747
748 fn list_sessions(&self) -> String {
749 let sessions = self.debugger.list_sessions();
750 if sessions.is_empty() {
751 "No active sessions".to_string()
752 } else {
753 format!("Active sessions: {}", sessions.join(", "))
754 }
755 }
756
757 fn use_session(&mut self, session_id: Option<&str>) -> SklResult<String> {
758 match session_id {
759 Some(id) => {
760 if self.debugger.get_session(id).is_some() {
761 self.current_session = Some(id.to_string());
762 Ok(format!("Switched to session: {id}"))
763 } else {
764 Ok(format!("Session not found: {id}"))
765 }
766 }
767 None => Ok("Usage: use <session_id>".to_string()),
768 }
769 }
770
771 fn add_breakpoint(&self, args: &[&str]) -> SklResult<String> {
772 if let Some(session_id) = &self.current_session {
773 if let Some(handle) = self.debugger.get_session(session_id) {
774 let condition = if args.is_empty() {
775 BreakpointCondition::ComponentName("*".to_string())
776 } else {
777 BreakpointCondition::ComponentName(args.join(" "))
778 };
779
780 let breakpoint = Breakpoint::new(
781 format!("bp_{}", chrono::Utc::now().timestamp()),
782 condition,
783 );
784
785 handle.add_breakpoint(breakpoint)?;
786 Ok("Breakpoint added".to_string())
787 } else {
788 Ok("Session not found".to_string())
789 }
790 } else {
791 Ok("No active session. Use 'use <session_id>' first.".to_string())
792 }
793 }
794
795 fn add_watch(&self, args: &[&str]) -> SklResult<String> {
796 if args.is_empty() {
797 return Ok("Usage: watch <expression>".to_string());
798 }
799
800 Ok(format!("Watch added: {}", args.join(" ")))
802 }
803
804 fn step_execution(&self) -> SklResult<String> {
805 if let Some(session_id) = &self.current_session {
806 if let Some(handle) = self.debugger.get_session(session_id) {
807 match handle.step_next()? {
808 StepResult::Completed(step) => {
809 Ok(format!("Stepped: {} -> {}", step.component, step.operation))
810 }
811 StepResult::BreakpointHit(step) => Ok(format!(
812 "Breakpoint hit at: {} -> {}",
813 step.component, step.operation
814 )),
815 StepResult::ExecutionComplete => Ok("Execution completed".to_string()),
816 StepResult::Error(err) => Ok(format!("Error: {err}")),
817 }
818 } else {
819 Ok("Session not found".to_string())
820 }
821 } else {
822 Ok("No active session".to_string())
823 }
824 }
825
826 fn continue_execution(&self) -> SklResult<String> {
827 if let Some(session_id) = &self.current_session {
828 if let Some(handle) = self.debugger.get_session(session_id) {
829 match handle.continue_execution()? {
830 StepResult::BreakpointHit(step) => Ok(format!(
831 "Breakpoint hit at: {} -> {}",
832 step.component, step.operation
833 )),
834 StepResult::ExecutionComplete => Ok("Execution completed".to_string()),
835 StepResult::Error(err) => Ok(format!("Error: {err}")),
836 _ => Ok("Continued execution".to_string()),
837 }
838 } else {
839 Ok("Session not found".to_string())
840 }
841 } else {
842 Ok("No active session".to_string())
843 }
844 }
845
846 fn show_variables(&self) -> SklResult<String> {
847 if let Some(session_id) = &self.current_session {
848 let sessions = self.debugger.sessions.read().unwrap();
849 if let Some(session) = sessions.get(session_id) {
850 let variables = session.get_variable_values();
851 if variables.is_empty() {
852 Ok("No variables in current scope".to_string())
853 } else {
854 let mut output = String::from("Variables:\n");
855 for (name, value) in variables {
856 output.push_str(&format!(" {name} = {value}\n"));
857 }
858 Ok(output)
859 }
860 } else {
861 Ok("Session not found".to_string())
862 }
863 } else {
864 Ok("No active session".to_string())
865 }
866 }
867
868 fn show_call_stack(&self) -> SklResult<String> {
869 if let Some(session_id) = &self.current_session {
870 let sessions = self.debugger.sessions.read().unwrap();
871 if let Some(session) = sessions.get(session_id) {
872 let stack = session.get_call_stack();
873 if stack.is_empty() {
874 Ok("Empty call stack".to_string())
875 } else {
876 let mut output = String::from("Call Stack:\n");
877 for (i, frame) in stack.iter().enumerate() {
878 output.push_str(&format!(
879 " #{}: {} in {} ({}:{})\n",
880 i,
881 frame.function_name,
882 frame.component,
883 frame.file.as_ref().unwrap_or(&"unknown".to_string()),
884 frame.line.unwrap_or(0)
885 ));
886 }
887 Ok(output)
888 }
889 } else {
890 Ok("Session not found".to_string())
891 }
892 } else {
893 Ok("No active session".to_string())
894 }
895 }
896
897 fn show_statistics(&self) -> String {
898 let stats = self.debugger.get_debug_statistics();
899 format!(
900 "Statistics:\n Active sessions: {}\n Total events: {}\n Memory usage: {} bytes\n CPU usage: {:.1}%\n Uptime: {:.2}s",
901 stats.active_sessions,
902 stats.total_events,
903 stats.memory_usage,
904 stats.cpu_usage,
905 stats.uptime.as_secs_f64()
906 )
907 }
908
909 fn visualize_pipeline(&self) -> SklResult<String> {
911 if let Some(session_id) = &self.current_session {
912 let sessions = self.debugger.sessions.read().unwrap();
913 if let Some(session) = sessions.get(session_id) {
914 let mut visualization = String::from("Pipeline Visualization:\n");
915
916 if session.execution_history.is_empty() {
918 visualization.push_str(" No execution history available\n");
919 } else {
920 visualization.push_str(" ┌─ Start\n");
921 for (i, step) in session.execution_history.iter().enumerate() {
922 let marker = if i == session.current_step.saturating_sub(1) {
923 "►"
924 } else {
925 " "
926 };
927
928 visualization.push_str(&format!(
929 " │{} Step {}: {} -> {}\n",
930 marker,
931 i + 1,
932 step.component,
933 step.operation
934 ));
935 }
936 visualization.push_str(" └─ End\n");
937 }
938
939 Ok(visualization)
940 } else {
941 Ok("Session not found".to_string())
942 }
943 } else {
944 Ok("No active session".to_string())
945 }
946 }
947
948 fn replay_execution(&self) -> SklResult<String> {
950 if let Some(session_id) = &self.current_session {
951 let mut sessions = self.debugger.sessions.write().unwrap();
952 if let Some(session) = sessions.get_mut(session_id) {
953 session.current_step = 0;
954 session.state = DebugSessionState::Ready;
955 Ok(
956 "Execution replay started. Use 'step' or 'continue' to proceed."
957 .to_string(),
958 )
959 } else {
960 Ok("Session not found".to_string())
961 }
962 } else {
963 Ok("No active session".to_string())
964 }
965 }
966
967 fn show_profiling_data(&self) -> SklResult<String> {
969 if let Some(session_id) = &self.current_session {
970 let sessions = self.debugger.sessions.read().unwrap();
971 if let Some(session) = sessions.get(session_id) {
972 let mut output = String::from("Profiling Data:\n");
973
974 if session.execution_history.is_empty() {
975 output.push_str(" No profiling data available\n");
976 return Ok(output);
977 }
978
979 let total_duration: Duration = session
981 .execution_history
982 .iter()
983 .map(|step| step.duration)
984 .sum();
985
986 let total_memory_delta: i64 = session
987 .execution_history
988 .iter()
989 .map(|step| step.memory_delta)
990 .sum();
991
992 output.push_str(&format!(
993 " Total Duration: {:.2}ms\n",
994 total_duration.as_millis()
995 ));
996 output.push_str(&format!(
997 " Total Memory Delta: {total_memory_delta} bytes\n"
998 ));
999 output.push_str(&format!(
1000 " Average Step Duration: {:.2}ms\n",
1001 total_duration.as_millis() as f64 / session.execution_history.len() as f64
1002 ));
1003
1004 let mut sorted_steps = session.execution_history.clone();
1006 sorted_steps.sort_by(|a, b| b.duration.cmp(&a.duration));
1007
1008 output.push_str("\n Slowest Steps:\n");
1009 for (i, step) in sorted_steps.iter().take(5).enumerate() {
1010 output.push_str(&format!(
1011 " {}. {} -> {} ({:.2}ms)\n",
1012 i + 1,
1013 step.component,
1014 step.operation,
1015 step.duration.as_millis()
1016 ));
1017 }
1018
1019 Ok(output)
1020 } else {
1021 Ok("Session not found".to_string())
1022 }
1023 } else {
1024 Ok("No active session".to_string())
1025 }
1026 }
1027
1028 fn show_execution_timeline(&self) -> SklResult<String> {
1030 if let Some(session_id) = &self.current_session {
1031 let sessions = self.debugger.sessions.read().unwrap();
1032 if let Some(session) = sessions.get(session_id) {
1033 let mut output = String::from("Execution Timeline:\n");
1034
1035 if session.execution_history.is_empty() {
1036 output.push_str(" No execution history available\n");
1037 return Ok(output);
1038 }
1039
1040 let start_time = session.execution_history.first().unwrap().timestamp;
1042
1043 for (i, step) in session.execution_history.iter().enumerate() {
1044 let elapsed = step
1045 .timestamp
1046 .duration_since(start_time)
1047 .unwrap_or(Duration::ZERO);
1048
1049 let marker = if i == session.current_step.saturating_sub(1) {
1050 "►"
1051 } else {
1052 "•"
1053 };
1054
1055 output.push_str(&format!(
1056 " {:>8.2}ms {} {} -> {} ({:.1}ms, {} bytes)\n",
1057 elapsed.as_millis(),
1058 marker,
1059 step.component,
1060 step.operation,
1061 step.duration.as_millis(),
1062 step.memory_delta
1063 ));
1064 }
1065
1066 Ok(output)
1067 } else {
1068 Ok("Session not found".to_string())
1069 }
1070 } else {
1071 Ok("No active session".to_string())
1072 }
1073 }
1074 }
1075
1076 pub struct AdvancedBreakpointCondition {
1078 pub condition_type: AdvancedConditionType,
1079 pub parameters: HashMap<String, String>,
1080 }
1081
1082 #[derive(Debug, Clone)]
1083 pub enum AdvancedConditionType {
1084 MemoryThreshold { threshold_mb: f64 },
1086 DurationThreshold { threshold_ms: f64 },
1088 DataShape { expected_shape: (usize, usize) },
1090 ValuePattern { pattern: String },
1092 ErrorCondition { error_type: String },
1094 PerformanceDegradation { baseline_ratio: f64 },
1096 Custom { expression: String },
1098 }
1099
1100 impl AdvancedBreakpointCondition {
1101 #[must_use]
1102 pub fn matches(&self, step: &ExecutionStep) -> bool {
1103 match &self.condition_type {
1104 AdvancedConditionType::MemoryThreshold { threshold_mb } => {
1105 (step.memory_delta.abs() as f64 / 1024.0 / 1024.0) >= *threshold_mb
1106 }
1107 AdvancedConditionType::DurationThreshold { threshold_ms } => {
1108 step.duration.as_millis() as f64 >= *threshold_ms
1109 }
1110 AdvancedConditionType::DataShape { expected_shape } => {
1111 step.input_shape == Some(*expected_shape)
1112 || step.output_shape == Some(*expected_shape)
1113 }
1114 AdvancedConditionType::ValuePattern { pattern } => {
1115 step.metadata.values().any(|v| v.contains(pattern))
1116 }
1117 AdvancedConditionType::ErrorCondition { error_type } => {
1118 step.metadata.get("error_type") == Some(error_type)
1119 }
1120 AdvancedConditionType::PerformanceDegradation { baseline_ratio } => {
1121 false }
1124 AdvancedConditionType::Custom {
1125 expression: _expression,
1126 } => {
1127 false }
1130 }
1131 }
1132 }
1133
1134 pub struct PipelineReplayManager {
1136 recorded_executions: HashMap<String, Vec<ExecutionStep>>,
1137 current_replay: Option<String>,
1138 replay_position: usize,
1139 }
1140
1141 impl PipelineReplayManager {
1142 #[must_use]
1143 pub fn new() -> Self {
1144 Self {
1145 recorded_executions: HashMap::new(),
1146 current_replay: None,
1147 replay_position: 0,
1148 }
1149 }
1150
1151 pub fn record_execution(&mut self, pipeline_id: String, steps: Vec<ExecutionStep>) {
1153 self.recorded_executions.insert(pipeline_id, steps);
1154 }
1155
1156 pub fn start_replay(&mut self, pipeline_id: &str) -> SklResult<()> {
1158 if self.recorded_executions.contains_key(pipeline_id) {
1159 self.current_replay = Some(pipeline_id.to_string());
1160 self.replay_position = 0;
1161 Ok(())
1162 } else {
1163 Err(SklearsError::InvalidState(format!(
1164 "No recording found for pipeline: {pipeline_id}"
1165 )))
1166 }
1167 }
1168
1169 pub fn next_replay_step(&mut self) -> Option<&ExecutionStep> {
1171 if let Some(pipeline_id) = &self.current_replay {
1172 if let Some(steps) = self.recorded_executions.get(pipeline_id) {
1173 if self.replay_position < steps.len() {
1174 let step = &steps[self.replay_position];
1175 self.replay_position += 1;
1176 return Some(step);
1177 }
1178 }
1179 }
1180 None
1181 }
1182
1183 pub fn seek_replay(&mut self, position: usize) -> SklResult<()> {
1185 if let Some(pipeline_id) = &self.current_replay {
1186 if let Some(steps) = self.recorded_executions.get(pipeline_id) {
1187 if position < steps.len() {
1188 self.replay_position = position;
1189 return Ok(());
1190 }
1191 }
1192 }
1193 Err(SklearsError::InvalidState(
1194 "Invalid replay position".to_string(),
1195 ))
1196 }
1197
1198 #[must_use]
1200 pub fn get_replay_stats(&self) -> Option<ReplayStatistics> {
1201 if let Some(pipeline_id) = &self.current_replay {
1202 if let Some(steps) = self.recorded_executions.get(pipeline_id) {
1203 return Some(ReplayStatistics {
1204 total_steps: steps.len(),
1205 current_position: self.replay_position,
1206 total_duration: steps.iter().map(|s| s.duration).sum(),
1207 total_memory_usage: steps.iter().map(|s| s.memory_delta).sum(),
1208 });
1209 }
1210 }
1211 None
1212 }
1213 }
1214
1215 #[derive(Debug, Clone)]
1217 pub struct ReplayStatistics {
1218 pub total_steps: usize,
1219 pub current_position: usize,
1220 pub total_duration: Duration,
1221 pub total_memory_usage: i64,
1222 }
1223
1224 impl Default for PipelineReplayManager {
1225 fn default() -> Self {
1226 Self::new()
1227 }
1228 }
1229
1230 pub struct BottleneckDetector {
1232 execution_profiles: HashMap<String, Vec<ExecutionStep>>,
1233 performance_baselines: HashMap<String, PerformanceBaseline>,
1234 }
1235
1236 impl BottleneckDetector {
1237 #[must_use]
1238 pub fn new() -> Self {
1239 Self {
1240 execution_profiles: HashMap::new(),
1241 performance_baselines: HashMap::new(),
1242 }
1243 }
1244
1245 pub fn add_profile(&mut self, pipeline_id: String, steps: Vec<ExecutionStep>) {
1247 self.execution_profiles.insert(pipeline_id, steps);
1248 }
1249
1250 #[must_use]
1252 pub fn detect_bottlenecks(&self, pipeline_id: &str) -> Vec<BottleneckReport> {
1253 let mut bottlenecks = Vec::new();
1254
1255 if let Some(steps) = self.execution_profiles.get(pipeline_id) {
1256 let total_duration: Duration = steps.iter().map(|s| s.duration).sum();
1258 let average_duration = total_duration / steps.len() as u32;
1259
1260 for step in steps {
1261 if step.duration > average_duration * 3 {
1262 bottlenecks.push(BottleneckReport {
1263 bottleneck_type: BottleneckType::Performance,
1264 component: step.component.clone(),
1265 severity: if step.duration > average_duration * 5 {
1266 BottleneckSeverity::Critical
1267 } else {
1268 BottleneckSeverity::Major
1269 },
1270 description: format!(
1271 "Step duration ({:.2}ms) significantly exceeds average ({:.2}ms)",
1272 step.duration.as_millis(),
1273 average_duration.as_millis()
1274 ),
1275 suggested_actions: vec![
1276 "Profile individual operations within this component".to_string(),
1277 "Consider algorithm optimization".to_string(),
1278 "Check for inefficient data structures".to_string(),
1279 ],
1280 });
1281 }
1282 }
1283
1284 for step in steps {
1286 if step.memory_delta.abs() > 100 * 1024 * 1024 {
1287 bottlenecks.push(BottleneckReport {
1289 bottleneck_type: BottleneckType::Memory,
1290 component: step.component.clone(),
1291 severity: if step.memory_delta.abs() > 500 * 1024 * 1024 {
1292 BottleneckSeverity::Critical
1293 } else {
1294 BottleneckSeverity::Major
1295 },
1296 description: format!(
1297 "Large memory allocation/deallocation: {} MB",
1298 step.memory_delta / 1024 / 1024
1299 ),
1300 suggested_actions: vec![
1301 "Review memory allocation patterns".to_string(),
1302 "Consider streaming or chunked processing".to_string(),
1303 "Implement memory pooling".to_string(),
1304 ],
1305 });
1306 }
1307 }
1308 }
1309
1310 bottlenecks
1311 }
1312
1313 pub fn set_baseline(&mut self, pipeline_id: String, baseline: PerformanceBaseline) {
1315 self.performance_baselines.insert(pipeline_id, baseline);
1316 }
1317 }
1318
1319 #[derive(Debug, Clone)]
1321 pub struct BottleneckReport {
1322 pub bottleneck_type: BottleneckType,
1323 pub component: String,
1324 pub severity: BottleneckSeverity,
1325 pub description: String,
1326 pub suggested_actions: Vec<String>,
1327 }
1328
1329 #[derive(Debug, Clone, PartialEq)]
1331 pub enum BottleneckType {
1332 Performance,
1334 Memory,
1336 IO,
1338 Synchronization,
1340 }
1341
1342 #[derive(Debug, Clone, PartialEq, PartialOrd)]
1344 pub enum BottleneckSeverity {
1345 Minor,
1347 Major,
1349 Critical,
1351 }
1352
1353 #[derive(Debug, Clone)]
1355 pub struct PerformanceBaseline {
1356 pub average_duration: Duration,
1357 pub average_memory_usage: i64,
1358 pub step_count: usize,
1359 pub recorded_at: SystemTime,
1360 }
1361
1362 impl Default for BottleneckDetector {
1363 fn default() -> Self {
1364 Self::new()
1365 }
1366 }
1367}
1368
1369#[allow(non_snake_case)]
1370#[cfg(test)]
1371mod tests {
1372 use super::*;
1373
1374 #[test]
1375 fn test_debug_session_creation() {
1376 let config = DebugConfig::default();
1377 let debugger = AdvancedPipelineDebugger::new(config);
1378
1379 let handle = debugger
1380 .start_session("test_session".to_string(), "test_pipeline".to_string())
1381 .unwrap();
1382
1383 assert_eq!(debugger.list_sessions().len(), 1);
1384 assert!(debugger.get_session("test_session").is_some());
1385 }
1386
1387 #[test]
1388 fn test_breakpoint_creation() {
1389 let breakpoint = Breakpoint::new(
1390 "test_bp".to_string(),
1391 BreakpointCondition::ComponentName("TestComponent".to_string()),
1392 );
1393
1394 assert_eq!(breakpoint.id, "test_bp");
1395 assert!(breakpoint.enabled);
1396 assert_eq!(breakpoint.hit_count, 0);
1397 }
1398
1399 #[test]
1400 fn test_watch_expression() {
1401 let watch = WatchExpression {
1402 id: "test_watch".to_string(),
1403 expression: "x + y".to_string(),
1404 description: "Sum of x and y".to_string(),
1405 enabled: true,
1406 };
1407
1408 assert_eq!(watch.expression, "x + y");
1409 assert!(watch.enabled);
1410 }
1411
1412 #[test]
1413 fn test_variable_inspector() {
1414 let mut inspector = VariableInspector::new();
1415
1416 inspector.set_variable("test_var".to_string(), VariableValue::Scalar(42.0));
1417
1418 let value = inspector.get_variable("test_var").unwrap();
1419 match value {
1420 VariableValue::Scalar(v) => assert_eq!(*v, 42.0),
1421 _ => panic!("Expected scalar value"),
1422 }
1423 }
1424}