use std::collections::HashSet;
use std::fmt;
use std::fmt::Write;
use log::error;
use serde_json::Value;
use flowcore::model::output_connection::Source::{Input, Output};
use crate::block::Block;
use crate::debug_command::DebugCommand;
use crate::debug_command::DebugCommand::{Ack, Breakpoint, Continue, DebugClientStarting, Delete,
Error, ExitDebugger, Inspect, InspectBlock, InspectFunction, InspectInput,
InspectOutput, Invalid, List, RunReset, Step, Validate
};
use crate::job::Job;
use crate::param::Param;
use crate::run_state::RunState;
use crate::server::DebugServer;
/// Debugger state: the server used to talk to the debug client, plus every breakpoint
/// that is currently set.
pub struct Debugger<'a> {
/// Server through which events are reported to, and commands read from, the debug client
debug_server: &'a mut dyn DebugServer,
/// `(destination function id, input number)` pairs, checked before a value is sent
input_breakpoints: HashSet<(usize, usize)>,
/// `(blocked function id, blocking function id)` pairs, checked when a block is created
block_breakpoints: HashSet<(usize, usize)>,
/// `(source function id, output route)` pairs, checked before a value is sent
output_breakpoints: HashSet<(usize, String)>,
/// Job id to break at (set by the step command); `usize::MAX` is used when no step is pending
break_at_job: usize,
/// Ids of functions to break on, checked prior to running a job for that function
function_breakpoints: HashSet<usize>,
}
/// The kind of blocking relationship recorded in a `BlockerNode`.
#[derive(Debug, Clone)]
enum BlockType {
/// Used for nodes built from `RunState::get_output_blockers`; displayed as ` -> #id`
OutputBlocked,
/// Used for nodes built from `RunState::get_input_blockers`; displayed as ` <- #id`
UnreadySender, }
/// A node in the tree of blockers that is explored when checking for deadlocks.
#[derive(Debug, Clone)]
struct BlockerNode {
/// Id of the function this node represents
function_id: usize,
/// How this function blocks its parent in the tree
block_type: BlockType,
/// Functions blocking this one; empty until filled in by `traverse_blocker_tree`
blockers: Vec<BlockerNode>,
}
impl BlockerNode {
fn new(process_id: usize, block_type: BlockType) -> Self {
BlockerNode {
function_id: process_id,
block_type,
blockers: vec![],
}
}
}
impl fmt::Display for BlockerNode {
    /// Write the node as an arrow followed by its function id: ` -> #id` for an
    /// `OutputBlocked` node and ` <- #id` for an `UnreadySender` node.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let id = self.function_id;
        match self.block_type {
            BlockType::UnreadySender => write!(f, " <- #{}", id),
            BlockType::OutputBlocked => write!(f, " -> #{}", id),
        }
    }
}
impl<'a> Debugger<'a> {
pub fn new(
debug_server: &'a mut dyn DebugServer,
) -> Self {
Debugger {
debug_server,
input_breakpoints: HashSet::<(usize, usize)>::new(),
block_breakpoints: HashSet::<(usize, usize)>::new(),
output_breakpoints: HashSet::<(usize, String)>::new(),
break_at_job: usize::MAX,
function_breakpoints: HashSet::<usize>::new(),
}
}
/// Start the debugger by delegating to the debug server's `start`.
pub fn start(&mut self) {
self.debug_server.start();
}
/// Check, before `job` runs, whether the debugger should stop on it.
///
/// It stops when `job.job_id` equals the pending `break_at_job` target, or when a
/// function breakpoint is set on `job.function_id`. In that case the breakpoint is
/// reported to the debug client and the result of `wait_for_command` is returned.
/// Otherwise `(false, false, false)` is returned without contacting the client.
pub fn check_prior_to_job(
    &mut self,
    state: &RunState,
    job: &Job,
) -> (bool, bool, bool) {
    let reached_step_target = self.break_at_job == job.job_id;
    if !reached_step_target && !self.function_breakpoints.contains(&job.function_id) {
        return (false, false, false);
    }

    let function = state.get_function(job.function_id);
    let function_states = state.get_function_states(job.function_id);
    self.debug_server.job_breakpoint(job, function, function_states);
    self.wait_for_command(state)
}
/// Check, when `block` is created, whether a block breakpoint is set on its
/// `(blocked_function_id, blocking_function_id)` pair.
///
/// If so, the block is reported to the debug client and the result of
/// `wait_for_command` is returned. Otherwise `(false, false, false)` is returned.
pub fn check_on_block_creation(
    &mut self,
    state: &RunState,
    block: &Block,
) -> (bool, bool, bool) {
    let key = (block.blocked_function_id, block.blocking_function_id);
    if !self.block_breakpoints.contains(&key) {
        return (false, false, false);
    }

    self.debug_server.block_breakpoint(block);
    self.wait_for_command(state)
}
/// Check, before `value` is sent, whether a data breakpoint applies to the send.
///
/// A breakpoint applies when an output breakpoint is set on
/// `(source_function_id, output_route)` or an input breakpoint is set on
/// `(destination_id, input_number)`. In that case the send is reported to the debug
/// client and the result of `wait_for_command` is returned. Otherwise
/// `(false, false, false)` is returned.
pub fn check_prior_to_send(
    &mut self,
    state: &RunState,
    source_function_id: usize,
    output_route: &str,
    value: &Value,
    destination_id: usize,
    input_number: usize,
) -> (bool, bool, bool) {
    // The output set is keyed by an owned `String`, so a lookup has to allocate one.
    // This method runs for every value sent, so skip the allocation entirely in the
    // common case where no output breakpoints are set.
    let output_hit = !self.output_breakpoints.is_empty()
        && self
            .output_breakpoints
            .contains(&(source_function_id, output_route.to_string()));

    if !output_hit
        && !self
            .input_breakpoints
            .contains(&(destination_id, input_number))
    {
        return (false, false, false);
    }

    let source_function = state.get_function(source_function_id);
    let destination_function = state.get_function(destination_id);
    let io_name = destination_function.input(input_number).name();
    self.debug_server.send_breakpoint(
        source_function.name(),
        source_function_id,
        output_route,
        value,
        destination_id,
        destination_function.name(),
        io_name,
        input_number,
    );
    self.wait_for_command(state)
}
/// Report an error in `job` to the debug client, then wait for debugger commands.
///
/// Returns the tuple produced by `wait_for_command`.
pub fn job_error(&mut self, state: &RunState, job: &Job) -> (bool, bool, bool) {
self.debug_server.job_error(job);
self.wait_for_command(state)
}
/// Handle the completion of `job`.
///
/// A job whose result is `Ok` is reported to the debug client as completed. A job
/// whose result is an error enters the debugger via `job_error`, but only when
/// `state.debug` is set. This method always returns `(false, false, false)`.
pub fn job_completed(&mut self, state: &RunState, job: &Job) -> (bool, bool, bool) {
    if job.result.is_ok() {
        self.debug_server.job_completed(job);
    } else if state.debug {
        // NOTE(review): the command outcome returned by `job_error` (step / reset / exit)
        // is discarded here - confirm with callers that this is intended.
        let _ = self.job_error(state, job);
    }

    (false, false, false)
}
/// Report a panic, described by `error_message`, to the debug client, then wait for
/// debugger commands.
///
/// Returns the tuple produced by `wait_for_command`.
pub fn panic(&mut self, state: &RunState, error_message: String) -> (bool, bool, bool) {
self.debug_server.panic(state, error_message);
self.wait_for_command(state)
}
/// Tell the debug client that execution has ended, report any deadlocks found in the
/// final state, then wait for debugger commands.
///
/// Returns the tuple produced by `wait_for_command`.
pub fn execution_ended(&mut self, state: &RunState) -> (bool, bool, bool) {
    self.debug_server.execution_ended();

    // The deadlock report used to be computed and then dropped. Pass it on to the
    // client, but only when there is something to report.
    let deadlocks = self.deadlock_check(state);
    if !deadlocks.is_empty() {
        self.debug_server.message(deadlocks);
    }

    self.wait_for_command(state)
}
pub fn wait_for_command(&mut self, state: &RunState) -> (bool, bool, bool) {
loop {
match self.debug_server.get_command(state)
{
Ok(Breakpoint(param)) => {
let message = self.add_breakpoint(state, param);
self.debug_server.message(message);
},
Ok(Delete(param)) => {
let message = self.delete_breakpoint(param);
self.debug_server.message(message);
},
Ok(Validate) => {
let message = self.validate(state);
self.debug_server.message(message);
},
Ok(List) => {
let message = self.list_breakpoints();
self.debug_server.message(message);
},
Ok(DebugCommand::FunctionList) => {
self.debug_server.function_list(state.get_functions());
},
Ok(Inspect) => self.debug_server.run_state(state),
Ok(InspectFunction(function_id)) => {
if function_id < state.num_functions() {
self.debug_server.function_states(state.get_function(function_id).clone(),
state.get_function_states(function_id));
} else {
self.debug_server.debugger_error(format!("No function with id = {}", function_id));
};
}
Ok(InspectInput(function_id, input_number)) => {
if function_id < state.num_functions() {
let function = state.get_function(function_id);
if input_number < function.inputs().len() {
self.debug_server.input(function.input(input_number).clone());
} else {
self.debug_server.debugger_error(format!(
"Function #{} has no input number {}", function_id, input_number
));
}
} else {
self.debug_server.debugger_error(format!("No function with id = {}", function_id));
};
}
Ok(InspectOutput(function_id, sub_route)) => {
if function_id < state.num_functions() {
let function = state.get_function(function_id);
let mut output_connections = vec![];
for output_connection in function.get_output_connections() {
match &output_connection.source {
Output(source_route) => {
if *source_route == sub_route {
output_connections.push(output_connection.clone())
}
}
Input(_) => {
if sub_route.is_empty() {
output_connections.push(output_connection.clone())
}
}
}
}
self.debug_server.outputs(output_connections);
} else {
self.debug_server.debugger_error(format!("No function with id = {}", function_id));
};
}
Ok(InspectBlock(from_function_id, to_function_id)) => {
let blocks = Self::inspect_blocks(state, from_function_id, to_function_id);
self.debug_server.blocks(blocks);
}
Ok(ExitDebugger) => {
self.debug_server.debugger_exiting();
return (false, false, true);
}
Ok(Continue) => {
if state.get_number_of_jobs_created() > 0 {
return (false, false, false);
}
}
Ok(RunReset) => {
return if state.get_number_of_jobs_created() > 0 {
self.reset();
self.debug_server.debugger_resetting();
(false, true, false)
} else {
self.debug_server.execution_starting();
(false, false, false)
}
}
Ok(Step(param)) => {
self.step(state, param);
return (true, false, false);
}
Ok(Ack) => {}
Ok(DebugClientStarting) => { error!("Unexpected message 'DebugClientStarting' after started")
}
Ok(Error(_)) => { }
Ok(Invalid) => {}
Err(e) => error!("Error in Debug server getting command; {}", e),
};
}
}
/// Collect clones of the blocks in `run_state` that match the optional filters.
///
/// `from`, when given, must equal a block's `blocked_function_id`, and `to`, when
/// given, must equal its `blocking_function_id`. A `None` filter matches every block.
fn inspect_blocks(
    run_state: &RunState,
    from: Option<usize>,
    to: Option<usize>,
) -> Vec<Block> {
    let accepts = |wanted: Option<usize>, actual: usize| match wanted {
        Some(id) => id == actual,
        None => true,
    };

    let mut selected = Vec::new();
    for block in run_state.get_blocks() {
        if accepts(from, block.blocked_function_id) && accepts(to, block.blocking_function_id) {
            selected.push(block.clone());
        }
    }
    selected
}
/// Set the breakpoint described by `param` and return the text to show the user.
///
/// - `None`: nothing is set; a usage message is returned
/// - `Param::Numeric(id)`: function breakpoint, rejected if `id` is not a valid function id
/// - `Param::Input((id, n))`: input breakpoint, rejected if the function or its input
///   number `n` does not exist
/// - `Param::Block((Some(a), Some(b)))`: breakpoint on function `a` being blocked by `b`;
///   any other `Param::Block` is rejected
/// - `Param::Output((id, route))`: output breakpoint
/// - `Param::Wildcard`: nothing is set; the user is pointed at single stepping instead
fn add_breakpoint(&mut self, state: &RunState, param: Option<Param>) -> String {
    let mut response = String::new();
    match param {
        None => response.push_str("'break' command must specify a breakpoint\n"),
        Some(Param::Numeric(process_id)) => {
            // Valid ids run from 0 to num_functions() - 1, so an id equal to the count
            // must be rejected too before it reaches `get_function`.
            if process_id >= state.num_functions() {
                let _ = writeln!(response,
                    "There is no Function with id '{}' to set a breakpoint on",
                    process_id
                );
            } else {
                self.function_breakpoints.insert(process_id);
                let function = state.get_function(process_id);
                let _ = writeln!(response,
                    "Breakpoint set on Function #{} ({}) @ '{}'",
                    process_id, function.name(), function.route()
                );
            }
        }
        Some(Param::Input((destination_id, input_number))) => {
            // Validate both ids before looking them up, as the inspect commands do
            if destination_id >= state.num_functions() {
                let _ = writeln!(response,
                    "There is no Function with id '{}' to set a breakpoint on",
                    destination_id
                );
            } else {
                let function = state.get_function(destination_id);
                if input_number >= function.inputs().len() {
                    let _ = writeln!(response,
                        "Function #{} has no input number {}",
                        destination_id, input_number
                    );
                } else {
                    let io_name = function.input(input_number).name();
                    let _ = writeln!(response,
                        "Data breakpoint set on Function #{}:{} '{}' receiving data on input '{}'",
                        destination_id, input_number, function.name(), io_name);
                    self.input_breakpoints
                        .insert((destination_id, input_number));
                }
            }
        }
        Some(Param::Block((Some(blocked_id), Some(blocking_id)))) => {
            let _ = writeln!(response,
                "Block breakpoint set on Function #{} being blocked by Function #{}",
                blocked_id, blocking_id
            );
            self.block_breakpoints.insert((blocked_id, blocking_id));
        }
        Some(Param::Block(_)) => {
            response.push_str("Invalid format to set a breakpoint on a block\n");
        }
        Some(Param::Output((source_id, source_output_route))) => {
            let _ = writeln!(response,
                "Data breakpoint set on Function #{} sending data via output: '{}'",
                source_id, source_output_route
            );
            self.output_breakpoints
                .insert((source_id, source_output_route));
        }
        Some(Param::Wildcard) => {
            response.push_str(
                "To break on every Function, you can just single step using 's' command\n",
            );
        }
    }
    response
}
/// Delete the breakpoint described by `param` and return the text to show the user.
///
/// - `None`: nothing is deleted; a usage message is returned
/// - `Param::Numeric(id)`: function breakpoint; reports whether one existed
/// - `Param::Input((id, n))`: input breakpoint
/// - `Param::Block((Some(a), Some(b)))`: block breakpoint; any other `Param::Block` is
///   rejected
/// - `Param::Output((id, route))`: output breakpoint
/// - `Param::Wildcard`: every breakpoint of every kind
fn delete_breakpoint(&mut self, param: Option<Param>) -> String {
    let mut response = String::new();
    match param {
        None => response.push_str("No process id specified\n"),
        Some(Param::Numeric(process_number)) => {
            if self.function_breakpoints.remove(&process_number) {
                let _ = writeln!(response,
                    "Breakpoint on process #{} was deleted",
                    process_number
                );
            } else {
                // Format the number into the message; `push_str` would emit a literal `{}`
                let _ = writeln!(response,
                    "No breakpoint number '{}' exists",
                    process_number
                );
            }
        }
        Some(Param::Input((destination_id, input_number))) => {
            self.input_breakpoints
                .remove(&(destination_id, input_number));
            response.push_str("Inputs breakpoint removed\n");
        }
        Some(Param::Block((Some(blocked_id), Some(blocking_id)))) => {
            // Block breakpoints are stored in `block_breakpoints`, not `input_breakpoints`
            self.block_breakpoints.remove(&(blocked_id, blocking_id));
            response.push_str("Block breakpoint removed\n");
        }
        Some(Param::Block(_)) => {
            response.push_str("Invalid format to remove breakpoint\n");
        }
        Some(Param::Output((source_id, source_output_route))) => {
            self.output_breakpoints
                .remove(&(source_id, source_output_route));
            response.push_str("Output breakpoint removed\n");
        }
        Some(Param::Wildcard) => {
            // "All" has to include block breakpoints as well
            self.output_breakpoints.clear();
            self.input_breakpoints.clear();
            self.block_breakpoints.clear();
            self.function_breakpoints.clear();
            response.push_str("Deleted all breakpoints\n");
        }
    }
    response
}
/// Build a listing of every breakpoint currently set.
///
/// The listing is grouped by kind in this order: function, output, input, block.
/// Groups with no breakpoints are omitted. If no breakpoint of any kind is set, a
/// hint on how to set one is returned instead.
fn list_breakpoints(&self) -> String {
    let mut listing = String::new();

    if !self.function_breakpoints.is_empty() {
        listing.push_str("Function Breakpoints: \n");
        for function_id in &self.function_breakpoints {
            let _ = writeln!(listing, "\tFunction #{}", function_id);
        }
    }

    if !self.output_breakpoints.is_empty() {
        listing.push_str("Output Breakpoints: \n");
        for (source_id, output_route) in &self.output_breakpoints {
            let _ = writeln!(listing, "\tOutput #{}/{}", source_id, output_route);
        }
    }

    if !self.input_breakpoints.is_empty() {
        listing.push_str("Input Breakpoints: \n");
        for (destination_id, input_number) in &self.input_breakpoints {
            let _ = writeln!(listing, "\tInput #{}:{}", destination_id, input_number);
        }
    }

    if !self.block_breakpoints.is_empty() {
        listing.push_str("Block Breakpoints: \n");
        for (blocked_id, blocking_id) in &self.block_breakpoints {
            let _ = writeln!(listing, "\tBlock #{}->#{}", blocked_id, blocking_id);
        }
    }

    // Every non-empty group writes at least its heading, so an empty listing means
    // that no breakpoints are set at all.
    if listing.is_empty() {
        listing.push_str(
            "No Breakpoints set. Use the 'b' command to set a breakpoint. Use 'h' for help.\n",
        );
    }
    listing
}
/// Run the validation checks on `state` and return the report as text: two progress
/// lines followed by whatever `deadlock_check` reports.
fn validate(&self, state: &RunState) -> String {
    let deadlocks = self.deadlock_check(state);
    [
        "Validating flow state\n",
        "Running deadlock check\n",
        deadlocks.as_str(),
    ]
    .concat()
}
/// Reset the debugger's stepping state: `break_at_job` goes back to `usize::MAX`, the
/// value `new` starts with. The breakpoint sets are left untouched.
fn reset(&mut self) {
self.break_at_job = usize::MAX;
}
/// Arrange to break again after a number of further jobs.
///
/// With no parameter the target is one job beyond the number created so far. With
/// `Param::Numeric(n)` it is `n` jobs beyond. Any other parameter is reported to the
/// debug client as an error and leaves the target unchanged.
fn step(&mut self, state: &RunState, steps: Option<Param>) {
    let jobs_created = state.get_number_of_jobs_created();
    match steps {
        None => {
            self.break_at_job = jobs_created.saturating_add(1);
        }
        Some(Param::Numeric(steps)) => {
            // `steps` comes from the user, so saturate rather than overflow (a panic in
            // debug builds, a wrap-around in release builds) on an absurdly large count.
            self.break_at_job = jobs_created.saturating_add(steps);
        }
        _ => self.debug_server.debugger_error(
            "Did not understand step command parameter\n".into()),
    }
}
/// List the functions blocking `process_id` as fresh `BlockerNode`s.
///
/// Entries from `get_output_blockers` come first, tagged `OutputBlocked`. Entries from
/// `get_input_blockers` follow, tagged `UnreadySender`.
fn find_blockers(&self, state: &RunState, process_id: usize) -> Vec<BlockerNode> {
    let mut blockers = Vec::new();

    for (id, _) in state.get_output_blockers(process_id).iter() {
        blockers.push(BlockerNode::new(*id, BlockType::OutputBlocked));
    }
    for (id, _) in state.get_input_blockers(process_id).iter() {
        blockers.push(BlockerNode::new(*id, BlockType::UnreadySender));
    }

    blockers
}
/// Search depth-first from `node` for a chain of blockers that leads back to the
/// function with id `root_node_id`.
///
/// `node.blockers` is filled in from `find_blockers` and `node.function_id` is recorded
/// in `visited_nodes`. Blockers already in `visited_nodes` are not explored again.
///
/// Returns the chain of nodes from the first blocker followed up to and including the
/// one whose id is `root_node_id`, or an empty vector when no such chain is found.
fn traverse_blocker_tree(
    &self,
    state: &RunState,
    visited_nodes: &mut Vec<usize>,
    root_node_id: usize,
    node: &mut BlockerNode,
) -> Vec<BlockerNode> {
    visited_nodes.push(node.function_id);
    node.blockers = self.find_blockers(state, node.function_id);

    for candidate in node.blockers.iter_mut() {
        // Back at the root: the chain is closed by this blocker
        if candidate.function_id == root_node_id {
            return vec![candidate.clone()];
        }
        if visited_nodes.contains(&candidate.function_id) {
            continue;
        }

        let tail = self.traverse_blocker_tree(state, visited_nodes, root_node_id, candidate);
        if tail.is_empty() {
            continue;
        }

        // Clone after the recursive call so the copy carries the blockers it filled in
        let mut chain = Vec::with_capacity(tail.len() + 1);
        chain.push(candidate.clone());
        chain.extend(tail);
        return chain;
    }

    Vec::new()
}
/// Render a chain of blockers as text: `#<root id>` followed by the `Display` output
/// of each node in `node_set`, in order.
fn display_set(root_node: &BlockerNode, node_set: Vec<BlockerNode>) -> String {
    node_set
        .into_iter()
        .fold(format!("#{}", root_node.function_id), |mut chain, node| {
            let _ = write!(chain, "{node}");
            chain
        })
}
/// Look for chains of blockers that lead from a blocked function back to itself.
///
/// Each function id from `state.get_blocked()` is used as the root of a search with
/// `traverse_blocker_tree`. Every non-empty chain found is rendered with `display_set`
/// on a line of its own. The result is empty when no chain is found.
fn deadlock_check(&self, state: &RunState) -> String {
    let mut report = String::new();

    for blocked_process_id in state.get_blocked() {
        let root_id = *blocked_process_id;
        let mut root_node = BlockerNode::new(root_id, BlockType::OutputBlocked);
        let mut visited_nodes = Vec::new();

        let chain =
            self.traverse_blocker_tree(state, &mut visited_nodes, root_id, &mut root_node);
        if chain.is_empty() {
            continue;
        }

        let _ = writeln!(report, "{}", Self::display_set(&root_node, chain));
    }

    report
}
}