python_assembler/formats/pyc/reader/
mod.rs1#![doc = include_str!("readme.md")]
2
3use crate::{
4 formats::pyc::PycReadConfig,
5 program::{PycHeader, PythonCodeObject, PythonObject, PythonProgram, PythonVersion},
6};
7use byteorder::{LittleEndian, ReadBytesExt};
8use gaia_types::{BinaryReader, GaiaDiagnostics, GaiaError};
9use std::{
10 cell::RefCell,
11 io::{Cursor, Read, Seek},
12 sync::OnceLock,
13};
14
// Marshal type tags: the one-byte code that precedes every serialized object.
// Grouped by category; tags the parser does not match on yet keep their
// `#[allow(dead_code)]` so the table stays complete without warnings.

// --- Singleton / marker values -------------------------------------------
#[allow(dead_code)]
const TYPE_NULL: u8 = b'0';
#[allow(dead_code)]
const TYPE_NONE: u8 = b'N';
#[allow(dead_code)]
const TYPE_TRUE: u8 = b'T';
#[allow(dead_code)]
const TYPE_FALSE: u8 = b'F';
#[allow(dead_code)]
const TYPE_STOPITER: u8 = b'S';
#[allow(dead_code)]
const TYPE_ELLIPSIS: u8 = b'.';
#[allow(dead_code)]
const TYPE_UNKNOWN: u8 = b'?';

// --- Numbers ---------------------------------------------------------------
#[allow(dead_code)]
const TYPE_INT: u8 = b'i';
#[allow(dead_code)]
const TYPE_INT64: u8 = b'I';
#[allow(dead_code)]
const TYPE_LONG: u8 = b'l';
#[allow(dead_code)]
const TYPE_FLOAT: u8 = b'f';
#[allow(dead_code)]
const TYPE_BINARY_FLOAT: u8 = b'g';
#[allow(dead_code)]
const TYPE_COMPLEX: u8 = b'x';
#[allow(dead_code)]
const TYPE_BINARY_COMPLEX: u8 = b'y';

// --- Byte strings and text -------------------------------------------------
const TYPE_STRING: u8 = b's';
const TYPE_INTERNED: u8 = b't';
const TYPE_UNICODE: u8 = b'u';
const TYPE_ASCII: u8 = b'a';
const TYPE_ASCII_INTERNED: u8 = b'A';
const TYPE_SHORT_ASCII: u8 = b'z';
const TYPE_SHORT_ASCII_INTERNED: u8 = b'Z';

// --- Containers ------------------------------------------------------------
const TYPE_TUPLE: u8 = b'(';
const TYPE_SMALL_TUPLE: u8 = b')';
const TYPE_LIST: u8 = b'[';
#[allow(dead_code)]
const TYPE_DICT: u8 = b'{';
#[allow(dead_code)]
const TYPE_SET: u8 = b'<';
#[allow(dead_code)]
const TYPE_FROZENSET: u8 = b'>';

