pyc 0.1.0

Read compiled Python files
Documentation
use failure::Error;
use log::debug;
use std::cell::RefCell;

mod dis;

use python_object::Object;

// One-byte tags compared against the low seven bits of an object's leading
// byte when decoding.
const TYPE_NULL: u8 = b'0';
const TYPE_NONE: u8 = b'N';
const TYPE_FALSE: u8 = b'F';
const TYPE_TRUE: u8 = b'T';
const TYPE_STOPITER: u8 = b'S';
const TYPE_ELLIPSIS: u8 = b'.';
const TYPE_INT: u8 = b'i';
// TYPE_INT64 is not generated anymore; it is kept for backward
// compatibility only.
const TYPE_INT64: u8 = b'I';
const TYPE_FLOAT: u8 = b'f';
const TYPE_BINARY_FLOAT: u8 = b'g';
const TYPE_COMPLEX: u8 = b'x';
const TYPE_BINARY_COMPLEX: u8 = b'y';
const TYPE_LONG: u8 = b'l';
const TYPE_STRING: u8 = b's';
const TYPE_INTERNED: u8 = b't';
const TYPE_REF: u8 = b'r';
const TYPE_TUPLE: u8 = b'(';
const TYPE_LIST: u8 = b'[';
const TYPE_DICT: u8 = b'{';
const TYPE_CODE: u8 = b'c';
const TYPE_UNICODE: u8 = b'u';
const TYPE_UNKNOWN: u8 = b'?';
const TYPE_SET: u8 = b'<';
const TYPE_FROZENSET: u8 = b'>';
// High bit of the leading byte: when set alongside a type tag, the decoded
// object is also added to the reference index.
const FLAG_REF: u8 = 0x80;

const TYPE_ASCII: u8 = b'a';
const TYPE_ASCII_INTERNED: u8 = b'A';
const TYPE_SMALL_TUPLE: u8 = b')';
const TYPE_SHORT_ASCII: u8 = b'z';
const TYPE_SHORT_ASCII_INTERNED: u8 = b'Z';

/// Decodes the first four elements of `bytes` as a little-endian `u32`.
///
/// Any elements past the fourth are ignored.
///
/// # Panics
///
/// Panics if `bytes` holds fewer than four elements.
fn as_u32_le(bytes: &[u8]) -> u32 {
    u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
}

/// An owned byte buffer paired with a read position that only moves forward.
pub struct Bytes {
    data: Vec<u8>,
    cursor: usize,
}

impl Bytes {
    /// Wraps `data` with the read position at its first byte.
    pub fn new(data: Vec<u8>) -> Bytes {
        Bytes { cursor: 0, data }
    }

    /// Returns the next `n` bytes and advances the read position past them.
    ///
    /// # Panics
    ///
    /// Panics if fewer than `n` bytes remain.
    pub fn eat_n(&mut self, n: usize) -> &[u8] {
        let start = self.cursor;
        let end = start + n;
        let chunk = &self.data[start..end];
        self.cursor = end;
        chunk
    }

    /// Returns the next four bytes; see [`Bytes::eat_n`].
    pub fn eat_4(&mut self) -> &[u8] {
        self.eat_n(4)
    }

    /// Returns the next single byte; see [`Bytes::eat_n`].
    pub fn eat_byte(&mut self) -> u8 {
        let one = self.eat_n(1);
        one[0]
    }

    /// Reports whether the read position has reached the end of the buffer.
    pub fn is_eof(&self) -> bool {
        self.data.len() <= self.cursor
    }
}

/// Returns a clone of the entry of `refs` that the reference object `v` indexes.
///
/// # Panics
///
/// Panics if `v` is not the `Ref` variant, or if its index is outside `refs`.
fn find_ref(refs: &Vec<python_object::Object>, v: &python_object::Object) -> python_object::Object {
    match v {
        python_object::Object::Ref(id) => refs[*id as usize].clone(),
        _ => panic!(),
    }
}

