Skip to main content

python_assembler/formats/pyc/reader/
mod.rs

1#![doc = include_str!("readme.md")]
2
3use crate::{
4    formats::pyc::PycReadConfig,
5    program::{PycHeader, PythonCodeObject, PythonObject, PythonProgram, PythonVersion},
6};
7use byteorder::{LittleEndian, ReadBytesExt};
8use gaia_types::{BinaryReader, GaiaDiagnostics, GaiaError};
9use std::{
10    cell::RefCell,
11    io::{Cursor, Read, Seek},
12    sync::OnceLock,
13};
14
// Marshal type codes.
//
// Each constant is the one-byte tag that introduces a value in a marshal
// stream. `#[allow(dead_code)]` is kept only on the tags that the decoder in
// this module does not reference, so the table stays complete without warnings.

/// Tag byte `'0'`.
#[allow(dead_code)]
const TYPE_NULL: u8 = b'0';
/// Tag byte `'N'`; decoded as `PythonObject::None`.
const TYPE_NONE: u8 = b'N';
/// Tag byte `'F'`; decoded as `PythonObject::Bool(false)`.
const TYPE_FALSE: u8 = b'F';
/// Tag byte `'T'`; decoded as `PythonObject::Bool(true)`.
const TYPE_TRUE: u8 = b'T';
/// Tag byte `'S'`.
#[allow(dead_code)]
const TYPE_STOPITER: u8 = b'S';
/// Tag byte `'.'`.
#[allow(dead_code)]
const TYPE_ELLIPSIS: u8 = b'.';
/// Tag byte `'i'`; decoded as a little-endian `i32`.
const TYPE_INT: u8 = b'i';
/// Tag byte `'I'`; decoded as a little-endian `i64`.
const TYPE_INT64: u8 = b'I';
/// Tag byte `'f'`.
#[allow(dead_code)]
const TYPE_FLOAT: u8 = b'f';
/// Tag byte `'g'`.
#[allow(dead_code)]
const TYPE_BINARY_FLOAT: u8 = b'g';
/// Tag byte `'x'`.
#[allow(dead_code)]
const TYPE_COMPLEX: u8 = b'x';
/// Tag byte `'y'`.
#[allow(dead_code)]
const TYPE_BINARY_COMPLEX: u8 = b'y';
/// Tag byte `'l'`.
#[allow(dead_code)]
const TYPE_LONG: u8 = b'l';
/// Tag byte `'s'`; a `u32` length followed by that many bytes.
const TYPE_STRING: u8 = b's';
/// Tag byte `'t'`; a `u32` length followed by that many bytes.
const TYPE_INTERNED: u8 = b't';
/// Tag byte `'r'`.
#[allow(dead_code)]
const TYPE_REF: u8 = b'r';
/// Tag byte `'('`; a `u32` element count followed by the elements.
const TYPE_TUPLE: u8 = b'(';
/// Tag byte `'['`; a `u32` element count followed by the elements.
const TYPE_LIST: u8 = b'[';
/// Tag byte `'{'`.
#[allow(dead_code)]
const TYPE_DICT: u8 = b'{';
/// Tag byte `'c'`; introduces a code object.
const TYPE_CODE: u8 = b'c';
/// Tag byte `'u'`; a `u32` length followed by text bytes.
const TYPE_UNICODE: u8 = b'u';
/// Tag byte `'?'`.
#[allow(dead_code)]
const TYPE_UNKNOWN: u8 = b'?';
/// Tag byte `'<'`.
#[allow(dead_code)]
const TYPE_SET: u8 = b'<';
/// Tag byte `'>'`.
#[allow(dead_code)]
const TYPE_FROZENSET: u8 = b'>';
/// Tag byte `'a'`; a `u32` length followed by text bytes.
const TYPE_ASCII: u8 = b'a';
/// Tag byte `'A'`; a `u32` length followed by text bytes.
const TYPE_ASCII_INTERNED: u8 = b'A';
/// Tag byte `')'`; a one-byte element count followed by the elements.
const TYPE_SMALL_TUPLE: u8 = b')';
/// Tag byte `'z'`; a one-byte length followed by text bytes.
const TYPE_SHORT_ASCII: u8 = b'z';
/// Tag byte `'Z'`; a one-byte length followed by text bytes.
const TYPE_SHORT_ASCII_INTERNED: u8 = b'Z';
63
64/// PycInfo 表示 .pyc 文件的基本信息视图
65#[derive(Debug, Clone, Copy)]
66pub struct PycInfo {
67    /// .pyc 文件头信息
68    pub header: PycHeader,
69    /// Python 版本信息
70    pub version: PythonVersion,
71}
72
73/// 现代化的惰性 .pyc 文件读取器
74#[derive(Debug)]
75pub struct PycReader<'config, R> {
76    config: &'config PycReadConfig,
77    reader: RefCell<BinaryReader<R, LittleEndian>>,
78    info: OnceLock<PycInfo>,
79    program: OnceLock<PythonProgram>,
80}
81
82impl PycReadConfig {
83    /// 创建一个新的 PycReader 实例
84    pub fn as_reader<R: Read + Seek>(&self, reader: R) -> PycReader<'_, R> {
85        PycReader::new(reader, self)
86    }
87}
88
89impl<'config, R> PycReader<'config, R> {
90    /// 创建一个新的 PycReader 实例
91    pub fn new(reader: R, config: &'config PycReadConfig) -> Self {
92        Self { config, reader: RefCell::new(BinaryReader::new(reader)), info: Default::default(), program: Default::default() }
93    }
94
95    /// 完成读取并返回 PythonProgram 结果
96    pub fn finish(self) -> GaiaDiagnostics<PythonProgram>
97    where
98        R: Read + Seek,
99    {
100        match self.get_program() {
101            Ok(program) => {
102                let errors = self.reader.borrow_mut().take_errors();
103                GaiaDiagnostics { result: Ok(program.clone()), diagnostics: errors }
104            }
105            Err(e) => {
106                let errors = self.reader.borrow_mut().take_errors();
107                GaiaDiagnostics { result: Err(e), diagnostics: errors }
108            }
109        }
110    }
111}
112
113impl<'config, R: Read + Seek> PycReader<'config, R> {
114    /// 获取解析后的 PythonProgram
115    pub fn get_program(&self) -> Result<&PythonProgram, GaiaError> {
116        Ok(self.program.get_or_init(|| self.read_program().unwrap_or_else(|_| PythonProgram::default())))
117    }
118
119    /// 获取 .pyc 文件的基本信息
120    pub fn get_info(&self) -> Result<&PycInfo, GaiaError> {
121        Ok(self.info.get_or_init(|| {
122            self.read_info().unwrap_or_else(|_| PycInfo { header: PycHeader::default(), version: PythonVersion::Unknown })
123        }))
124    }
125
126    fn read_info(&self) -> Result<PycInfo, GaiaError> {
127        let mut reader = self.reader.borrow_mut();
128
129        // 重新定位到文件开头
130        reader.seek(std::io::SeekFrom::Start(0))?;
131
132        // 读取 .pyc 文件头
133        let header = self.read_header(&mut reader)?;
134
135        // 从配置或头部确定版本
136        let version = if self.config.version != PythonVersion::Unknown {
137            self.config.version
138        }
139        else {
140            PythonVersion::from_magic(header.magic)
141        };
142
143        Ok(PycInfo { header, version })
144    }
145
146    fn read_program(&self) -> Result<PythonProgram, GaiaError> {
147        let mut reader = self.reader.borrow_mut();
148
149        // 重新定位到文件开头
150        reader.seek(std::io::SeekFrom::Start(0))?;
151
152        // 读取头部信息
153        let header = self.read_header(&mut reader)?;
154
155        // 确定 Python 版本
156        let version = if self.config.version != PythonVersion::Unknown {
157            self.config.version
158        }
159        else {
160            PythonVersion::from_magic(header.magic)
161        };
162
163        // 读取 marshal 数据
164        let mut code_object_bytes = Vec::new();
165        reader.read_to_end(&mut code_object_bytes)?;
166
167        // 解析 marshal 数据
168        let code_object = if !code_object_bytes.is_empty() {
169            self.parse_code_object(&code_object_bytes)?
170        }
171        else {
172            PythonCodeObject::default()
173        };
174
175        // 构建 PythonProgram
176        let program = PythonProgram { header, code_object, version };
177
178        Ok(program)
179    }
180
181    fn read_header(&self, reader: &mut BinaryReader<R, LittleEndian>) -> Result<PycHeader, GaiaError> {
182        let mut magic = [0u8; 4];
183        reader.read_exact(&mut magic)?;
184
185        let flags = reader.read_u32()?;
186        let timestamp = reader.read_u32()?;
187        let size = reader.read_u32()?;
188
189        Ok(PycHeader { magic, flags, timestamp, size })
190    }
191
192    // 集成的 marshal 解析功能
193    fn parse_code_object(&self, data: &[u8]) -> Result<PythonCodeObject, GaiaError> {
194        let mut cursor = Cursor::new(data);
195
196        // 首先检查是否是 CODE 类型
197        let type_byte = cursor.read_u8().map_err(|_| GaiaError::custom_error("Failed to read type byte".to_string()))?;
198        if (type_byte & 0x7F) != TYPE_CODE {
199            return Err(GaiaError::custom_error(format!("Expected code object, got type {}", type_byte)));
200        }
201
202        // Python 3.11+ marshal format
203        let argcount =
204            cursor.read_u32::<LittleEndian>().map_err(|_| GaiaError::custom_error("Failed to read argcount".to_string()))?;
205        let posonlyargcount = cursor
206            .read_u32::<LittleEndian>()
207            .map_err(|_| GaiaError::custom_error("Failed to read posonlyargcount".to_string()))?;
208        let kwonlyargcount = cursor
209            .read_u32::<LittleEndian>()
210            .map_err(|_| GaiaError::custom_error("Failed to read kwonlyargcount".to_string()))?;
211        let stacksize =
212            cursor.read_u32::<LittleEndian>().map_err(|_| GaiaError::custom_error("Failed to read stacksize".to_string()))?;
213        let flags =
214            cursor.read_u32::<LittleEndian>().map_err(|_| GaiaError::custom_error("Failed to read flags".to_string()))?;
215
216        // co_code
217        let code_obj = self.parse_object(&mut cursor)?;
218        let code_bytes = match code_obj {
219            PythonObject::Bytes(b) => b,
220            _ => return Err(GaiaError::custom_error("Expected bytes for code".to_string())),
221        };
222
223        // co_consts
224        let constants_obj = self.parse_object(&mut cursor)?;
225        let constants = match constants_obj {
226            PythonObject::Tuple(items) | PythonObject::List(items) => items,
227            _ => vec![constants_obj],
228        };
229
230        // co_names
231        let names_obj = self.parse_object(&mut cursor)?;
232        let names = self.extract_string_list(names_obj)?;
233
234        // co_localsplusnames
235        let localsplusnames_obj = self.parse_object(&mut cursor)?;
236        let localsplusnames = self.extract_string_list(localsplusnames_obj)?;
237
238        // co_localspluskinds
239        let localspluskinds_obj = self.parse_object(&mut cursor)?;
240        let localspluskinds = match localspluskinds_obj {
241            PythonObject::Bytes(b) => b,
242            _ => Vec::new(),
243        };
244
245        // co_filename
246        let filename_obj = self.parse_object(&mut cursor)?;
247        let filename = match filename_obj {
248            PythonObject::String(s) | PythonObject::Str(s) => s,
249            _ => String::new(),
250        };
251
252        // co_name
253        let name_obj = self.parse_object(&mut cursor)?;
254        let name = match name_obj {
255            PythonObject::String(s) | PythonObject::Str(s) => s,
256            _ => String::new(),
257        };
258
259        // co_qualname
260        let qualname_obj = self.parse_object(&mut cursor)?;
261        let qualname = match qualname_obj {
262            PythonObject::String(s) | PythonObject::Str(s) => s,
263            _ => String::new(),
264        };
265
266        let firstlineno =
267            cursor.read_u32::<LittleEndian>().map_err(|_| GaiaError::custom_error("Failed to read firstlineno".to_string()))?;
268
269        // co_linetable
270        let lnotab_obj = self.parse_object(&mut cursor)?;
271        let lnotab = match lnotab_obj {
272            PythonObject::Bytes(b) => b,
273            _ => Vec::new(),
274        };
275
276        // co_exceptiontable
277        let exceptiontable_obj = self.parse_object(&mut cursor)?;
278        let exceptiontable = match exceptiontable_obj {
279            PythonObject::Bytes(b) => b,
280            _ => Vec::new(),
281        };
282
283        // TODO: 将字节码解析为指令
284        let instructions = Vec::new();
285
286        Ok(PythonCodeObject {
287            name,
288            qualname,
289            source_name: filename,
290            first_line: firstlineno,
291            last_line: firstlineno, // TODO: 从 lnotab 计算
292            co_argcount: argcount as u8,
293            co_posonlyargcount: posonlyargcount as u8,
294            co_kwonlyargcount: kwonlyargcount as u8,
295            co_nlocals: localsplusnames.len() as u8, // 估计值
296            co_stacksize: stacksize as u8,
297            co_flags: flags,
298            co_code: instructions,
299            co_consts: constants,
300            co_names: names,
301            co_localsplusnames: localsplusnames,
302            co_localspluskinds: localspluskinds,
303            co_linetable: lnotab,
304            co_exceptiontable: exceptiontable,
305        })
306    }
307
308    fn parse_object(&self, cursor: &mut Cursor<&[u8]>) -> Result<PythonObject, GaiaError> {
309        let type_byte = cursor.read_u8().map_err(|_| GaiaError::custom_error("Failed to read marshal type".to_string()))?;
310        // 忽略 FLAG_REF (0x80)
311        self.parse_object_with_type(cursor, type_byte & 0x7F)
312    }
313
314    fn parse_object_with_type(&self, cursor: &mut Cursor<&[u8]>, type_byte: u8) -> Result<PythonObject, GaiaError> {
315        match type_byte {
316            TYPE_NONE => Ok(PythonObject::None),
317            TYPE_TRUE => Ok(PythonObject::Bool(true)),
318            TYPE_FALSE => Ok(PythonObject::Bool(false)),
319            TYPE_INT => {
320                let value = cursor
321                    .read_i32::<LittleEndian>()
322                    .map_err(|_| GaiaError::custom_error("Failed to read integer".to_string()))?;
323                Ok(PythonObject::Int(value))
324            }
325            TYPE_INT64 => {
326                let value = cursor
327                    .read_i64::<LittleEndian>()
328                    .map_err(|_| GaiaError::custom_error("Failed to read int64".to_string()))?;
329                Ok(PythonObject::Integer(value))
330            }
331            TYPE_STRING | TYPE_INTERNED => {
332                let length = cursor
333                    .read_u32::<LittleEndian>()
334                    .map_err(|_| GaiaError::custom_error("Failed to read string length".to_string()))?;
335                let mut buffer = vec![0u8; length as usize];
336                cursor
337                    .read_exact(&mut buffer)
338                    .map_err(|_| GaiaError::custom_error("Failed to read string data".to_string()))?;
339                Ok(PythonObject::Bytes(buffer))
340            }
341            TYPE_UNICODE | TYPE_ASCII | TYPE_ASCII_INTERNED => {
342                let length = cursor
343                    .read_u32::<LittleEndian>()
344                    .map_err(|_| GaiaError::custom_error("Failed to read unicode length".to_string()))?;
345                let mut buffer = vec![0u8; length as usize];
346                cursor
347                    .read_exact(&mut buffer)
348                    .map_err(|_| GaiaError::custom_error("Failed to read unicode data".to_string()))?;
349                let string = String::from_utf8_lossy(&buffer).to_string();
350                Ok(PythonObject::Str(string))
351            }
352            TYPE_SHORT_ASCII | TYPE_SHORT_ASCII_INTERNED => {
353                let length =
354                    cursor.read_u8().map_err(|_| GaiaError::custom_error("Failed to read short ascii length".to_string()))?;
355                let mut buffer = vec![0u8; length as usize];
356                cursor
357                    .read_exact(&mut buffer)
358                    .map_err(|_| GaiaError::custom_error("Failed to read short ascii data".to_string()))?;
359                let string = String::from_utf8_lossy(&buffer).to_string();
360                Ok(PythonObject::Str(string))
361            }
362            TYPE_TUPLE | TYPE_SMALL_TUPLE => {
363                let length = if type_byte == TYPE_SMALL_TUPLE {
364                    cursor.read_u8().map_err(|_| GaiaError::custom_error("Failed to read small tuple length".to_string()))?
365                        as u32
366                }
367                else {
368                    cursor
369                        .read_u32::<LittleEndian>()
370                        .map_err(|_| GaiaError::custom_error("Failed to read tuple length".to_string()))?
371                };
372
373                let mut items = Vec::new();
374                for _ in 0..length {
375                    items.push(self.parse_object(cursor)?);
376                }
377                Ok(PythonObject::Tuple(items))
378            }
379            TYPE_LIST => {
380                let length = cursor
381                    .read_u32::<LittleEndian>()
382                    .map_err(|_| GaiaError::custom_error("Failed to read list length".to_string()))?;
383
384                let mut items = Vec::new();
385                for _ in 0..length {
386                    items.push(self.parse_object(cursor)?);
387                }
388                Ok(PythonObject::List(items))
389            }
390            _ => {
391                // 对于未知类型,返回 None
392                Ok(PythonObject::None)
393            }
394        }
395    }
396
397    fn extract_string_list(&self, obj: PythonObject) -> Result<Vec<String>, GaiaError> {
398        match obj {
399            PythonObject::Tuple(items) | PythonObject::List(items) => {
400                let mut strings = Vec::new();
401                for item in items {
402                    match item {
403                        PythonObject::String(s) | PythonObject::Str(s) => strings.push(s),
404                        _ => strings.push(String::new()),
405                    }
406                }
407                Ok(strings)
408            }
409            _ => Ok(Vec::new()),
410        }
411    }
412}