use crate::analysis::pcode_store::PcodeStore;
use crate::modeling::machine::cpu::concrete::ConcretePcodeAddress;
use jingle_sleigh::{PcodeOperation, SleighArchInfo};
pub use model::{CfgState, CfgStateModel, ModelTransition};
use petgraph::Direction;
use petgraph::graph::NodeIndex;
use petgraph::prelude::StableDiGraph;
use petgraph::visit::EdgeRef;
use std::borrow::Borrow;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter, LowerHex};
use std::rc::Rc;
pub(crate) mod model;
/// Zero-sized edge weight for graphs whose edges carry no data.
///
/// Both its [`Display`] and [`LowerHex`] implementations emit nothing, so an
/// `EmptyEdge` always formats as the empty string.
#[derive(Debug, Default, Copy, Clone, Hash)]
pub struct EmptyEdge;

impl Display for EmptyEdge {
    /// Writes nothing to the formatter and always succeeds.
    fn fmt(&self, _formatter: &mut Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }
}

impl LowerHex for EmptyEdge {
    /// Writes nothing to the formatter and always succeeds.
    fn fmt(&self, _formatter: &mut Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }
}
/// Control-flow graph keyed by node values of type `N`, with one value of type
/// `D` optionally recorded per node.
///
/// The topology lives in `graph`; the per-node data lives in `ops`, and
/// `indices` maps each node value back to its position in `graph`.
#[derive(Debug)]
pub struct PcodeCfg<N: CfgState = ConcretePcodeAddress, D = PcodeOperation> {
/// Directed graph whose node weights are the `N` values; edges carry no data.
pub(crate) graph: StableDiGraph<N, EmptyEdge>,
/// Data recorded per node; `add_edge` stores its `op` under the edge's source node.
pub(crate) ops: HashMap<N, D>,
/// Lookup from a node value to its index in `graph`.
pub(crate) indices: HashMap<N, NodeIndex>,
}
/// A [`PcodeCfg`] bundled with one `N::Model` value per node.
#[derive(Debug)]
pub struct ModeledPcodeCfg<N: CfgState = ConcretePcodeAddress, D = PcodeOperation> {
/// The underlying control-flow graph.
pub(crate) cfg: PcodeCfg<N, D>,
/// Architecture information the models were built from.
// Stored by `ModeledPcodeCfg::new` but not read anywhere in this file, hence
// the lint allowance below.
#[allow(unused)]
pub(crate) info: SleighArchInfo,
/// Model created for each node of `cfg` (via `new_const` in `ModeledPcodeCfg::new`).
pub(crate) models: HashMap<N, N::Model>,
}
/// Cursor positioned at one node of a [`ModeledPcodeCfg`].
///
/// The set of visited locations sits behind an `Rc<RefCell<..>>`, so cloning a
/// visitor (or deriving new visitors through `successors`) shares one set
/// rather than copying it.
#[derive(Clone)]
pub struct PcodeCfgVisitor<'a, N: CfgState = ConcretePcodeAddress, D = PcodeOperation> {
/// Graph being traversed.
cfg: &'a ModeledPcodeCfg<N, D>,
/// Node this visitor currently points at.
location: N,
/// Locations already handed out by `successors`, shared between related visitors.
pub(crate) visited_locations: Rc<RefCell<HashSet<N>>>,
}
impl<'a, N: CfgState, D: ModelTransition<N::Model>> PcodeCfgVisitor<'a, N, D> {
    /// Yields a visitor for every successor of the current location that has
    /// not yet been recorded in the shared visited set.
    ///
    /// A successor is recorded as visited at the moment the iterator yields
    /// it, so already-seen locations (including repeats within a single call)
    /// are skipped. An unknown current location yields nothing.
    pub(crate) fn successors(&mut self) -> impl Iterator<Item = Self> {
        let candidates = self
            .cfg
            .cfg
            .successors(&self.location)
            .unwrap_or_default();

        candidates.into_iter().filter_map(|next| {
            {
                // Keep the mutable borrow of the shared set as short as possible.
                let mut seen = self.visited_locations.borrow_mut();
                if seen.contains(next) {
                    return None;
                }
                seen.insert(next.clone());
            }
            Some(Self {
                cfg: self.cfg,
                location: next.clone(),
                visited_locations: Rc::clone(&self.visited_locations),
            })
        })
    }

    /// Returns the data stored for the current location, if any.
    pub(crate) fn transition(&self) -> Option<&D> {
        let graph = &self.cfg.cfg;
        graph.ops.get(&self.location)
    }