/// Substitutes reference placeholders held by `code_object` with clones of the
/// `refs` entries they index.
///
/// Three fields are rewritten in place through their `RefCell`s:
///
/// * `name` - replaced when it is itself a reference;
/// * `names` - when it is a tuple, each element that is a reference is replaced;
/// * `consts` - each element that is a reference is replaced, tuple elements
///   get the same treatment one level down, and nested code objects are
///   processed recursively.
///
/// Substituted objects are not themselves scanned for further references.
fn resolve_refs_func(refs: &Vec<python_object::Object>, code_object: &python_object::CodeObject) {
    // Shared by the `names` and `consts` passes: yields the referenced object
    // (and logs the substitution) when `item` is a reference, `None` otherwise.
    let lookup = |item: &python_object::Object| -> Option<python_object::Object> {
        if !item.is_ref() {
            return None;
        }
        let target = find_ref(refs, item);
        debug!("resolve ref {:?} to {:?}", item, target);
        Some(target)
    };

    // name: swapped without logging.
    let current_name = code_object.name.clone().into_inner();
    if current_name.is_ref() {
        code_object.name.replace(find_ref(refs, &current_name));
    }

    // names: only touched when the field actually holds a tuple.
    if let python_object::Object::Tuple(names) = code_object.names.clone().into_inner() {
        let size = names.size;
        let data = names
            .data
            .into_iter()
            .map(|item| lookup(&item).unwrap_or(item))
            .collect();
        code_object
            .names
            .replace(python_object::Object::Tuple(python_object::TupleObject {
                size,
                data,
            }));
    }

    // consts: work on a snapshot so the cell is not borrowed while nested code
    // objects are being processed, then store the rebuilt tuple.
    let consts = code_object.consts.clone().into_inner();
    let size = consts.size;
    let data = consts
        .data
        .into_iter()
        .map(|item| match item {
            python_object::Object::Code(nested) => {
                resolve_refs_func(refs, &nested);
                python_object::Object::Code(nested)
            }
            python_object::Object::Tuple(mut inner) => {
                for slot in &mut inner.data {
                    if let Some(target) = lookup(&*slot) {
                        *slot = target;
                    }
                }
                python_object::Object::Tuple(inner)
            }
            other => lookup(&other).unwrap_or(other),
        })
        .collect();
    code_object
        .consts
        .replace(python_object::TupleObject { size, data });
}

/// Runs `resolve_refs_func` over every code object in `program.funcs`, in order.
fn resolve_refs(refs: &Vec<python_object::Object>, program: &mut python_object::Program) {
    program
        .funcs
        .iter()
        .for_each(|func| resolve_refs_func(refs, func));
}

pub fn decode(bytes: &mut Bytes) -> Result<python_object::Program, Error> {
    let magic = bytes.eat_4();
    debug!("magic {:x?}", magic);
    // TODO: check magic

    let mtime = bytes.eat_4();
    debug!("mtime {:x?}", mtime);

    // code size
    bytes.eat_4();

    let mut funcs = vec![];
    let mut refs = vec![];

    loop {
        let object = decode_object(bytes, &mut refs)?;
        match object {
            Object::Code(c) => funcs.push(c),
            _ => panic!(),
        }

        if bytes.is_eof() {
            break;
        }
    }

    let mut program = python_object::Program { funcs };
    resolve_refs(&refs, &mut program);

    Ok(program)
}

