use std::borrow::Cow;
use std::collections::BTreeMap;
use std::sync::Once;
use libsla_sys::cxx::{CxxVector, UniquePtr, let_cxx_string};
use crate::opcodes::OpCode;
use libsla_sys::api;
use libsla_sys::rust;
use libsla_sys::sys;
/// One-time guard for process-wide libsla setup. `GhidraSleighBuilder::default`
/// runs the element/attribute id initializers through it, so they execute at
/// most once no matter how many builders are created.
static LIBSLA_INIT: Once = Once::new();
/// Errors reported by this module.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// The caller supplied input that cannot be used; `message` explains why.
#[error("input invalid: {message}")]
InvalidInput { message: Cow<'static, str> },
/// The instruction loader could not provide every byte of the given varnode.
#[error("insufficient data at varnode {0}")]
InsufficientData(VarnodeData),
/// A call into a dependency failed.
#[error("dependency error: {message} caused by {source}")]
DependencyError {
/// Description of the operation that failed.
message: Cow<'static, str>,
/// The error reported by the dependency.
source: Box<dyn std::error::Error + Send + Sync>,
},
/// An unexpected condition was detected inside this module.
#[error("internal error: {0}")]
InternalError(String),
}
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Interface to a Sleigh processor description: address-space and register
/// lookups, plus disassembly of instruction bytes supplied by an
/// [`InstructionLoader`].
pub trait Sleigh {
    /// Returns the default address space used for code.
    #[must_use]
    fn default_code_space(&self) -> AddressSpace;

    /// Returns every address space known to this instance.
    #[must_use]
    fn address_spaces(&self) -> Vec<AddressSpace>;

    /// Returns the first address space whose name equals `name`, or `None`
    /// when no address space carries that name.
    #[must_use]
    fn address_space_by_name(&self, name: impl AsRef<str>) -> Option<AddressSpace> {
        let wanted = name.as_ref();
        for space in self.address_spaces() {
            if space.name == wanted {
                return Some(space);
            }
        }
        None
    }

    /// Looks up the varnode backing the register called `name`.
    ///
    /// # Errors
    ///
    /// Returns an error when the register cannot be resolved.
    fn register_from_name(&self, name: impl AsRef<str>) -> Result<VarnodeData>;

    /// Disassembles the instruction at `address` into p-code, reading the
    /// instruction bytes through `loader`.
    ///
    /// # Errors
    ///
    /// Returns an error when the instruction cannot be disassembled.
    fn disassemble_pcode(
        &self,
        loader: &dyn InstructionLoader,
        address: Address,
    ) -> Result<PcodeDisassembly>;

    /// Disassembles the instruction at `address` into its native assembly
    /// form, reading the instruction bytes through `loader`.
    ///
    /// # Errors
    ///
    /// Returns an error when the instruction cannot be disassembled.
    fn disassemble_native(
        &self,
        loader: &dyn InstructionLoader,
        address: Address,
    ) -> Result<NativeDisassembly>;

    /// Returns the name of the register described by `target`, if there is one.
    fn register_name(&self, target: &VarnodeData) -> Option<String>;

