poe_data_tools_cli/commands/
dump_tables_csv.rs1use std::{
2 fs::{File, create_dir_all},
3 path::{Path, PathBuf},
4 sync::Arc,
5};
6
7use anyhow::{Context, Result, anyhow, bail, ensure};
8use arrow_array::{
9 ArrayRef, BooleanArray, Float32Array, Int16Array, Int32Array, RecordBatch, StringArray,
10 UInt16Array, UInt32Array, UInt64Array,
11 builder::{
12 Float32Builder, Int16Builder, Int32Builder, ListBuilder, StringBuilder, UInt16Builder,
13 UInt32Builder, UInt64Builder,
14 },
15};
16use arrow_cast::display::{ArrayFormatter, FormatOptions};
17use arrow_csv::Writer;
18use arrow_schema::{DataType, SchemaBuilder};
19use bytes::Bytes;
20use glob::{MatchOptions, Pattern};
21use poe_data_tools::{
22 Patch,
23 dat::ivy_schema::{ColumnSchema, DatTableSchema, fetch_schema, load_schema},
24 file_parsers::{
25 FileParser,
26 dat::{DatParser, types::DatFile},
27 },
28 fs::{FS, FileSystem},
29};
30
31use crate::VERBOSE;
32
/// Decodes a 16-byte foreign-row cell into a row index.
///
/// The bytes are read as a little-endian `u128` and deliberately truncated to
/// the low 64 bits, i.e. only the first eight bytes contribute to the result.
///
/// # Panics
///
/// Panics if `bytes` is not exactly 16 bytes long.
fn parse_foreignrow(bytes: &[u8]) -> u64 {
    let raw: [u8; 16] = bytes.try_into().unwrap();
    let wide = u128::from_le_bytes(raw);
    wide as u64
}
38
39fn parse_maybe_foreignrow(bytes: &[u8]) -> Option<u64> {
40 if bytes == [0xfe; 16] {
41 None
42 } else {
43 Some(parse_foreignrow(bytes))
44 }
45}
46
47fn parse_maybe_row(bytes: &[u8]) -> Option<u64> {
48 if bytes == [0xfe; 8] {
49 None
50 } else {
51 Some(parse_u64(bytes))
52 }
53}
54
/// Decodes a little-endian `u64`.
///
/// # Panics
///
/// Panics if `bytes` is not exactly 8 bytes long.
fn parse_u64(bytes: &[u8]) -> u64 {
    let raw: [u8; 8] = bytes.try_into().unwrap();
    u64::from_le_bytes(raw)
}
/// Decodes a little-endian `u32`.
///
/// # Panics
///
/// Panics if `bytes` is not exactly 4 bytes long.
fn parse_u32(bytes: &[u8]) -> u32 {
    let raw: [u8; 4] = bytes.try_into().unwrap();
    u32::from_le_bytes(raw)
}
/// Decodes a little-endian `i32`.
///
/// # Panics
///
/// Panics if `bytes` is not exactly 4 bytes long.
fn parse_i32(bytes: &[u8]) -> i32 {
    let raw: [u8; 4] = bytes.try_into().unwrap();
    i32::from_le_bytes(raw)
}
/// Decodes a little-endian `f32`.
///
/// # Panics
///
/// Panics if `bytes` is not exactly 4 bytes long.
fn parse_f32(bytes: &[u8]) -> f32 {
    let raw: [u8; 4] = bytes.try_into().unwrap();
    f32::from_le_bytes(raw)
}
67
/// Decodes a little-endian `u16`.
///
/// # Panics
///
/// Panics if `bytes` is not exactly 2 bytes long.
fn parse_u16(bytes: &[u8]) -> u16 {
    let raw: [u8; 2] = bytes.try_into().unwrap();
    u16::from_le_bytes(raw)
}
/// Decodes a little-endian `i16`.
///
/// # Panics
///
/// Panics if `bytes` is not exactly 2 bytes long.
fn parse_i16(bytes: &[u8]) -> i16 {
    let raw: [u8; 2] = bytes.try_into().unwrap();
    i16::from_le_bytes(raw)
}
74
75fn parse_bool(bytes: &[u8]) -> Result<bool> {
76 assert!(bytes.len() == 1);
77 ensure!(bytes[0] < 2, "Invalid boolean value: {:?}", bytes[0]);
78
79 Ok(bytes[0] == 1)
80}
81
82fn parse_column(
84 table: &DatFile,
85 column: &ColumnSchema,
86 cur_offset: usize,
87) -> Result<(usize, Result<ArrayRef>)> {
88 let (bytes_taken, series) = match (column.array, column.interval) {
89 (true, false) => {
91 let series = match column.column_type.as_str() {
92 "array" => Err(anyhow!("Unknown array type")),
94
95 "string" => table
96 .view_col_as_array_of_strings(cur_offset)?
97 .collect::<Result<Vec<_>>>()
98 .map(|s| {
99 let mut builder = ListBuilder::new(StringBuilder::new());
100 for row in s {
101 for val in row {
102 builder.values().append_option(val)
103 }
104 builder.append(true);
105 }
106
107 builder.finish()
108 }),
109
110 "foreignrow" => table
111 .view_col_as_array_of(cur_offset, 16, parse_foreignrow)?
112 .collect::<Result<Vec<_>>>()
113 .map(|s| {
114 let mut builder = ListBuilder::new(UInt64Builder::new());
115 for row in s {
116 for val in row {
117 builder.values().append_value(val)
118 }
119 builder.append(true);
120 }
121
122 builder.finish()
123 }),
124
125 "row" => table
126 .view_col_as_array_of(cur_offset, 8, parse_maybe_row)?
127 .collect::<Result<Vec<_>>>()
128 .map(|s| {
129 let mut builder = ListBuilder::new(UInt64Builder::new());
130 for row in s {
131 for val in row {
132 builder.values().append_option(val)
133 }
134 builder.append(true);
135 }
136
137 builder.finish()
138 }),
139
140 "enumrow" => table
141 .view_col_as_array_of(cur_offset, 4, parse_u32)?
142 .collect::<Result<Vec<_>>>()
143 .map(|s| {
144 let mut builder = ListBuilder::new(UInt32Builder::new());
145 for row in s {
146 for val in row {
147 builder.values().append_value(val)
148 }
149 builder.append(true);
150 }
151
152 builder.finish()
153 }),
154
155 "u32" => table
156 .view_col_as_array_of(cur_offset, 4, parse_u32)?
157 .collect::<Result<Vec<_>>>()
158 .map(|s| {
159 let mut builder = ListBuilder::new(UInt32Builder::new());
160 for row in s {
161 for val in row {
162 builder.values().append_value(val)
163 }
164 builder.append(true);
165 }
166
167 builder.finish()
168 }),
169
170 "f32" => table
171 .view_col_as_array_of(cur_offset, 4, parse_f32)?
172 .collect::<Result<Vec<_>>>()
173 .map(|s| {
174 let mut builder = ListBuilder::new(Float32Builder::new());
175 for row in s {
176 for val in row {
177 builder.values().append_value(val)
178 }
179 builder.append(true);
180 }
181
182 builder.finish()
183 }),
184
185 "i32" => table
186 .view_col_as_array_of(cur_offset, 4, parse_i32)?
187 .collect::<Result<Vec<_>>>()
188 .map(|s| {
189 let mut builder = ListBuilder::new(Int32Builder::new());
190 for row in s {
191 for val in row {
192 builder.values().append_value(val)
193 }
194 builder.append(true);
195 }
196
197 builder.finish()
198 }),
199
200 "i16" => table
201 .view_col_as_array_of(cur_offset, 2, parse_i16)?
202 .collect::<Result<Vec<_>>>()
203 .map(|s| {
204 let mut builder = ListBuilder::new(Int16Builder::new());
205 for row in s {
206 for val in row {
207 builder.values().append_value(val)
208 }
209 builder.append(true);
210 }
211
212 builder.finish()
213 }),
214
215 "u16" => table
216 .view_col_as_array_of(cur_offset, 2, parse_u16)?
217 .collect::<Result<Vec<_>>>()
218 .map(|s| {
219 let mut builder = ListBuilder::new(UInt16Builder::new());
220 for row in s {
221 for val in row {
222 builder.values().append_value(val)
223 }
224 builder.append(true);
225 }
226
227 builder.finish()
228 }),
229
230 _ => bail!("Unknown column type: {:?}", column),
231 }
232 .map(|s| Arc::new(s) as _);
233
234 (16, series)
235 }
236
237 (false, true) => match column.column_type.as_str() {
239 "i32" => {
240 let series = table.view_col(cur_offset, 8).map(|values| {
241 let mut builder = ListBuilder::new(Int32Builder::new());
242 values.for_each(|bytes| {
243 bytes
244 .chunks_exact(4)
245 .map(parse_i32)
246 .for_each(|val| builder.values().append_value(val));
247 builder.append(true);
248 });
249
250 Arc::new(builder.finish()) as _
251 });
252
253 (8, series)
254 }
255 _ => bail!("Unknown column type: {:?}", column),
256 },
257
258 (false, false) => match column.column_type.as_str() {
260 "string" => {
261 let series = table
262 .view_col_as_string(cur_offset)
263 .and_then(|strings| strings.collect::<Result<Vec<_>>>())
264 .map(|s| Arc::new(StringArray::from(s)) as _);
266 (8, series)
267 }
268
269 "foreignrow" => {
270 let series = table
271 .view_col(cur_offset, 16)
272 .map(|items| items.map(parse_maybe_foreignrow).collect::<Vec<_>>())
273 .map(|s| Arc::new(UInt64Array::from(s)) as _);
275 (16, series)
276 }
277
278 "row" => {
279 let series = table
280 .view_col(cur_offset, 8)
281 .map(|items| items.map(parse_maybe_row).collect::<Vec<_>>())
282 .map(|s| Arc::new(UInt64Array::from(s)) as _);
284 (8, series)
285 }
286
287 "enumrow" => {
288 let series = table
289 .view_col(cur_offset, 4)
290 .map(|items| items.map(parse_u32).collect::<Vec<_>>())
291 .map(|s| Arc::new(UInt32Array::from(s)) as _);
293 (4, series)
294 }
295
296 "u32" => {
297 let series = table
298 .view_col(cur_offset, 4)
299 .map(|items| items.map(parse_u32).collect::<Vec<_>>())
300 .map(|s| Arc::new(UInt32Array::from(s)) as _);
302 (4, series)
303 }
304
305 "f32" => {
306 let series = table
307 .view_col(cur_offset, 4)
308 .map(|items| items.map(parse_f32).collect::<Vec<_>>())
309 .map(|s| Arc::new(Float32Array::from(s)) as _);
311 (4, series)
312 }
313
314 "i32" => {
315 let series = table
316 .view_col(cur_offset, 4)
317 .map(|items| items.map(parse_i32).collect::<Vec<_>>())
318 .map(|s| Arc::new(Int32Array::from(s)) as _);
320 (4, series)
321 }
322
323 "i16" => {
324 let series = table
325 .view_col(cur_offset, 2)
326 .map(|items| items.map(parse_i16).collect::<Vec<_>>())
327 .map(|s| Arc::new(Int16Array::from(s)) as _);
329 (2, series)
330 }
331
332 "u16" => {
333 let series = table
334 .view_col(cur_offset, 2)
335 .map(|items| items.map(parse_u16).collect::<Vec<_>>())
336 .map(|s| Arc::new(UInt16Array::from(s)) as _);
338 (2, series)
339 }
340
341 "bool" => {
342 let series = table
343 .view_col(cur_offset, 1)
344 .and_then(|items| items.map(parse_bool).collect::<Result<Vec<_>>>())
345 .map(|s| Arc::new(BooleanArray::from(s)) as _);
347 (1, series)
348 }
349
350 _ => bail!("Unknown column type: {:?}", column),
351 },
352 _ => bail!("Can't be both array and interval"),
353 };
354
355 Ok((bytes_taken, series))
356}
357
358pub fn parse_table(table: &DatFile, schema: &DatTableSchema) -> Result<RecordBatch> {
360 let column_names = schema.column_names().collect::<Vec<_>>();
361
362 let mut parsed_columns = vec![];
364 let mut cur_offset = 0;
365 for column in &schema.columns {
366 let (bytes_taken, series) = parse_column(table, column, cur_offset)
370 .with_context(|| format!("Failed to parse column: {:?}", column))?;
371
372 match series {
374 Ok(series) => {
375 log::trace!(
376 "Successfully parsed column at bytes {}-{}: {:?}",
377 cur_offset,
378 cur_offset + bytes_taken,
379 column
380 );
381 parsed_columns.push(series);
382 }
383 Err(e) => {
384 let error_message = if *VERBOSE.get().unwrap() {
385 format!("{e:?}")
386 } else {
387 format!("{e}")
388 };
389 log::error!(
390 "Failed to parse column {:?}, skipping: {error_message}",
391 column.name
392 );
393 }
394 }
395 cur_offset += bytes_taken;
396 }
397
398 let df = RecordBatch::try_from_iter(column_names.into_iter().zip(parsed_columns))
400 .context("Failed to create df")?;
401 Ok(df)
402}
403
404fn save_to_csv(table: &RecordBatch, path: &Path) -> Result<()> {
406 let (schema, mut columns, _) = table.clone().into_parts();
407 let mut schema_builder = SchemaBuilder::from(&*schema);
408
409 columns
411 .iter_mut()
412 .enumerate()
413 .filter(|(_, c)| c.data_type().is_nested())
414 .for_each(|(i, c)| {
415 let stringy_vals = {
417 let options = FormatOptions::default();
418 let formatter =
419 ArrayFormatter::try_new(c, &options).expect("Failed to create table formatter");
420
421 (0..c.len())
422 .map(|i| format!("{}", formatter.value(i)))
423 .collect::<Vec<_>>()
424 };
425
426 *c = Arc::new(StringArray::from(stringy_vals)) as _;
428
429 let field = (**schema_builder.field(i))
430 .clone()
431 .with_data_type(DataType::Utf8);
432
433 *schema_builder.field_mut(i) = Arc::new(field);
434 });
435
436 let schema = Arc::new(schema_builder.finish());
437 let table = RecordBatch::try_new(schema, columns).context("Failed to re-create table")?;
438
439 create_dir_all(path.parent().context("No parent directory")?)
440 .context("Failed to create output dirs")?;
441
442 Writer::new(File::create(path).context("Failed to create output file")?)
443 .write(&table)
444 .context("Failed to write DF to file")
445}
446
447fn process_file(bytes: &Bytes, output_path: &Path, schema: &DatTableSchema) -> Result<()> {
448 let table = DatParser
450 .parse(bytes)
451 .as_anyhow()
452 .context("Failed to parse table data")?;
453
454 ensure!(!table.rows.is_empty(), "Empty table");
455
456 let df = parse_table(&table, schema).context("Failed to apply schema to table")?;
458
459 save_to_csv(&df, output_path).context("Failed to write CSV")?;
461
462 Ok(())
463}
464
465pub fn dump_tables(
467 fs: &mut FS,
468 patterns: &[Pattern],
469 cache_dir: &Path,
470 output_folder: &Path,
471 version: &Patch,
472 schema: Option<impl AsRef<Path>>,
473) -> Result<()> {
474 for pattern in patterns {
475 ensure!(
476 pattern.as_str().ends_with(".datc64"),
477 "Only .datc64 table export is supported."
478 );
479 }
480
481 let version = match version {
482 Patch::One => 1,
483 Patch::Two => 2,
484 _ => bail!("Only patch versions 1/2 supported for table extraction."),
485 };
486
487 let schemas = if let Some(path) = schema {
489 load_schema(path.as_ref()).context("Failed to load schema file")?
490 } else {
491 fetch_schema(cache_dir).context("Failed to fetch schema file")?
492 };
493
494 let filenames = fs
495 .list()
496 .filter(|filename| {
497 patterns.iter().any(|pattern| {
498 pattern.matches_with(
499 filename,
500 MatchOptions {
501 require_literal_separator: true,
502 ..Default::default()
503 },
504 )
505 })
506 })
507 .collect::<Vec<_>>();
508
509 fs.batch_read(&filenames)
510 .filter_map(|(path, res)| match res {
512 Ok(b) => Some((path, b)),
513 Err(e) => {
514 log::error!("Failed to extract file: {:?}: {:?}", path, e);
515 None
516 }
517 })
518 .map(|(filename, contents)| -> Result<_, anyhow::Error> {
520 let schema = schemas
522 .tables
523 .iter()
524 .filter(|t| t.valid_for == version || t.valid_for == 3)
526 .find(|t| {
527 *t.name.to_lowercase() == *PathBuf::from(filename.as_ref()).file_stem().unwrap()
528 })
529 .with_context(|| format!("Couldn't find schema for {:?}", filename))?;
530
531 let output_path = output_folder.join(filename.as_ref()).with_extension("csv");
533 process_file(&contents, &output_path, schema)
534 .with_context(|| format!("Failed to process file: {:?}", filename))?;
535
536 Ok(filename)
537 })
538 .for_each(|result| match result {
540 Ok(filename) => log::info!("Extracted table: {}", filename),
541 Err(e) => {
542 let error_message = if *VERBOSE.get().unwrap() {
543 format!("{e:?}")
544 } else {
545 format!("{e}")
546 };
547 log::error!("Failed to extract table: {error_message}");
548 }
549 });
550
551 Ok(())
552}