1use arrow::array::*;
2use arrow::datatypes::*;
3use arrow::record_batch::RecordBatch;
4use arrow::ipc::writer::StreamWriter;
5use arrow::ipc::reader::StreamReader;
6use arrow::ipc::writer::IpcWriteOptions;
7use std::io::{Write, Seek, SeekFrom};
8use std::collections::HashMap;
9use serde::{Serialize, Deserialize};
10pub use crate::dna::hel::error::HlxError;
11pub use crate::dna::atp::output::OutputConfig;
12pub use crate::dna::atp::output::DataWriter;
13
/// File magic written at byte offset 0 of every HLXC container.
pub const HLXC_MAGIC: &[u8; 4] = b"HLXC";
/// Sentinel written as the last 4 bytes when a preview footer is present;
/// readers probe the end of the file for it.
pub const HLXC_FOOTER_MAGIC: &[u8; 4] = b"\xFF\xFF\xFF\xFF";

/// Current on-disk format version (single byte immediately after the magic).
pub const HLXC_VERSION: u8 = 1;
20
/// JSON header stored after the fixed prelude; describes the schema of the
/// Arrow IPC stream that follows, plus optional free-form metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HlxcHeader {
    /// One entry per Arrow schema field, in column order.
    pub fields: Vec<HlxcField>,
    /// Arbitrary key/value metadata (e.g. format_version, created_at);
    /// omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<HashMap<String, serde_json::Value>>,
}
30
/// Description of a single column in the HLXC header.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HlxcField {
    /// Column name, copied from the Arrow schema field.
    pub name: String,
    /// Lower-cased Debug rendering of the Arrow `DataType` (e.g. "utf8", "int64");
    /// serialized under the JSON key "type".
    #[serde(rename = "type")]
    pub field_type: String,
    /// Optional human-readable description; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