    /// Returns a map from register varnodes to register names.
    fn register_name_map(&self) -> BTreeMap<VarnodeData, String>;
}
/// A location inside an address space: an offset paired with the space it
/// belongs to.
///
/// The derived ordering compares `offset` first and `address_space` second,
/// following field declaration order.
#[derive(Ord, PartialOrd, PartialEq, Eq, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Address {
/// Offset within `address_space`.
pub offset: u64,
/// The address space this offset refers to.
pub address_space: AddressSpace,
}
impl Address {
pub fn new(address_space: AddressSpace, offset: u64) -> Self {
Self {
address_space,
offset,
}
}
}
impl std::fmt::Debug for Address {
    /// Formats the address as a struct whose `offset` is rendered as a
    /// zero-padded, `0x`-prefixed hexadecimal string.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // With the `#` flag the requested width also counts the "0x" prefix, so
        // 18 columns are needed to zero-pad all 16 hex digits of a `u64`.
        let offset = format!("{:#018x}", self.offset);
        f.debug_struct("Address")
            .field("offset", &offset)
            .field("address_space", &self.address_space)
            .finish()
    }
}
impl std::fmt::Display for Address {
    /// Writes `<address space>:<offset>`, with the offset in lowercase hex
    /// zero-padded to two digits per unit of the space's `address_size`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let space = &self.address_space;
        let digits = 2 * space.address_size;
        write!(f, "{space}:{offset:0digits$x}", offset = self.offset)
    }
}
impl From<&sys::Address> for Address {
fn from(address: &sys::Address) -> Self {
Self {
offset: address.offset(),
address_space: unsafe { &*address.address_space() }.into(),
}
}
}
/// A sized region of an address space: a starting address plus a size.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct VarnodeData {
/// Address at which the varnode starts.
pub address: Address,
/// Size of the varnode.
pub size: usize,
}
impl Ord for VarnodeData {
    /// Orders varnodes by address; at the same address the varnode with the
    /// larger size sorts first (the size comparison is reversed).
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.address
            .cmp(&other.address)
            .then_with(|| other.size.cmp(&self.size))
    }
}
impl PartialOrd for VarnodeData {
    /// Delegates to the total order defined by the `Ord` implementation.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
impl std::fmt::Display for VarnodeData {
    /// Writes the varnode as `[<address>]#<size>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "[{address}]#{size}",
            address = self.address,
            size = self.size
        )
    }
}
impl VarnodeData {
pub fn new(address: Address, size: usize) -> Self {
Self { address, size }
}
pub fn range(&self) -> std::ops::Range<u64> {
let offset = self.address.offset * self.address.address_space.word_size as u64;
let size: u64 = self
.size
.try_into()
.unwrap_or_else(|err| panic!("invalid varnode size {size}: {err}", size = self.size));
offset..offset + size
}
}
impl From<&sys::VarnodeData> for VarnodeData {
fn from(varnode: &sys::VarnodeData) -> Self {
let size = sys::varnode_size(varnode);
Self {
address: sys::varnode_address(varnode).as_ref().unwrap().into(),
size: size.try_into().unwrap_or_else(|err| {
panic!("unable to convert Ghidra varnode size: {size}. {err}")
}),
}
}
}
/// Opaque identifier of an address space, stored as a `usize`.
#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct AddressSpaceId(usize);

impl std::fmt::Debug for AddressSpaceId {
    /// Formats the id as a tuple struct holding the same zero-padded
    /// hexadecimal string that `Display` produces.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("AddressSpaceId")
            .field(&self.to_string())
            .finish()
    }
}

impl std::fmt::Display for AddressSpaceId {
    /// Writes the id as `0x` followed by every hex digit of a `usize`,
    /// zero-padded.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Two hex digits per byte, plus the two columns taken by the "0x"
        // prefix: with the `#` flag the prefix counts towards the width.
        let width = 2 * std::mem::size_of::<usize>() + 2;
        write!(f, "{:#0width$x}", self.0)
    }
}

impl AddressSpaceId {
    /// Wraps a raw identifier value.
    pub const fn new(id: usize) -> Self {
        Self(id)
    }

    /// Returns the raw identifier value.
    pub const fn raw_id(self) -> usize {
        self.0
    }
}
/// Owned description of an address space, copied out of the native
/// `sys::AddrSpace` by the `From<&sys::AddrSpace>` conversion.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct AddressSpace {
/// Identifier of the address space.
pub id: AddressSpaceId,
/// Name of the address space; this is what `Display` prints.
pub name: Cow<'static, str>,
/// Word size reported by the native address space.
pub word_size: usize,
/// Address size reported by the native address space.
pub address_size: usize,
/// Kind of address space.
pub space_type: AddressSpaceType,
/// Whether the native address space reports itself as big endian.
pub big_endian: bool,
}
impl AddressSpace {
    /// Returns `true` when this is the constant address space.
    pub fn is_constant(&self) -> bool {
        matches!(self.space_type, AddressSpaceType::Constant)
    }

