use std;
use anyhow::Error;
use remoteprocess::ProcessMemory;
use crate::python_interpreters::{StringObject, BytesObject, InterpreterState, Object, TypeObject, TupleObject, ListObject};
use crate::version::Version;
pub fn copy_string<T: StringObject, P: ProcessMemory>(ptr: * const T, process: &P) -> Result<String, Error> {
let obj = process.copy_pointer(ptr)?;
if obj.size() >= 4096 {
return Err(format_err!("Refusing to copy {} chars of a string", obj.size()));
}
let kind = obj.kind();
let bytes = process.copy(obj.address(ptr as usize), obj.size() * kind as usize)?;
match (kind, obj.ascii()) {
(4, _) => {
#[allow(clippy::cast_ptr_alignment)]
let chars = unsafe { std::slice::from_raw_parts(bytes.as_ptr() as * const char, bytes.len() / 4) };
Ok(chars.iter().collect())
},
(2, _) => {
Err(format_err!("ucs2 strings aren't supported yet!"))
},
(1, true) => Ok(String::from_utf8(bytes)?),
(1, false) => Ok(bytes.iter().map(|&b| { b as char }).collect()),
_ => Err(format_err!("Unknown string kind {}", kind))
}
}
pub fn copy_bytes<T: BytesObject, P: ProcessMemory>(ptr: * const T, process: &P) -> Result<Vec<u8>, Error> {
let obj = process.copy_pointer(ptr)?;
let size = obj.size();
if size >= 65536 {
return Err(format_err!("Refusing to copy {} bytes", size));
}
Ok(process.copy(obj.address(ptr as usize), size as usize)?)
}
pub fn copy_long(process: &remoteprocess::Process, addr: usize) -> Result<(i64, bool), Error> {
let value = process.copy_pointer(addr as *const crate::python_bindings::v3_7_0::PyLongObject)?;
let negative: i64 = if value.ob_base.ob_size < 0 { -1 } else { 1 };
let size = value.ob_base.ob_size * (negative as isize);
match size {
0 => Ok((0, false)),
1 => Ok((negative * (value.ob_digit[0] as i64), false)),
#[cfg(target_pointer_width = "64")]
2 => {
let digits: [u32; 2] = process.copy_struct(addr + std::mem::size_of_val(&value) - 8)?;
let mut ret: i64 = 0;
for i in 0..size {
ret += (digits[i as usize] as i64) << (30 * i);
}
Ok((negative * ret, false))
}
#[cfg(target_pointer_width = "32")]
2..=4 => {
let digits: [u16; 4] = process.copy_struct(addr + std::mem::size_of_val(&value) - 4)?;
let mut ret: i64 = 0;
for i in 0..size {
ret += (digits[i as usize] as i64) << (15 * i);
}
Ok((negative * ret, false))
}
_ => Ok((value.ob_base.ob_size as i64, true))
}
}
pub fn copy_int(process: &remoteprocess::Process, addr: usize) -> Result<i64, Error> {
let value = process.copy_pointer(addr as *const crate::python_bindings::v2_7_15::PyIntObject)?;
Ok(value.ob_ival as i64)
}
pub struct DictIterator<'a> {
process: &'a remoteprocess::Process,
entries_addr: usize,
kind: u8,
index: usize,
entries: usize,
values: usize
}
impl<'a> DictIterator<'a> {
pub fn from_managed_dict(process: &'a remoteprocess::Process, version: &'a Version, addr: usize, tp_addr: usize) -> Result<DictIterator<'a>, Error> {
let values_addr: usize = process.copy_struct(addr - 4 * std::mem::size_of::<usize>())?;
let dict_addr: usize = process.copy_struct(addr - 3 * std::mem::size_of::<usize>())?;
if values_addr != 0 {
let ht: crate::python_bindings::v3_11_0::PyHeapTypeObject = process.copy_struct(tp_addr)?;
let keys: crate::python_bindings::v3_11_0::PyDictKeysObject = process.copy_struct(ht.ht_cached_keys as usize)?;
let entries_addr = ht.ht_cached_keys as usize + (1 << keys.dk_log2_index_bytes) + std::mem::size_of_val(&keys);
Ok(DictIterator{process, entries_addr, index: 0, kind: keys.dk_kind, entries: keys.dk_nentries as usize, values: values_addr})
} else if dict_addr != 0 {
DictIterator::from(process, version, dict_addr)
} else {
Err(format_err!("neither values or dict is set in managed dict"))
}
}
pub fn from(process: &'a remoteprocess::Process, version: &'a Version, addr: usize) -> Result<DictIterator<'a>, Error> {
match version {
Version{major: 3, minor: 11, ..} => {
let dict: crate::python_bindings::v3_11_0::PyDictObject = process.copy_struct(addr)?;
let keys = process.copy_pointer(dict.ma_keys)?;
let entries_addr = dict.ma_keys as usize + (1 << keys.dk_log2_index_bytes) + std::mem::size_of_val(&keys);
Ok(DictIterator{process, entries_addr, index: 0, kind: keys.dk_kind, entries: keys.dk_nentries as usize, values: dict.ma_values as usize})
},
_ => {
let dict: crate::python_bindings::v3_7_0::PyDictObject = process.copy_struct(addr)?;
let keys = process.copy_pointer(dict.ma_keys)?;
let index_size = match keys.dk_size {
0..=0xff => 1,
0..=0xffff => 2,
#[cfg(target_pointer_width = "64")]
0..=0xffffffff => 4,
#[cfg(target_pointer_width = "64")]
_ => 8,
#[cfg(not(target_pointer_width = "64"))]
_ => 4
};
let byteoffset = (keys.dk_size * index_size) as usize;
let entries_addr = dict.ma_keys as usize + byteoffset + std::mem::size_of_val(&keys);
Ok(DictIterator{process, entries_addr, index: 0, kind: 0, entries: keys.dk_nentries as usize, values: dict.ma_values as usize})
}
}
}
}
impl<'a> Iterator for DictIterator<'a> {
type Item = Result<(usize, usize), Error>;
fn next(&mut self) -> Option<Self::Item> {
while self.index < self.entries {
let index = self.index;
self.index += 1;
let entry = match self.kind {
0 => {
let addr = index * std::mem::size_of::<crate::python_bindings::v3_7_0::PyDictKeyEntry>() + self.entries_addr;
let ret = self.process.copy_struct::<crate::python_bindings::v3_7_0::PyDictKeyEntry>(addr);
ret.map(|entry| (entry.me_key as usize, entry.me_value as usize))
},
_ => {
let addr = index * std::mem::size_of::<crate::python_bindings::v3_11_0::PyDictUnicodeEntry>() + self.entries_addr;
let ret = self.process.copy_struct::<crate::python_bindings::v3_11_0::PyDictUnicodeEntry>(addr);
ret.map(|entry| (entry.me_key as usize, entry.me_value as usize))
}
};
match entry {
Ok((key, value)) => {
if key == 0 {
continue;
}
let value = if self.values != 0 {
let valueaddr = self.values + index * std::mem::size_of::<* mut crate::python_bindings::v3_7_0::PyObject>();
match self.process.copy_struct(valueaddr) {
Ok(addr) => addr,
Err(e) => { return Some(Err(e.into())); }
}
} else {
value
};
return Some(Ok((key, value)))
},
Err(e) => {
return Some(Err(e.into()))
}
}
}
None
}
}
pub const PY_TPFLAGS_MANAGED_DICT: usize = 1 << 4;
const PY_TPFLAGS_INT_SUBCLASS: usize = 1 << 23;
const PY_TPFLAGS_LONG_SUBCLASS: usize = 1 << 24;
const PY_TPFLAGS_LIST_SUBCLASS: usize = 1 << 25;
const PY_TPFLAGS_TUPLE_SUBCLASS: usize = 1 << 26;
const PY_TPFLAGS_BYTES_SUBCLASS: usize = 1 << 27;
const PY_TPFLAGS_STRING_SUBCLASS: usize = 1 << 28;
const PY_TPFLAGS_DICT_SUBCLASS: usize = 1 << 29;
pub fn format_variable<I>(process: &remoteprocess::Process, version: &Version, addr: usize, max_length: isize)
-> Result<String, Error> where I: InterpreterState {
if max_length <= 5 {
return Ok("...".to_owned());
}
let value: I::Object = process.copy_struct(addr)?;
let value_type = process.copy_pointer(value.ob_type())?;
let max_type_len = 128;
let value_type_name = process.copy(value_type.name() as usize, max_type_len)?;
let length = value_type_name.iter().position(|&x| x == 0).unwrap_or(max_type_len);
let value_type_name = std::str::from_utf8(&value_type_name[..length])?;
let format_int = |value: i64| {
if value_type_name == "bool" {
(if value > 0 { "True" } else { "False" }).to_owned()
} else {
format!("{}", value)
}
};
let flags = value_type.flags();
let formatted = if flags & PY_TPFLAGS_INT_SUBCLASS != 0 {
format_int(copy_int(process, addr)?)
} else if flags & PY_TPFLAGS_LONG_SUBCLASS != 0 {
let (value, overflowed) = copy_long(process, addr)?;
if overflowed {
if value > 0 { "+bigint".to_owned() } else { "-bigint".to_owned() }
} else {
format_int(value)
}
} else if flags & PY_TPFLAGS_STRING_SUBCLASS != 0 ||
(version.major == 2 && (flags & PY_TPFLAGS_BYTES_SUBCLASS != 0)) {
let value = copy_string(addr as *const I::StringObject, process)?.replace("\"", "\\\"").replace("\n", "\\n");
if value.len() as isize >= max_length - 5 {
format!("\"{}...\"", &value[..(max_length - 5) as usize])
} else {
format!("\"{}\"", value)
}
} else if flags & PY_TPFLAGS_DICT_SUBCLASS != 0 {
if version.major == 3 && version.minor >= 6 {
let mut values = Vec::new();
let mut remaining = max_length - 2;
for entry in DictIterator::from(process, version, addr)? {
let (key, value) = entry?;
let key = format_variable::<I>(process, version, key, remaining)?;
let value = format_variable::<I>(process, version, value, remaining)?;
remaining -= (key.len() + value.len()) as isize + 4;
if remaining <= 5 {
values.push("...".to_owned());
break;
}
values.push(format!("{}: {}", key, value));
}
format!("{{{}}}", values.join(", "))
} else {
"dict".to_owned()
}
} else if flags & PY_TPFLAGS_LIST_SUBCLASS != 0 {
let object: I::ListObject = process.copy_struct(addr)?;
let addr = object.item() as usize;
let mut values = Vec::new();
let mut remaining = max_length - 2;
for i in 0..object.size() {
let valueptr: *mut I::Object = process.copy_struct(addr + i * std::mem::size_of::<* mut I::Object>())?;
let value = format_variable::<I>(process, version, valueptr as usize, remaining)?;
remaining -= value.len() as isize + 2;
if remaining <= 5 {
values.push("...".to_owned());
break;
}
values.push(value);
}
format!("[{}]", values.join(", "))
} else if flags & PY_TPFLAGS_TUPLE_SUBCLASS != 0 {
let object: I::TupleObject = process.copy_struct(addr)?;
let mut values = Vec::new();
let mut remaining = max_length - 2;
for i in 0..object.size() {
let value_addr: *mut I::Object = process.copy_struct(object.address(addr, i))?;
let value = format_variable::<I>(process, version, value_addr as usize, remaining)?;
remaining -= value.len() as isize + 2;
if remaining <= 5 {
values.push("...".to_owned());
break;
}
values.push(value);
}
format!("({})", values.join(", "))
} else if value_type_name == "float" {
let value = process.copy_pointer(addr as *const crate::python_bindings::v3_7_0::PyFloatObject)?;
format!("{}", value.ob_fval)
} else if value_type_name == "NoneType" {
"None".to_owned()
} else {
format!("<{} at 0x{:x}>", value_type_name, addr)
};
Ok(formatted)
}
#[cfg(test)]
pub mod tests {
use super::*;
use remoteprocess::LocalProcess;
use crate::python_bindings::v3_7_0::{PyBytesObject, PyVarObject, PyUnicodeObject, PyASCIIObject};
use std::ptr::copy_nonoverlapping;
#[allow(dead_code)]
pub struct AllocatedPyByteObject {
pub base: PyBytesObject,
pub storage: [u8; 4096]
}
#[allow(dead_code)]
pub struct AllocatedPyASCIIObject {
pub base: PyASCIIObject,
pub storage: [u8; 4096]
}
pub fn to_byteobject(bytes: &[u8]) -> AllocatedPyByteObject {
let ob_size = bytes.len() as isize;
let base = PyBytesObject{ob_base: PyVarObject{ob_size, ..Default::default()}, ..Default::default()};
let mut ret = AllocatedPyByteObject{base, storage: [0 as u8; 4096]};
unsafe { copy_nonoverlapping(bytes.as_ptr(), ret.base.ob_sval.as_mut_ptr() as *mut u8, bytes.len()); }
ret
}
pub fn to_asciiobject(input: &str) -> AllocatedPyASCIIObject {
let bytes: Vec<u8> = input.bytes().collect();
let mut base = PyASCIIObject{length: bytes.len() as isize, ..Default::default()};
base.state.set_compact(1);
base.state.set_kind(1);
base.state.set_ascii(1);
let mut ret = AllocatedPyASCIIObject{base, storage: [0 as u8; 4096]};
unsafe {
let ptr = &mut ret as *mut AllocatedPyASCIIObject as *mut u8;
let dst = ptr.offset(std::mem::size_of::<PyASCIIObject>() as isize);
copy_nonoverlapping(bytes.as_ptr(), dst, bytes.len());
}
ret
}
#[test]
fn test_copy_string() {
let original = "function_name";
let obj = to_asciiobject(original);
let unicode: &PyUnicodeObject = unsafe{ std::mem::transmute(&obj.base) };
let copied = copy_string(unicode, &LocalProcess).unwrap();
assert_eq!(copied, original);
}
#[test]
fn test_copy_bytes() {
let original = [10_u8, 20, 30, 40, 50, 70, 80];
let bytes = to_byteobject(&original);
let copied = copy_bytes(&bytes.base, &LocalProcess).unwrap();
assert_eq!(copied, original);
}
}