// --- Code objects and back-references --------------------------------------
const TYPE_CODE: u8 = b'c';
#[allow(dead_code)]
const TYPE_REF: u8 = b'r';
63
/// Header-level facts about a `.pyc` file, produced without parsing the code object.
#[derive(Debug, Clone, Copy)]
pub struct PycInfo {
    /// Header exactly as read from the start of the stream.
    pub header: PycHeader,
    /// Version used for this file: the version pinned in the read config when
    /// that is not `Unknown`, otherwise the one derived from the header magic.
    pub version: PythonVersion,
}
72
/// Reader that turns a `.pyc` byte stream into a [`PythonProgram`].
///
/// Header info and the parsed program are computed on first request and
/// cached, so the accessors can be called repeatedly through `&self`.
#[derive(Debug)]
pub struct PycReader<'config, R> {
    /// Read options; a `version` other than `Unknown` overrides magic-based detection.
    config: &'config PycReadConfig,
    /// Underlying little-endian reader; the `RefCell` lets `&self` methods seek and read.
    reader: RefCell<BinaryReader<R, LittleEndian>>,
    /// Cache filled by `get_info`.
    info: OnceLock<PycInfo>,
    /// Cache filled by `get_program`.
    program: OnceLock<PythonProgram>,
}
81
impl PycReadConfig {
    /// Wraps `reader` in a [`PycReader`] that borrows this configuration.
    ///
    /// Equivalent to `PycReader::new(reader, self)`; nothing is read from
    /// `reader` here.
    pub fn as_reader<R: Read + Seek>(&self, reader: R) -> PycReader<'_, R> {
        PycReader::new(reader, self)
    }
}
88
89impl<'config, R> PycReader<'config, R> {
90 pub fn new(reader: R, config: &'config PycReadConfig) -> Self {
92 Self { config, reader: RefCell::new(BinaryReader::new(reader)), info: Default::default(), program: Default::default() }
93 }
94
95 pub fn finish(self) -> GaiaDiagnostics<PythonProgram>
97 where
98 R: Read + Seek,
99 {
100 match self.get_program() {
101 Ok(program) => {
102 let errors = self.reader.borrow_mut().take_errors();
103 GaiaDiagnostics { result: Ok(program.clone()), diagnostics: errors }
104 }
105 Err(e) => {
106 let errors = self.reader.borrow_mut().take_errors();
107 GaiaDiagnostics { result: Err(e), diagnostics: errors }
108 }
109 }
110 }
111}
112
113impl<'config, R: Read + Seek> PycReader<'config, R> {
114 pub fn get_program(&self) -> Result<&PythonProgram, GaiaError> {
116 Ok(self.program.get_or_init(|| self.read_program().unwrap_or_else(|_| PythonProgram::default())))
117 }
118
119 pub fn get_info(&self) -> Result<&PycInfo, GaiaError> {
121 Ok(self.info.get_or_init(|| {
122 self.read_info().unwrap_or_else(|_| PycInfo { header: PycHeader::default(), version: PythonVersion::Unknown })
123 }))
124 }
125
126 fn read_info(&self) -> Result<PycInfo, GaiaError> {
127 let mut reader = self.reader.borrow_mut();
128
129 reader.seek(std::io::SeekFrom::Start(0))?;
131
132 let header = self.read_header(&mut reader)?;
134
135 let version = if self.config.version != PythonVersion::Unknown {
137 self.config.version
138 }
139 else {
140 PythonVersion::from_magic(header.magic)
141 };
142
143 Ok(PycInfo { header, version })
144 }
145
146 fn read_program(&self) -> Result<PythonProgram, GaiaError> {
147 let mut reader = self.reader.borrow_mut();
148
149 reader.seek(std::io::SeekFrom::Start(0))?;
151
152 let header = self.read_header(&mut reader)?;
154
155 let version = if self.config.version != PythonVersion::Unknown {
157 self.config.version
158 }
159 else {
160 PythonVersion::from_magic(header.magic)
161 };
162
163 let mut code_object_bytes = Vec::new();
165 reader.read_to_end(&mut code_object_bytes)?;
166
167 let code_object = if !code_object_bytes.is_empty() {
169 self.parse_code_object(&code_object_bytes)?
170 }
171 else {
172 PythonCodeObject::default()
173 };
174
175 let program = PythonProgram { header, code_object, version };
177
178 Ok(program)
179 }
180
181 fn read_header(&self, reader: &mut BinaryReader<R, LittleEndian>) -> Result<PycHeader, GaiaError> {
182 let mut magic = [0u8; 4];
183 reader.read_exact(&mut magic)?;
184
185 let flags = reader.read_u32()?;
186 let timestamp = reader.read_u32()?;
187 let size = reader.read_u32()?;
188
189 Ok(PycHeader { magic, flags, timestamp, size })
190 }
191
192 fn parse_code_object(&self, data: &[u8]) -> Result<PythonCodeObject, GaiaError> {
194 let mut cursor = Cursor::new(data);
195
196 let type_byte = cursor.read_u8().map_err(|_| GaiaError::custom_error("Failed to read type byte".to_string()))?;
198 if (type_byte & 0x7F) != TYPE_CODE {
199 return Err(GaiaError::custom_error(format!("Expected code object, got type {}", type_byte)));
200 }
201
202 let argcount =
204 cursor.read_u32::<LittleEndian>().map_err(|_| GaiaError::custom_error("Failed to read argcount".to_string()))?;
205 let posonlyargcount = cursor
206 .read_u32::<LittleEndian>()
207 .map_err(|_| GaiaError::custom_error("Failed to read posonlyargcount".to_string()))?;
208 let kwonlyargcount = cursor
209 .read_u32::<LittleEndian>()
210 .map_err(|_| GaiaError::custom_error("Failed to read kwonlyargcount".to_string()))?;
211 let stacksize =
212 cursor.read_u32::<LittleEndian>().map_err(|_| GaiaError::custom_error("Failed to read stacksize".to_string()))?;
213 let flags =
214 cursor.read_u32::<LittleEndian>().map_err(|_| GaiaError::custom_error("Failed to read flags".to_string()))?;
215
216 let code_obj = self.parse_object(&mut cursor)?;
218 let code_bytes = match code_obj {
219 PythonObject::Bytes(b) => b,
220 _ => return Err(GaiaError::custom_error("Expected bytes for code".to_string())),
221 };
222
223 let constants_obj = self.parse_object(&mut cursor)?;
225 let constants = match constants_obj {
226 PythonObject::Tuple(items) | PythonObject::List(items) => items,
227 _ => vec![constants_obj],
228 };
229
230 let names_obj = self.parse_object(&mut cursor)?;
232 let names = self.extract_string_list(names_obj)?;
233
234 let localsplusnames_obj = self.parse_object(&mut cursor)?;
236 let localsplusnames = self.extract_string_list(localsplusnames_obj)?;
237
238 let localspluskinds_obj = self.parse_object(&mut cursor)?;
240 let localspluskinds = match localspluskinds_obj {
241 PythonObject::Bytes(b) => b,
242 _ => Vec::new(),
243 };
244
245 let filename_obj = self.parse_object(&mut cursor)?;
247 let filename = match filename_obj {
248 PythonObject::String(s) | PythonObject::Str(s) => s,
249 _ => String::new(),
250 };
251
252 let name_obj = self.parse_object(&mut cursor)?;
254 let name = match name_obj {
255 PythonObject::String(s) | PythonObject::Str(s) => s,
256 _ => String::new(),
257 };
258
259 let qualname_obj = self.parse_object(&mut cursor)?;
261 let qualname = match qualname_obj {
262 PythonObject::String(s) | PythonObject::Str(s) => s,
263 _ => String::new(),
264 };
265
266 let firstlineno =
267 cursor.read_u32::<LittleEndian>().map_err(|_| GaiaError::custom_error("Failed to read firstlineno".to_string()))?;
268
269 let lnotab_obj = self.parse_object(&mut cursor)?;
271 let lnotab = match lnotab_obj {
272 PythonObject::Bytes(b) => b,
273 _ => Vec::new(),
274 };
275
276 let exceptiontable_obj = self.parse_object(&mut cursor)?;
278 let exceptiontable = match exceptiontable_obj {
279 PythonObject::Bytes(b) => b,
280 _ => Vec::new(),
281 };
282
283 let instructions = Vec::new();
285
286 Ok(PythonCodeObject {
287 name,
288 qualname,
289 source_name: filename,
290 first_line: firstlineno,
291 last_line: firstlineno, co_argcount: argcount as u8,
293 co_posonlyargcount: posonlyargcount as u8,
294 co_kwonlyargcount: kwonlyargcount as u8,
295 co_nlocals: localsplusnames.len() as u8, co_stacksize: stacksize as u8,
297 co_flags: flags,
298 co_code: instructions,
299 co_consts: constants,
300 co_names: names,
301 co_localsplusnames: localsplusnames,
302 co_localspluskinds: localspluskinds,
303 co_linetable: lnotab,
304 co_exceptiontable: exceptiontable,
305 })
306 }
307
308 fn parse_object(&self, cursor: &mut Cursor<&[u8]>) -> Result<PythonObject, GaiaError> {
309 let type_byte = cursor.read_u8().map_err(|_| GaiaError::custom_error("Failed to read marshal type".to_string()))?;
310 self.parse_object_with_type(cursor, type_byte & 0x7F)
312 }
313
314 fn parse_object_with_type(&self, cursor: &mut Cursor<&[u8]>, type_byte: u8) -> Result<PythonObject, GaiaError> {
315 match type_byte {
316 TYPE_NONE => Ok(PythonObject::None),
317 TYPE_TRUE => Ok(PythonObject::Bool(true)),
318 TYPE_FALSE => Ok(PythonObject::Bool(false)),
319 TYPE_INT => {
320 let value = cursor
321 .read_i32::<LittleEndian>()
322 .map_err(|_| GaiaError::custom_error("Failed to read integer".to_string()))?;
323 Ok(PythonObject::Int(value))
324 }
325 TYPE_INT64 => {
326 let value = cursor
327 .read_i64::<LittleEndian>()
328 .map_err(|_| GaiaError::custom_error("Failed to read int64".to_string()))?;
329 Ok(PythonObject::Integer(value))
330 }
331 TYPE_STRING | TYPE_INTERNED => {
332 let length = cursor
333 .read_u32::<LittleEndian>()
334 .map_err(|_| GaiaError::custom_error("Failed to read string length".to_string()))?;
335 let mut buffer = vec![0u8; length as usize];
336 cursor
337 .read_exact(&mut buffer)
338 .map_err(|_| GaiaError::custom_error("Failed to read string data".to_string()))?;
339 Ok(PythonObject::Bytes(buffer))
340 }
341 TYPE_UNICODE | TYPE_ASCII | TYPE_ASCII_INTERNED => {
342 let length = cursor
343 .read_u32::<LittleEndian>()
344 .map_err(|_| GaiaError::custom_error("Failed to read unicode length".to_string()))?;
345 let mut buffer = vec![0u8; length as usize];
346 cursor
347 .read_exact(&mut buffer)
348 .map_err(|_| GaiaError::custom_error("Failed to read unicode data".to_string()))?;
349 let string = String::from_utf8_lossy(&buffer).to_string();
350 Ok(PythonObject::Str(string))
351 }
352 TYPE_SHORT_ASCII | TYPE_SHORT_ASCII_INTERNED => {
353 let length =
354 cursor.read_u8().map_err(|_| GaiaError::custom_error("Failed to read short ascii length".to_string()))?;
355 let mut buffer = vec![0u8; length as usize];
356 cursor
357 .read_exact(&mut buffer)
358 .map_err(|_| GaiaError::custom_error("Failed to read short ascii data".to_string()))?;
359 let string = String::from_utf8_lossy(&buffer).to_string();
360 Ok(PythonObject::Str(string))
361 }
362 TYPE_TUPLE | TYPE_SMALL_TUPLE => {
363 let length = if type_byte == TYPE_SMALL_TUPLE {
364 cursor.read_u8().map_err(|_| GaiaError::custom_error("Failed to read small tuple length".to_string()))?
365 as u32
366 }
367 else {
368 cursor
369 .read_u32::<LittleEndian>()
370 .map_err(|_| GaiaError::custom_error("Failed to read tuple length".to_string()))?
371 };
372
373 let mut items = Vec::new();
374 for _ in 0..length {
375 items.push(self.parse_object(cursor)?);
376 }
377 Ok(PythonObject::Tuple(items))
378 }
379 TYPE_LIST => {
380 let length = cursor
381 .read_u32::<LittleEndian>()
382 .map_err(|_| GaiaError::custom_error("Failed to read list length".to_string()))?;
383
384 let mut items = Vec::new();
385 for _ in 0..length {
386 items.push(self.parse_object(cursor)?);
387 }
388 Ok(PythonObject::List(items))
389 }
390 _ => {
391 Ok(PythonObject::None)
393 }
394 }
395 }
396
397 fn extract_string_list(&self, obj: PythonObject) -> Result<Vec<String>, GaiaError> {
398 match obj {
399 PythonObject::Tuple(items) | PythonObject::List(items) => {
400 let mut strings = Vec::new();
401 for item in items {
402 match item {
403 PythonObject::String(s) | PythonObject::Str(s) => strings.push(s),
404 _ => strings.push(String::new()),
405 }
406 }
407 Ok(strings)
408 }
409 _ => Ok(Vec::new()),
410 }
411 }
412}