    /// Builds an [`AddressSpace`] by interpreting `id` as a pointer to a
    /// native `sys::AddrSpace`.
    ///
    /// # Safety
    ///
    /// The raw value of `id` is cast to a pointer and dereferenced, so it must
    /// be the address of a valid, live `sys::AddrSpace`.
    pub unsafe fn from_ghidra_id(id: AddressSpaceId) -> AddressSpace {
        let raw = id.0 as *const sys::AddrSpace;
        // SAFETY: validity of the pointer is the caller's obligation.
        let native = unsafe { &*raw };
        AddressSpace::from(native)
    }
}
impl std::fmt::Display for AddressSpace {
    /// Writes the name of the address space.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.name)
    }
}
impl From<&sys::AddrSpace> for AddressSpace {
fn from(address_space: &sys::AddrSpace) -> Self {
Self {
id: address_space.into(),
name: Cow::Owned(address_space.name().to_string()),
word_size: address_space.word_size().try_into().unwrap(),
address_size: address_space.address_size().try_into().unwrap(),
space_type: address_space.space_type().into(),
big_endian: address_space.big_endian(),
}
}
}
impl From<&sys::AddrSpace> for AddressSpaceId {
    /// Uses the memory address of the native address space as its identifier.
    fn from(address_space: &sys::AddrSpace) -> Self {
        let pointer: *const sys::AddrSpace = address_space;
        Self::new(pointer as usize)
    }
}
/// Kind of an address space, mirroring the native `sys::spacetype` values.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum AddressSpaceType {
/// Mapped from `sys::spacetype::IPTR_CONSTANT`.
Constant,
/// Mapped from `sys::spacetype::IPTR_PROCESSOR`.
Processor,
/// Mapped from `sys::spacetype::IPTR_SPACEBASE`.
BaseRegister,
/// Mapped from `sys::spacetype::IPTR_INTERNAL`.
Internal,
/// Mapped from `sys::spacetype::IPTR_FSPEC`.
FuncCallSpecs,
/// Mapped from `sys::spacetype::IPTR_IOP`.
PcodeOp,
/// Mapped from `sys::spacetype::IPTR_JOIN`.
Join,
}
impl From<sys::spacetype> for AddressSpaceType {
/// Translates a native space type into the matching [`AddressSpaceType`].
///
/// # Panics
///
/// Panics when `space_type` is none of the native values listed below.
fn from(space_type: sys::spacetype) -> Self {
match space_type {
sys::spacetype::IPTR_CONSTANT => Self::Constant,
sys::spacetype::IPTR_PROCESSOR => Self::Processor,
sys::spacetype::IPTR_SPACEBASE => Self::BaseRegister,
sys::spacetype::IPTR_INTERNAL => Self::Internal,
sys::spacetype::IPTR_FSPEC => Self::FuncCallSpecs,
sys::spacetype::IPTR_IOP => Self::PcodeOp,
sys::spacetype::IPTR_JOIN => Self::Join,
// Any value without a counterpart above is rejected loudly.
_ => panic!("Unknown address space type: {space_type:?}"),
}
}
}
/// A single p-code operation emitted while disassembling an instruction.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PcodeInstruction {
/// Address reported by the disassembler for this operation.
pub address: Address,
/// The operation to perform.
pub op_code: OpCode,
/// Input varnodes of the operation.
pub inputs: Vec<VarnodeData>,
/// Output varnode of the operation, when it has one.
pub output: Option<VarnodeData>,
}
impl std::fmt::Display for PcodeInstruction {
    /// Writes `[<address>] <op code> `, then `<output> <- ` when an output
    /// exists, then every input followed by a single space.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "[{address}] {op_code:?} ",
            address = self.address,
            op_code = self.op_code
        )?;
        if let Some(ref output) = self.output {
            write!(f, "{output} <- ")?;
        }
        self.inputs
            .iter()
            .try_for_each(|input| write!(f, "{input} "))
    }
}
/// A native assembly instruction produced by the disassembler.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct AssemblyInstruction {
/// Address reported by the disassembler for this instruction.
pub address: Address,
/// Mnemonic text of the instruction.
pub mnemonic: String,
/// Remaining text of the instruction, printed after the mnemonic.
pub body: String,
}
impl std::fmt::Display for AssemblyInstruction {
    /// Writes the instruction as `[<address>] <mnemonic> <body>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            address,
            mnemonic,
            body,
        } = self;
        write!(f, "[{address}] {mnemonic} {body}")
    }
}
/// Result of disassembling one instruction into p-code.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PcodeDisassembly {
/// The p-code operations emitted for the instruction, in emission order.
pub instructions: Vec<PcodeInstruction>,
/// The varnode the instruction was decoded from: the requested address and
/// the number of bytes the disassembler reported.
pub origin: VarnodeData,
}
/// Result of disassembling one instruction into native assembly.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct NativeDisassembly {
/// The disassembled instruction.
pub instruction: AssemblyInstruction,
/// The varnode the instruction was decoded from: the requested address and
/// the number of bytes the disassembler reported.
pub origin: VarnodeData,
}
impl std::fmt::Display for NativeDisassembly {
    /// Writes `[<origin>]: <instruction>` followed by a newline.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            instruction,
            origin,
        } = self;
        writeln!(f, "[{origin}]: {instruction}")
    }
}
impl std::fmt::Display for PcodeDisassembly {
    /// Writes a header line with the origin and the instruction count, then
    /// one line per p-code instruction.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let count = self.instructions.len();
        writeln!(
            f,
            "[{origin}]: {count} instructions",
            origin = self.origin
        )?;
        self.instructions
            .iter()
            .try_for_each(|instr| writeln!(f, "{instr}"))
    }
}
/// Collector handed to the native disassembler; holds the single instruction
/// it emits.
#[derive(Default)]
struct NativeDisassemblyOutput {
/// The emitted instruction, or `None` while nothing has been dumped yet.
instruction: Option<AssemblyInstruction>,
}
impl api::AssemblyEmit for NativeDisassemblyOutput {
fn dump(
&mut self,
address: &sys::Address,
mnemonic: &libsla_sys::cxx::CxxString,
body: &libsla_sys::cxx::CxxString,
) {
assert!(
self.instruction.is_none(),
"native disassembly should dump 1 instruction"
);
self.instruction = Some(AssemblyInstruction {
address: address.into(),
mnemonic: mnemonic.to_string(),
body: body.to_string(),
});
}
}
/// Collector handed to the p-code disassembler; accumulates every operation
/// it emits.
#[derive(Default)]
struct PcodeDisassemblyOutput {
/// The emitted operations, in the order they were dumped.
instructions: Vec<PcodeInstruction>,
}
impl api::PcodeEmit for PcodeDisassemblyOutput {
fn dump(
&mut self,
address: &sys::Address,
op_code: sys::OpCode,
output_variable: Option<&sys::VarnodeData>,
input_variables: &CxxVector<sys::VarnodeData>,
) {
self.instructions.push(PcodeInstruction {
address: address.into(),
op_code: op_code.into(),
inputs: input_variables
.into_iter()
.map(Into::<VarnodeData>::into)
.collect(),
output: output_variable.map(Into::<VarnodeData>::into),
});
}
}
/// An in-memory buffer of instruction bytes.
pub struct InstructionBytes(Vec<u8>);