/// Decodes one marshalled object starting at the read position of `bytes`.
///
/// The leading byte is split into a type tag (low seven bits) and `FLAG_REF`
/// (high bit). When the flag is set on a tag that supports it, the decoded
/// object is also recorded in `refs` so that a later `TYPE_REF` entry can name
/// it by index. Container-like objects (small tuples and code objects) claim
/// their `refs` slot *before* their children are decoded, so that indices are
/// handed out in the order the objects start in the stream.
///
/// # Errors
///
/// Returns an error when
/// * the type tag is not one of the known tags,
/// * a `TYPE_REF` index does not point at an already-registered slot,
/// * a code object's bytecode is not a string object or its constants are not
///   a tuple,
/// * decoding a nested object fails.
///
/// # Panics
///
/// Panics via `unimplemented!` on tags that are recognised but not supported
/// yet, and when the input is truncated (reads through `Bytes` panic).
fn decode_object(bytes: &mut Bytes, refs: &mut Vec<Object>) -> Result<Object, Error> {
    /// Claims the next `refs` index with a placeholder when `flag` is set and
    /// returns that index; returns `None` when the object is not flagged.
    fn reserve_slot(flag: u8, refs: &mut Vec<Object>) -> Option<usize> {
        if flag == 0 {
            return None;
        }
        refs.push(Object::None);
        Some(refs.len() - 1)
    }

    let code = bytes.eat_byte();
    let flag = code & FLAG_REF;
    let _type = code & !FLAG_REF;

    // Registers a leaf object (one without children) in `refs` when flagged.
    macro_rules! as_ref {
        ($o:expr) => {{
            if flag != 0 {
                let o = $o;
                refs.push(o.clone());
                o
            } else {
                $o
            }
        }};
    }

    Ok(match _type {
        TYPE_NULL => unimplemented!("TYPE_NULL"),
        TYPE_NONE => Object::None,
        TYPE_STOPITER => unimplemented!("TYPE_STOPITER"),
        TYPE_ELLIPSIS => unimplemented!("TYPE_ELLIPSIS"),
        TYPE_FALSE => Object::False,
        TYPE_TRUE => Object::True,
        TYPE_INT => {
            let v = as_u32_le(bytes.eat_4()) as i32;
            as_ref!(Object::Int(v))
        }
        TYPE_INT64 => unimplemented!("TYPE_INT64"),
        TYPE_LONG => unimplemented!("TYPE_LONG"),
        TYPE_FLOAT => unimplemented!("TYPE_FLOAT"),
        TYPE_BINARY_FLOAT => unimplemented!("TYPE_BINARY_FLOAT"),
        TYPE_COMPLEX => unimplemented!("TYPE_COMPLEX"),
        TYPE_BINARY_COMPLEX => unimplemented!("TYPE_BINARY_COMPLEX"),
        TYPE_STRING => as_ref!(Object::Chars(decode_string(bytes).to_vec())),
        TYPE_ASCII_INTERNED => unimplemented!("TYPE_ASCII_INTERNED"),
        TYPE_ASCII => {
            let n = as_u32_le(bytes.eat_4());
            let string = decode_ascii(bytes, false, n);
            as_ref!(Object::Ascii(string))
        }
        TYPE_SHORT_ASCII_INTERNED => {
            let n = bytes.eat_byte();
            let string = decode_ascii(bytes, true, n as u32);
            as_ref!(Object::Ascii(string))
        }
        TYPE_SHORT_ASCII => {
            let n = bytes.eat_byte();
            let string = decode_ascii(bytes, false, n as u32);
            as_ref!(Object::Ascii(string))
        }
        TYPE_INTERNED => unimplemented!("TYPE_INTERNED"),
        TYPE_UNICODE => unimplemented!("TYPE_UNICODE"),
        TYPE_SMALL_TUPLE => {
            let n = bytes.eat_byte();
            // The tuple must take its index before any flagged element does;
            // registering it after the elements would shift every later
            // reference index.
            let slot = reserve_slot(flag, refs);
            let objects = decode_tuple(bytes, n as u32, refs)?;
            let tuple = Object::Tuple(python_object::TupleObject {
                size: n,
                data: objects,
            });
            if let Some(slot) = slot {
                refs[slot] = tuple.clone();
            }
            tuple
        }
        TYPE_TUPLE => unimplemented!("TYPE_TUPLE"),
        TYPE_LIST => unimplemented!("TYPE_LIST"),
        TYPE_DICT => unimplemented!("TYPE_DICT"),
        TYPE_SET => unimplemented!("TYPE_SET"),
        TYPE_FROZENSET => unimplemented!("TYPE_FROZENSET"),
        TYPE_CODE => {
            // Same ordering rule as for tuples: claim the slot first, fill it
            // in once the code object is complete.
            let ref_id = reserve_slot(flag, refs);

            let argcount = as_u32_le(bytes.eat_4());
            let kwonlyargcount = as_u32_le(bytes.eat_4());
            let nlocals = as_u32_le(bytes.eat_4());
            let stacksize = as_u32_le(bytes.eat_4());
            let flags = as_u32_le(bytes.eat_4());

            let code = match decode_object(bytes, refs)? {
                Object::Chars(code) => dis::dis(&mut Bytes::new(code)),
                other => {
                    failure::bail!("code object bytecode must be a string, found {:?}", other);
                }
            };
            let consts = match decode_object(bytes, refs)? {
                Object::Tuple(v) => RefCell::new(v),
                other => {
                    failure::bail!("code object constants must be a tuple, found {:?}", other);
                }
            };
            let names = Box::new(RefCell::new(decode_object(bytes, refs)?));
            let varnames = Box::new(decode_object(bytes, refs)?);
            // The remaining fields are decoded to keep the read position and
            // the reference index in step, but their values are discarded.
            let _freevars = decode_object(bytes, refs)?;
            let _cellvars = decode_object(bytes, refs)?;
            let _filename = decode_object(bytes, refs)?;
            let name = Box::new(RefCell::new(decode_object(bytes, refs)?));
            let _firstlineno = as_u32_le(bytes.eat_4());
            let _lnotab = decode_object(bytes, refs)?;

            let o = Object::Code(python_object::CodeObject {
                argcount,
                kwonlyargcount,
                nlocals,
                stacksize,
                flags,
                code,
                consts,
                names,
                name,
                varnames,
            });
            if let Some(ref_id) = ref_id {
                refs[ref_id] = o.clone();
            }
            o
        }
        TYPE_REF => {
            let p = decode_ref(bytes);
            if p as usize >= refs.len() {
                failure::bail!("ref out of bounds");
            }

            Object::Ref(p)
        }
        b => {
            // A tag byte outside the known set means the data is malformed.
            failure::bail!("unknown type code {:x?}", b);
        }
    })
}

/// Reads a 4-byte little-endian length followed by that many raw bytes, and
/// returns the raw bytes.
fn decode_string(bytes: &mut Bytes) -> &[u8] {
    // TODO: check size
    let len = as_u32_le(bytes.eat_4()) as usize;
    bytes.eat_n(len)
}

/// Decodes `n` consecutive objects, stopping at the first one that fails.
fn decode_tuple(bytes: &mut Bytes, n: u32, refs: &mut Vec<Object>) -> Result<Vec<Object>, Error> {
    // Collecting into `Result` short-circuits on the first `Err`.
    (0..n).map(|_| decode_object(bytes, refs)).collect()
}

/// Reads `n` bytes one at a time and builds a `String` by casting each byte to
/// a `char`. `_is_interned` is accepted but not used.
fn decode_ascii(bytes: &mut Bytes, _is_interned: bool, n: u32) -> String {
    (0..n).map(|_| bytes.eat_byte() as char).collect()
}

/// Reads the 4-byte little-endian index carried by a reference entry.
fn decode_ref(bytes: &mut Bytes) -> u32 {
    let raw = bytes.eat_4();
    as_u32_le(raw)
}