39
40impl HlxcHeader {
41 pub fn new(schema: &Schema) -> Self {
42 let fields: Vec<HlxcField> = schema.fields().iter().map(|field| {
43 HlxcField {
44 name: field.name().clone(),
45 field_type: format!("{:?}", field.data_type()).to_lowercase(),
46 description: None,
47 }
48 }).collect();
49
50 Self {
51 fields,
52 metadata: None,
53 }
54 }
55
56 pub fn with_metadata(mut self, metadata: HashMap<String, serde_json::Value>) -> Self {
57 self.metadata = Some(metadata);
58 self
59 }
60
61 pub fn to_json_bytes(&self) -> Result<Vec<u8>, HlxError> {
63 serde_json::to_vec(self).map_err(|e| HlxError::json_error(e.to_string(), ""))
64 }
65
66 pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, HlxError> {
68 serde_json::from_slice(bytes).map_err(|e| HlxError::json_error(e.to_string(), ""))
69 }
70}
71
/// Buffering writer for the HLXC container format.
///
/// Record batches are collected in memory via `add_batch`; `finalize` then
/// writes the prelude, the JSON header, the Arrow IPC stream and (optionally)
/// a JSONL preview footer to the underlying sink.
pub struct HlxcWriter<W: Write + Seek> {
    /// Destination sink for the encoded container.
    writer: W,
    /// Schema shared by all batches; set via `with_schema` or inferred from
    /// the first batch added.
    schema: Option<Schema>,
    /// Batches buffered until `finalize` is called.
    batches: Vec<RecordBatch>,
    /// Whether to append the JSONL preview footer.
    include_preview: bool,
    /// Maximum number of rows included in the preview footer.
    preview_rows: usize,
}
80
81impl<W: Write + Seek> HlxcWriter<W> {
82 pub fn new(writer: W) -> Self {
83 Self {
84 writer,
85 schema: None,
86 batches: Vec::new(),
87 include_preview: true,
88 preview_rows: 10,
89 }
90 }
91
92 pub fn with_preview(mut self, include: bool, rows: usize) -> Self {
93 self.include_preview = include;
94 self.preview_rows = rows;
95 self
96 }
97
98 pub fn with_schema(mut self, schema: Schema) -> Self {
100 self.schema = Some(schema);
101 self
102 }
103
104 pub fn add_batch(&mut self, batch: RecordBatch) -> Result<(), HlxError> {
106 if self.schema.is_none() {
108 self.schema = Some(batch.schema().as_ref().clone());
109 } else if self.schema.as_ref().unwrap() != batch.schema().as_ref() {
110 return Err(HlxError::validation_error(
111 "Schema mismatch between batches",
112 "All batches must have the same schema"
113 ));
114 }
115
116 self.batches.push(batch);
117 Ok(())
118 }
119
120 pub fn finalize(&mut self) -> Result<(), HlxError> {
122 if self.batches.is_empty() {
123 return Err(HlxError::validation_error(
124 "No data to write",
125 "At least one record batch is required"
126 ));
127 }
128
129 let schema = self.schema.as_ref().unwrap().clone();
130
131 let mut metadata = HashMap::new();
133 metadata.insert("format_version".to_string(),
134 serde_json::Value::String("1.0".to_string()));
135 metadata.insert("created_at".to_string(),
136 serde_json::Value::String(chrono::Utc::now().to_rfc3339()));
137
138 let header = HlxcHeader::new(&schema).with_metadata(metadata);
139
140 self.writer.write_all(HLXC_MAGIC)?;
142
143 self.writer.write_all(&[HLXC_VERSION])?;
145
146 let flags: u8 = 0x00;
148 self.writer.write_all(&[flags])?;
149
150 let header_json = header.to_json_bytes()?;
152 let header_len = header_json.len() as u32;
153 self.writer.write_all(&header_len.to_le_bytes())?;
154 self.writer.write_all(&header_json)?;
155
156 let preview_jsonl = if self.include_preview {
158 self.extract_preview_jsonl()
159 } else {
160 None
161 };
162
163 let options = IpcWriteOptions::default();
165
166 let mut stream_writer = StreamWriter::try_new_with_options(&mut self.writer, &schema, options)
167 .map_err(|e| HlxError::io_error(format!("Failed to create Arrow IPC stream writer: {}", e), "Check Arrow IPC format support"))?;
168
169 for batch in &self.batches {
170 stream_writer.write(batch)
171 .map_err(|e| HlxError::io_error(format!("Failed to write Arrow record batch: {}", e), "Check Arrow record batch format"))?;
172 }
173
174 stream_writer.finish()
175 .map_err(|e| HlxError::io_error(format!("Failed to finalize Arrow IPC stream: {}", e), "Check stream finalization"))?;
176
177 if let Some(jsonl) = preview_jsonl {
179 eprintln!("DEBUG: Preview JSONL length: {}", jsonl.len());
180 if !jsonl.is_empty() {
181 eprintln!("DEBUG: Writing footer with {} bytes at position before write", self.writer.stream_position().unwrap_or(0));
182 self.writer.write_all(jsonl.as_bytes())?;
186 eprintln!("DEBUG: Wrote footer content, now at position {}", self.writer.stream_position().unwrap_or(0));
187
188 let footer_len = jsonl.len() as u32;
190 self.writer.write_all(&footer_len.to_le_bytes())?;
191 eprintln!("DEBUG: Wrote footer length: {}, now at position {}", footer_len, self.writer.stream_position().unwrap_or(0));
192
193 self.writer.write_all(HLXC_FOOTER_MAGIC)?;
195 eprintln!("DEBUG: Wrote footer magic, final position {}", self.writer.stream_position().unwrap_or(0));
196 } else {
197 eprintln!("DEBUG: Preview JSONL is empty, not writing footer");
198 }
199 } else {
200 eprintln!("DEBUG: No preview JSONL generated");
201 }
202
203 Ok(())
204 }
205
206 fn extract_preview_jsonl(&self) -> Option<String> {
208 if self.batches.is_empty() {
209 return None;
210 }
211
212 let mut preview_lines = Vec::new();
213 let mut rows_collected = 0;
214
215 for batch in &self.batches {
216 if rows_collected >= self.preview_rows {
217 break;
218 }
219
220 let rows_in_batch = std::cmp::min(self.preview_rows - rows_collected, batch.num_rows());
221
222 for row_idx in 0..rows_in_batch {
223 let mut row_json = serde_json::Map::new();
224
225 for (field_idx, field) in batch.schema().fields().iter().enumerate() {
226 if let Some(array) = batch.column(field_idx).as_any().downcast_ref::<StringArray>() {
227 if array.is_valid(row_idx) {
228 let value = array.value(row_idx);
229 row_json.insert(field.name().clone(),
230 serde_json::Value::String(value.to_string()));
231 } else {
232 row_json.insert(field.name().clone(), serde_json::Value::Null);
233 }
234 } else if let Some(array) = batch.column(field_idx).as_any().downcast_ref::<Float64Array>() {
235 if array.is_valid(row_idx) {
236 let value = array.value(row_idx);
237 row_json.insert(field.name().clone(),
238 serde_json::Value::Number(serde_json::Number::from_f64(value).unwrap_or(serde_json::Number::from(0))));
239 } else {
240 row_json.insert(field.name().clone(), serde_json::Value::Null);
241 }
242 } else if let Some(array) = batch.column(field_idx).as_any().downcast_ref::<Int64Array>() {
243 if array.is_valid(row_idx) {
244 let value = array.value(row_idx);
245 row_json.insert(field.name().clone(),
246 serde_json::Value::Number(serde_json::Number::from(value)));
247 } else {
248 row_json.insert(field.name().clone(), serde_json::Value::Null);
249 }
250 } else if let Some(array) = batch.column(field_idx).as_any().downcast_ref::<BooleanArray>() {
251 if array.is_valid(row_idx) {
252 let value = array.value(row_idx);
253 row_json.insert(field.name().clone(),
254 serde_json::Value::Bool(value));
255 } else {
256 row_json.insert(field.name().clone(), serde_json::Value::Null);
257 }
258 } else {
259 if batch.column(field_idx).is_valid(row_idx) {
261 let value_str = format!("{:?}", batch.column(field_idx));
262 row_json.insert(field.name().clone(),
263 serde_json::Value::String(value_str));
264 } else {
265 row_json.insert(field.name().clone(), serde_json::Value::Null);
266 }
267 }
268 }
269
270 let row_json_value = serde_json::Value::Object(row_json);
271 if let Ok(json_str) = serde_json::to_string(&row_json_value) {
272 preview_lines.push(json_str);
273 }
274
275 rows_collected += 1;
276 if rows_collected >= self.preview_rows {
277 break;
278 }
279 }
280 }
281
282 if preview_lines.is_empty() {
283 None
284 } else {
285 Some(preview_lines.join("\n"))
286 }
287 }
288
289}
290
/// Reader for HLXC containers over any seekable byte source.
pub struct HlxcReader<R: std::io::Read + Seek> {
    /// Source positioned at the start of the container when constructed.
    reader: R,
}
295
296impl<R: std::io::Read + Seek> HlxcReader<R> {
297 pub fn new(reader: R) -> Self {
298 Self { reader }
299 }
300
301 pub fn read_header(&mut self) -> Result<HlxcHeader, HlxError> {
303 let mut magic = [0u8; 4];
305 self.reader.read_exact(&mut magic)?;
306 if magic != *HLXC_MAGIC {
307 return Err(HlxError::validation_error(
308 "Invalid HLXC magic number",
309 "File does not appear to be a valid HLXC file"
310 ));
311 }
312
313 let mut version = [0u8; 1];
315 self.reader.read_exact(&mut version)?;
316 if version[0] != HLXC_VERSION {
317 return Err(HlxError::validation_error(
318 format!("Unsupported HLXC version: {}", version[0]),
319 "Only version 1 is supported"
320 ));
321 }
322
323 let mut flags = [0u8; 1];
325 self.reader.read_exact(&mut flags)?;
326
327 let mut header_len_bytes = [0u8; 4];
329 self.reader.read_exact(&mut header_len_bytes)?;
330 let header_len = u32::from_le_bytes(header_len_bytes) as usize;
331
332 let mut header_bytes = vec![0u8; header_len];
334 self.reader.read_exact(&mut header_bytes)?;
335 let header: HlxcHeader = HlxcHeader::from_json_bytes(&header_bytes)?;
336
337 Ok(header)
338 }
339
340 pub fn get_preview(&mut self) -> Result<Option<Vec<serde_json::Value>>, HlxError> {
342 self.read_footer()
344 }
345
346 fn read_footer(&mut self) -> Result<Option<Vec<serde_json::Value>>, HlxError> {
348 let current_pos = self.reader.stream_position()?;
350 eprintln!("DEBUG Reader: Current position before seeking: {}", current_pos);
351 let file_size = self.reader.seek(SeekFrom::End(0))?;
352 eprintln!("DEBUG Reader: File size: {}", file_size);
353 if file_size < 8 {
354 return Ok(None); }
356
357 self.reader.seek(SeekFrom::End(-8))?;
362 let pos_after_seek = self.reader.stream_position()?;
363 eprintln!("DEBUG Reader: Position after seeking to -8: {}", pos_after_seek);
364 let mut footer_header = [0u8; 8];
365 self.reader.read_exact(&mut footer_header)?;
366
367 let magic = &footer_header[4..8];
368 eprintln!("DEBUG Reader: Footer magic: {:?}", magic);
369 eprintln!("DEBUG Reader: Expected magic: {:?}", *HLXC_FOOTER_MAGIC);
370 if magic != *HLXC_FOOTER_MAGIC {
371 eprintln!("DEBUG Reader: Magic mismatch, no footer found");
372 return Ok(None); }
374
375 let footer_len = u32::from_le_bytes(footer_header[0..4].try_into().unwrap()) as usize;
376 eprintln!("DEBUG Reader: Footer length: {}", footer_len);
377
378 let content_start = file_size - 8 - footer_len as u64;
381 eprintln!("DEBUG Reader: Content should start at: {}", content_start);
382 self.reader.seek(SeekFrom::Start(content_start))?;
383 let mut footer_bytes = vec![0u8; footer_len];
384 self.reader.read_exact(&mut footer_bytes)?;
385
386 let footer_jsonl = String::from_utf8(footer_bytes)
387 .map_err(|_| HlxError::validation_error("Invalid UTF-8 in footer", ""))?;
388
389 let rows: Vec<serde_json::Value> = footer_jsonl
390 .lines()
391 .filter(|line| !line.trim().is_empty())
392 .map(|line| serde_json::from_str(line).unwrap_or(serde_json::Value::Null))
393 .collect();
394
395 Ok(Some(rows))
396 }
397
398 pub fn is_compressed(&mut self) -> Result<bool, HlxError> {
400 Ok(false)
402 }
403
404 pub fn read_batches(&mut self) -> Result<Vec<RecordBatch>, HlxError> {
406 self.read_header()?;
408
409 let reader = StreamReader::try_new(&mut self.reader, Default::default())
411 .map_err(|e| HlxError::io_error(format!("Failed to create Arrow IPC stream reader: {}", e), "Check Arrow IPC stream format"))?;
412
413 let mut batches = Vec::new();
415 for batch_result in reader {
416 let batch = batch_result
417 .map_err(|e| HlxError::io_error(format!("Failed to read Arrow record batch: {}", e), "Check Arrow IPC data integrity"))?;
418 batches.push(batch);
419 }
420
421 Ok(batches)
422 }
423}
424
#[cfg(test)]
mod tests {
    // Bring the parent module's items (HlxcHeader, HlxcWriter, HlxcReader,
    // HLXC_MAGIC, create_arrow_schema, arrow types, ...) into scope.
    // Without this the tests below do not compile.
    use super::*;
    use std::io::Cursor;