impl InstructionBytes {
    /// Wraps `bytes` without copying them.
    pub fn new(bytes: Vec<u8>) -> Self {
        InstructionBytes(bytes)
    }
}

impl FromIterator<u8> for InstructionBytes {
    /// Collects the bytes yielded by `iter` into a new buffer.
    fn from_iter<T: IntoIterator<Item = u8>>(iter: T) -> Self {
        let bytes = iter.into_iter().collect::<Vec<u8>>();
        InstructionBytes(bytes)
    }
}
impl InstructionLoader for InstructionBytes {
    /// Returns the bytes starting at the varnode's offset, at most
    /// `data.size` of them. Fewer bytes are returned when the buffer ends
    /// before the varnode does.
    ///
    /// # Errors
    ///
    /// Returns a message when the offset does not fit in a `usize` or lies at
    /// or beyond the end of the buffer.
    fn load_instruction_bytes(&self, data: &VarnodeData) -> std::result::Result<Vec<u8>, String> {
        let available = self.0.len();
        let start = match usize::try_from(data.address.offset) {
            Ok(start) => start,
            Err(err) => return Err(format!("offset should convert to usize: {err:?}")),
        };
        if start >= available {
            return Err(format!(
                "Offset {start} exceeds count of instruction bytes {len}",
                len = available
            ));
        }
        // Clamp the end so a request that runs past the buffer yields a short read.
        let end = start.saturating_add(data.size).min(available);
        Ok(self.0[start..end].to_vec())
    }
}
/// Borrowed [`InstructionLoader`] wrapped so that it can implement the native
/// `api::LoadImage` interface.
struct InstructionLoaderWrapper<'a>(&'a dyn InstructionLoader);
impl InstructionLoaderWrapper<'_> {
    /// Returns `true` only when the loader succeeds and returns exactly
    /// `varnode.size` bytes for `varnode`.
    fn is_readable(&self, varnode: &VarnodeData) -> bool {
        match self.0.load_instruction_bytes(varnode) {
            Ok(bytes) => bytes.len() == varnode.size,
            Err(_) => false,
        }
    }
}
impl api::LoadImage for InstructionLoaderWrapper<'_> {
    /// Fills the front of `data` with the bytes the wrapped loader returns for
    /// `address`.
    ///
    /// When the loader returns fewer bytes than `data.len()`, the rest of
    /// `data` is left untouched. When it returns more, the surplus is ignored.
    ///
    /// # Errors
    ///
    /// Propagates the message returned by the wrapped loader.
    fn load_fill(
        &self,
        data: &mut [u8],
        address: &sys::Address,
    ) -> std::result::Result<(), String> {
        let varnode = VarnodeData {
            size: data.len(),
            address: address.into(),
        };
        let loaded_data = self.0.load_instruction_bytes(&varnode)?;
        // A loader implementation may hand back more bytes than requested.
        // Copy only what fits, so the slice index cannot go out of bounds and
        // panic.
        let count = loaded_data.len().min(data.len());
        data[..count].copy_from_slice(&loaded_data[..count]);
        Ok(())
    }
}
/// Source of the bytes that make up the instructions to disassemble.
pub trait InstructionLoader {
/// Returns the bytes located at `source`, or a message describing why they
/// could not be loaded.
///
/// Within this file a result whose length differs from `source.size` is
/// treated as an incomplete read of the varnode.
fn load_instruction_bytes(&self, source: &VarnodeData) -> std::result::Result<Vec<u8>, String>;
}
/// Selects how the `.sla` data given to `GhidraSleighBuilder::build` is read.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
pub enum SlaDataEncoding {
/// The data is passed to the native `initialize_from_sla` routine. This is
/// the default.
#[default]
Sla,
/// The data is passed to the native `initialize_from_raw_sla` routine.
Raw,
}
/// Type-level marker for a [`GhidraSleighBuilder`] that has not been given a
/// processor specification yet. The enum has no variants, so it cannot be
/// instantiated.
pub enum MissingSpec {}
/// Type-level marker for a [`GhidraSleighBuilder`] whose processor
/// specification has been parsed. The enum has no variants, so it cannot be
/// instantiated.
pub enum HasSpec {}
/// Builder for [`GhidraSleigh`]. The type parameter `P` records whether a
/// processor specification has been supplied ([`MissingSpec`] or [`HasSpec`]).
pub struct GhidraSleighBuilder<P> {
/// Native document storage that receives the parsed processor specification.
store: UniquePtr<sys::DocumentStorage>,
/// Encoding used to interpret the `.sla` data at build time.
sla_encoding: SlaDataEncoding,
/// Zero-sized marker carrying the builder state `P`.
processor_spec: std::marker::PhantomData<P>,
}
impl Default for GhidraSleighBuilder<MissingSpec> {
fn default() -> Self {
LIBSLA_INIT.call_once(|| {
sys::initialize_element_id();
sys::initialize_attribute_id();
});
Self {
store: sys::new_document_storage(),
sla_encoding: Default::default(),
processor_spec: Default::default(),
}
}
}
impl GhidraSleighBuilder<MissingSpec> {
pub fn processor_spec(mut self, processor_spec: &str) -> Result<GhidraSleighBuilder<HasSpec>> {
let_cxx_string!(processor_spec = processor_spec);
sys::parse_document_and_register_root(self.store.pin_mut(), &processor_spec).map_err(
|err| Error::DependencyError {
message: Cow::Borrowed("failed to parse processor specification"),
source: Box::new(err),
},
)?;
Ok(GhidraSleighBuilder::<HasSpec> {
store: self.store,
sla_encoding: self.sla_encoding,
processor_spec: std::marker::PhantomData,
})
}
}
impl GhidraSleighBuilder<HasSpec> {
pub fn sla_encoding(self, encoding: SlaDataEncoding) -> Self {
Self {
store: self.store,
sla_encoding: encoding,
processor_spec: std::marker::PhantomData,
}
}
pub fn build(self, sla: impl AsRef<[u8]>) -> Result<GhidraSleigh> {
let_cxx_string!(sla = sla);
let mut sleigh = sys::new_sleigh(sys::new_context_internal());
let pin = sleigh.pin_mut();
let init_result = match self.sla_encoding {
SlaDataEncoding::Sla => pin.initialize_from_sla(&sla),
SlaDataEncoding::Raw => pin.initialize_from_raw_sla(&sla),
};
init_result.map_err(|err| Error::DependencyError {
message: Cow::Borrowed("failed to initialize Ghidra sleigh"),
source: Box::new(err),
})?;
sleigh
.pin_mut()
.parse_processor_config(&self.store)
.map_err(|err| Error::DependencyError {
message: Cow::Borrowed("failed to import processor config"),
source: Box::new(err),
})?;
Ok(GhidraSleigh { sleigh })
}
}
/// [`Sleigh`] implementation backed by the native Ghidra sleigh library.
pub struct GhidraSleigh {
/// Owning handle to the native sleigh object.
sleigh: UniquePtr<sys::SleighProxy>,
}
impl GhidraSleigh {
pub fn builder() -> GhidraSleighBuilder<MissingSpec> {
Default::default()
}
fn sys_address(&self, address: &Address) -> Option<UniquePtr<sys::Address>> {
let sys_addr_space = self.sys_address_space(&address.address_space)?;
Some(unsafe { sys::new_address(sys_addr_space, address.offset) })
}
fn sys_address_space(&self, address_space: &AddressSpace) -> Option<*mut sys::AddrSpace> {
for i in 0..self.sleigh.num_spaces() {
let sys_addr_space = self.sleigh.address_space(i);
if unsafe { (&*sys_addr_space).name() } == address_space.name.as_ref() {
return Some(sys_addr_space);
}
}
None
}
}
impl Sleigh for GhidraSleigh {
fn default_code_space(&self) -> AddressSpace {
unsafe { &*self.sleigh.default_code_space() }.into()
}
fn address_spaces(&self) -> Vec<AddressSpace> {
let num_spaces = self.sleigh.num_spaces();
let mut addr_spaces = Vec::with_capacity(num_spaces as usize);
for i in 0..num_spaces {
let raw_addr_space = unsafe { &*self.sleigh.address_space(i) };
addr_spaces.push(raw_addr_space.into());
}
addr_spaces
}
fn register_name(&self, target: &VarnodeData) -> Option<String> {
let base = self.sys_address_space(&target.address.address_space)?;
let _ = target.address.offset.checked_add(target.size as u64)?;
let register_name = unsafe {
self.sleigh
.register_name(base, target.address.offset, target.size as i32)
};
let register_name = register_name.to_string();
if register_name.is_empty() {
None
} else {
Some(register_name)
}
}
fn register_from_name(&self, name: impl AsRef<str>) -> Result<VarnodeData> {
let_cxx_string!(name = name.as_ref());
self.sleigh
.register_from_name(&name)
.map(VarnodeData::from)
.map_err(|err| Error::DependencyError {
message: Cow::Owned(format!("failed to get register {name}")),
source: Box::new(err),
})
}
fn disassemble_pcode(
&self,
loader: &dyn InstructionLoader,
address: Address,
) -> Result<PcodeDisassembly> {
let sys_address = self.sys_address(&address).expect("invalid address");
let loader = InstructionLoaderWrapper(loader);
let rust_loader = rust::RustLoadImage(&loader);
let mut output = PcodeDisassemblyOutput::default();
let mut emitter = rust::RustPcodeEmit(&mut output);
let response = self.sleigh.disassemble_pcode(
&rust_loader,
&mut emitter,
sys_address.as_ref().unwrap(),
);
Ok(PcodeDisassembly {
origin: handle_disassembly_response(response, loader, address)?,
instructions: output.instructions,
})
}
fn disassemble_native(
&self,
loader: &dyn InstructionLoader,
address: Address,
) -> Result<NativeDisassembly> {
let sys_address = self.sys_address(&address).expect("invalid address");
let loader = InstructionLoaderWrapper(loader);
let rust_loader = rust::RustLoadImage(&loader);
let mut output = NativeDisassemblyOutput::default();
let mut emitter = rust::RustAssemblyEmit(&mut output);
let response = self.sleigh.disassemble_native(
&rust_loader,
&mut emitter,
sys_address.as_ref().unwrap(),
);
Ok(NativeDisassembly {
origin: handle_disassembly_response(response, loader, address)?,
instruction: output.instruction.ok_or_else(|| {
Error::InternalError("ghidra did not disassemble an instruction".to_owned())
})?,
})
}
fn register_name_map(&self) -> BTreeMap<VarnodeData, String> {
self.sleigh
.all_register_names()
.into_iter()
.map(|data| (data.register().into(), data.name().to_string()))
.collect()
}
}
fn handle_disassembly_response(
response: std::result::Result<i32, libsla_sys::cxx::Exception>,
loader: InstructionLoaderWrapper,
address: Address,
) -> Result<VarnodeData> {
let source = VarnodeData {
address,
size: num_bytes_disassembled(response)?,
};
if !loader.is_readable(&source) {
return Err(Error::InsufficientData(source));
}
Ok(source)
}
fn num_bytes_disassembled(
response: std::result::Result<i32, libsla_sys::cxx::Exception>,
) -> Result<usize> {
let bytes_consumed = response
.map_err(|err| Error::DependencyError {
message: Cow::Borrowed("failed to decode instruction"),
source: Box::new(err),
})?
.try_into()
.map_err(|err| {
Error::InternalError(format!("instruction origin size is too large: {err}"))
})?;
Ok(bytes_consumed)
}