    /// Returns the location this visitor points at.
    pub fn location(&self) -> &N {
        &self.location
    }

    /// Returns the model associated with the current location, if any.
    pub fn state(&self) -> Option<&N::Model> {
        let models = &self.cfg.models;
        models.get(&self.location)
    }
}
impl<N: CfgState, D: ModelTransition<N::Model>> Default for PcodeCfg<N, D> {
fn default() -> Self {
Self::new()
}
}
impl<N: CfgState, D: ModelTransition<N::Model>> PcodeCfg<N, D> {
pub fn new() -> Self {
Self {
graph: Default::default(),
ops: Default::default(),
indices: Default::default(),
}
}
pub fn graph(&self) -> &StableDiGraph<N, EmptyEdge> {
&self.graph
}
pub fn nodes(&self) -> impl Iterator<Item = &N> {
self.indices.keys()
}
pub fn has_node<T: Borrow<N>>(&self, node: T) -> bool {
self.indices.contains_key(node.borrow())
}
pub fn get_op_at<T: Borrow<N>>(&self, addr: T) -> Option<&D> {
self.ops.get(addr.borrow())
}
pub fn add_node<T: Borrow<N>>(&mut self, node: T) {
let node = node.borrow();
if !self.indices.contains_key(node) {
let idx = self.graph.add_node(node.clone());
self.indices.insert(node.clone(), idx);
}
}
pub fn replace_and_combine_nodes<T: Borrow<N>, S: Borrow<N>>(
&mut self,
old_weight: T,
new_weight: S,
) {
let old_idx = self.indices.get(old_weight.borrow()).copied();
let new_idx = self.indices.get(new_weight.borrow()).copied();
tracing::debug!(
"replace_and_combine_nodes called: old_idx={:?}, new_idx={:?}",
old_idx,
new_idx
);
if let (Some(old_idx), Some(new_idx)) = (old_idx, new_idx) {
if old_idx == new_idx {
tracing::debug!("Indices are identical, skipping merge");
return;
}
tracing::debug!("Both nodes found with different indices, proceeding with merge");
let incoming: Vec<_> = self
.graph
.edges_directed(new_idx, Direction::Incoming)
.map(|edge| edge.source())
.collect();
for source in incoming {
if !self.graph.contains_edge(source, old_idx) {
self.graph.add_edge(source, old_idx, EmptyEdge);
}
}
let outgoing: Vec<_> = self
.graph
.edges_directed(new_idx, Direction::Outgoing)
.map(|edge| edge.target())
.collect();
for target in &outgoing {
if !self.graph.contains_edge(old_idx, *target) {
self.graph.add_edge(old_idx, *target, EmptyEdge);
}
}
self.graph.remove_node(new_idx);
if let Some(node_weight) = self.graph.node_weight_mut(old_idx) {
*node_weight = new_weight.borrow().clone();
}
self.indices.insert(new_weight.borrow().clone(), old_idx);
self.indices.remove(old_weight.borrow());
let op_to_keep = self
.ops
.get(new_weight.borrow())
.or_else(|| self.ops.get(old_weight.borrow()))
.cloned();
self.ops.remove(old_weight.borrow());
self.ops.remove(new_weight.borrow());
if let Some(op) = op_to_keep {
self.ops.insert(new_weight.borrow().clone(), op);
} else {
dbg!("Missing op!");
}
}
}
pub fn add_edge<A, B, C>(&mut self, from: A, to: B, op: C)
where
A: Borrow<N>,
B: Borrow<N>,
C: Borrow<D>,
{
let from = from.borrow();
let to = to.borrow();
let op = op.borrow();
self.add_node(from);
self.add_node(to);
self.ops.insert(from.clone(), op.clone());
let from_idx = *self.indices.get(from).unwrap();
let to_idx = *self.indices.get(to).unwrap();
if self.graph.find_edge(from_idx, to_idx).is_some() {
return;
}
self.graph.add_edge(from_idx, to_idx, EmptyEdge);
}
pub fn successors<T: Borrow<N>>(&self, node: T) -> Option<Vec<&N>> {
let n = node.borrow();
let idx = *self.indices.get(n)?;
let succs: Vec<&N> = self
.graph
.edges_directed(idx, Direction::Outgoing)
.map(|e| self.graph.node_weight(e.target()).unwrap())
.collect();
Some(succs)
}
pub fn predecessors<T: Borrow<N>>(&self, node: T) -> Option<Vec<&N>> {
let n = node.borrow();
let idx = *self.indices.get(n)?;
let preds: Vec<&N> = self
.graph
.edges_directed(idx, Direction::Incoming)
.map(|e| self.graph.node_weight(e.source()).unwrap())
.collect();
Some(preds)
}
pub fn leaf_nodes(&self) -> impl Iterator<Item = &N> {
self.graph
.externals(Direction::Outgoing)
.map(move |idx| self.graph.node_weight(idx).unwrap())
}
pub fn edge_weights(&self) -> impl Iterator<Item = &D> {
self.ops.values()
}
pub fn nodes_for_location<S: PartialEq<N>>(&self, location: S) -> impl Iterator<Item = &N> {
self.nodes().filter(move |a| location == **a)
}
pub fn smt_model(self, info: SleighArchInfo) -> ModeledPcodeCfg<N, D> {
ModeledPcodeCfg::new(self, info)
}
}
impl PcodeStore for PcodeCfg<ConcretePcodeAddress, PcodeOperation> {
    /// Looks up the operation recorded at `addr` and wraps it in a `PcodeOpRef`.
    ///
    /// Returns `None` when no operation is recorded for that address.
    fn get_pcode_op_at<'a, T: Borrow<ConcretePcodeAddress>>(
        &'a self,
        addr: T,
    ) -> Option<crate::analysis::pcode_store::PcodeOpRef<'a>> {
        let op = self.get_op_at(addr)?;
        Some(crate::analysis::pcode_store::PcodeOpRef::from(op))
    }
}
impl<N: CfgState> PcodeCfg<N, PcodeOperation> {
    /// Collapses straight-line runs of nodes into basic blocks.
    ///
    /// Every node starts as a block holding its own operation (or no
    /// operations if none is recorded). A node with exactly one outgoing edge
    /// is then repeatedly merged with the target of that edge whenever the
    /// target has exactly one incoming edge: the target's operations are
    /// appended to the node's, the target's outgoing edges are re-attached to
    /// the node, and the target is removed. Each resulting block is keyed by
    /// the first node of its run.
    ///
    /// A node whose single outgoing edge is a self-loop is never merged with
    /// itself, so loops survive as a block with a self-edge.
    ///
    /// The returned graph contains every block that has at least one edge or
    /// at least one operation. Blocks with neither are dropped.
    pub fn basic_blocks(&self) -> PcodeCfg<N, Vec<PcodeOperation>> {
        use petgraph::visit::EdgeRef;

        // Step 1: copy every node, wrapping its op (if any) in a one-element block.
        let mut graph = StableDiGraph::<N, EmptyEdge>::default();
        let mut ops: HashMap<N, Vec<PcodeOperation>> = HashMap::new();
        let mut indices: HashMap<N, NodeIndex> = HashMap::new();
        for node in self.graph.node_indices() {
            let n = self.graph.node_weight(node).unwrap().clone();
            let block = self
                .ops
                .get(&n)
                .map(|op| vec![op.clone()])
                .unwrap_or_default();
            // Exactly one working node per source node.
            let idx = graph.add_node(n.clone());
            indices.insert(n.clone(), idx);
            ops.insert(n, block);
        }

        // Step 2: copy every edge, translating endpoints into the working graph.
        for edge in self.graph.edge_indices() {
            let (from_old, to_old) = self.graph.edge_endpoints(edge).unwrap();
            let from = self.graph.node_weight(from_old).unwrap();
            let to = self.graph.node_weight(to_old).unwrap();
            let from_idx = *indices.get(from).unwrap();
            let to_idx = *indices.get(to).unwrap();
            graph.add_edge(from_idx, to_idx, EmptyEdge);
        }

        // Step 3: merge one (node, target) pair at a time until none is left.
        loop {
            let candidate = graph.node_indices().find_map(|node| {
                let mut out_edges = graph.edges_directed(node, Direction::Outgoing);
                let only_edge = out_edges.next()?;
                if out_edges.next().is_some() {
                    return None;
                }
                let target = only_edge.target();
                // A self-loop is not a fall-through: merging a node with
                // itself would delete the node (and its block) entirely.
                if target == node {
                    return None;
                }
                let in_degree = graph.edges_directed(target, Direction::Incoming).count();
                (in_degree == 1).then_some((node, target))
            });
            let Some((node, target)) = candidate else {
                break;
            };

            let src_n = graph.node_weight(node).unwrap().clone();
            let tgt_n = graph.node_weight(target).unwrap().clone();

            let tgt_ops = ops.remove(&tgt_n).unwrap_or_default();
            if !tgt_ops.is_empty() {
                ops.entry(src_n).or_default().extend(tgt_ops);
            }

            let tgt_successors: Vec<NodeIndex> = graph
                .edges_directed(target, Direction::Outgoing)
                .map(|e| e.target())
                .collect();
            for successor in tgt_successors {
                graph.add_edge(node, successor, EmptyEdge);
            }

            graph.remove_node(target);
            indices.remove(&tgt_n);
        }

        // Step 4: rebuild a compact graph from the surviving blocks.
        let mut new_graph = StableDiGraph::<N, EmptyEdge>::default();
        let mut new_ops: HashMap<N, Vec<PcodeOperation>> = HashMap::new();
        let mut new_indices: HashMap<N, NodeIndex> = HashMap::new();

        let retained: Vec<NodeIndex> = graph
            .node_indices()
            .filter(|&node| {
                let has_edges = graph
                    .edges_directed(node, Direction::Incoming)
                    .next()
                    .is_some()
                    || graph
                        .edges_directed(node, Direction::Outgoing)
                        .next()
                        .is_some();
                // A block that absorbed a whole straight-line chain ends up
                // without edges; it must not be discarded along with the
                // genuinely empty isolated nodes.
                let has_ops = graph
                    .node_weight(node)
                    .and_then(|n| ops.get(n))
                    .is_some_and(|block| !block.is_empty());
                has_edges || has_ops
            })
            .collect();

        for &node in &retained {
            let n = graph.node_weight(node).unwrap().clone();
            let idx = new_graph.add_node(n.clone());
            new_indices.insert(n.clone(), idx);
            if let Some(block) = ops.remove(&n) {
                new_ops.insert(n, block);
            }
        }

        for edge in graph.edge_indices() {
            let (from_old, to_old) = graph.edge_endpoints(edge).unwrap();
            let from = graph.node_weight(from_old).unwrap();
            let to = graph.node_weight(to_old).unwrap();
            // Both endpoints of an edge have an edge, so both were retained.
            let from_idx = *new_indices.get(from).unwrap();
            let to_idx = *new_indices.get(to).unwrap();
            new_graph.add_edge(from_idx, to_idx, EmptyEdge);
        }

        PcodeCfg {
            graph: new_graph,
            ops: new_ops,
            indices: new_indices,
        }
    }
}
impl<N: CfgState, D: ModelTransition<N::Model>> ModeledPcodeCfg<N, D> {
    /// Wraps `cfg`, creating one model per node by calling `new_const` on the
    /// node with `info`.
    pub fn new(cfg: PcodeCfg<N, D>, info: SleighArchInfo) -> Self {
        let models: HashMap<N, N::Model> = cfg
            .nodes()
            .map(|node| {
                let model = node.new_const(&info);
                (node.clone(), model)
            })
            .collect();
        Self { cfg, models, info }
    }