    /// Header JSON round-trips and records lower-cased type names.
    #[test]
    fn test_hlxc_header_serialization() {
        let schema = create_arrow_schema(vec![
            ("name", DataType::Utf8),
            ("age", DataType::Int64),
        ]);

        let header = HlxcHeader::new(&schema);

        let json_bytes = header.to_json_bytes().unwrap();
        let deserialized = HlxcHeader::from_json_bytes(&json_bytes).unwrap();

        assert_eq!(header.fields.len(), deserialized.fields.len());
        assert_eq!(header.fields[0].name, "name");
        assert_eq!(header.fields[0].field_type, "utf8");
    }

    /// A fresh writer starts with no schema until a batch fixes one.
    #[test]
    fn test_hlxc_writer_basic() {
        let writer = HlxcWriter::new(Cursor::new(Vec::new()));
        assert!(writer.schema.is_none());
    }

    /// A hand-assembled prelude + JSON header reads back correctly.
    #[test]
    fn test_hlxc_reader_magic() {
        let mut buffer = Vec::new();
        buffer.extend_from_slice(HLXC_MAGIC);
        buffer.push(HLXC_VERSION);
        buffer.push(0x00); // reserved flags byte
        let header = HlxcHeader::new(&create_arrow_schema(vec![("test", DataType::Utf8)]));
        let header_json = header.to_json_bytes().unwrap();
        buffer.extend_from_slice(&(header_json.len() as u32).to_le_bytes());
        buffer.extend_from_slice(&header_json);

        let mut reader = HlxcReader::new(Cursor::new(buffer));
        let read_header = reader.read_header().unwrap();

        assert_eq!(read_header.fields.len(), 1);
        assert_eq!(read_header.fields[0].name, "test");
    }
}
479
480pub fn create_arrow_schema(fields: Vec<(&str, DataType)>) -> Schema {
482 let arrow_fields: Vec<Field> = fields.into_iter()
483 .map(|(name, data_type)| Field::new(name, data_type, true))
484 .collect();
485
486 Schema::new(arrow_fields)
487}
488
/// File-backed adapter implementing the `DataWriter` trait on top of
/// [`HlxcWriter`]; opens one numbered `.hlxc` file per finalize cycle.
pub struct HlxcDataWriter {
    /// Active writer for the current output file; `None` until first use and
    /// after each finalize.
    writer: Option<HlxcWriter<std::fs::File>>,
    /// Output directory and preview settings.
    config: OutputConfig,
    /// Number of files finalized so far; used in the next file name.
    batch_count: usize,
}
495
496impl HlxcDataWriter {
497 pub fn new(config: OutputConfig) -> Self {
498 Self {
499 writer: None,
500 config,
501 batch_count: 0,
502 }
503 }
504
505 fn ensure_writer(&mut self) -> Result<(), HlxError> {
506 if self.writer.is_none() {
507 let filename = format!("output_{:04}.hlxc", self.batch_count);
508 let filepath = self.config.output_dir.join(filename);
509 std::fs::create_dir_all(&self.config.output_dir)?;
510
511 let file = std::fs::File::create(filepath)?;
512 let writer = HlxcWriter::new(file)
513 .with_preview(self.config.include_preview, self.config.preview_rows);
514
515 self.writer = Some(writer);
516 }
517 Ok(())
518 }
519}
520
521impl DataWriter for HlxcDataWriter {
522 fn write_batch(&mut self, batch: RecordBatch) -> Result<(), HlxError> {
523 self.ensure_writer()?;
524
525 if let Some(writer) = &mut self.writer {
526 writer.add_batch(batch)?;
527 }
528
529 Ok(())
530 }
531
532 fn finalize(&mut self) -> Result<(), HlxError> {
533 if let Some(mut writer) = self.writer.take() {
534 writer.finalize()?;
535 self.batch_count += 1;
536 }
537 Ok(())
538 }
539}