    /// Returns the wrapped control-flow graph.
    pub fn cfg(&self) -> &PcodeCfg<N, D> {
        &self.cfg
    }

    /// Returns the per-node models.
    pub fn models(&self) -> &HashMap<N, N::Model> {
        &self.models
    }

    /// Returns the graph of the wrapped control-flow graph.
    pub fn graph(&self) -> &StableDiGraph<N, EmptyEdge> {
        let inner = &self.cfg;
        inner.graph()
    }

    /// Iterates over the nodes of the wrapped control-flow graph.
    pub fn nodes(&self) -> impl Iterator<Item = &N> {
        let inner = &self.cfg;
        inner.nodes()
    }

    /// Iterates over the nodes of the wrapped graph that have no outgoing edges.
    pub fn leaf_nodes(&self) -> impl Iterator<Item = &N> {
        let inner = &self.cfg;
        inner.leaf_nodes()
    }

    /// Iterates over the per-node values recorded in the wrapped graph.
    pub fn edge_weights(&self) -> impl Iterator<Item = &D> {
        let inner = &self.cfg;
        inner.edge_weights()
    }

    /// Iterates over the nodes of the wrapped graph that compare equal to `location`.
    pub fn nodes_for_location<S: PartialEq<N>>(&self, location: S) -> impl Iterator<Item = &N> {
        let inner = &self.cfg;
        inner.nodes_for_location(location)
    }
}