helix/dna/out/hlxc_format.rs

use arrow::array::*;
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use arrow::ipc::writer::StreamWriter;
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::IpcWriteOptions;
use std::io::{Write, Seek, SeekFrom};
use std::collections::HashMap;
use serde::{Serialize, Deserialize};
pub use crate::dna::hel::error::HlxError;
pub use crate::dna::atp::output::OutputConfig;
pub use crate::dna::atp::output::DataWriter;

/// HLXC Format Magic Header - exactly as specified
pub const HLXC_MAGIC: &[u8; 4] = b"HLXC";
pub const HLXC_FOOTER_MAGIC: &[u8; 4] = b"\xFF\xFF\xFF\xFF";

/// HLXC Format Version
pub const HLXC_VERSION: u8 = 1;
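
// On-disk layout produced by `HlxcWriter::finalize` and consumed by `HlxcReader`:
//
//   bytes 0..4    magic          b"HLXC"
//   byte  4       version        0x01
//   byte  5       flags          currently always 0x00
//   bytes 6..10   header length  u32, little-endian
//   ...           header JSON    `HlxcHeader` serialized with serde_json
//   ...           Arrow IPC stream containing every record batch
//   optional preview footer: JSONL rows, then their u32 LE byte length,
//   then the 4-byte footer magic as the very last bytes of the file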

/// HLXC File Header Structure - simplified JSON schema only
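///
/// Serialized as a JSON object, e.g. (illustrative values):
/// `{"fields":[{"name":"name","type":"utf8"}],"metadata":{"format_version":"1.0"}}`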
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HlxcHeader {
    /// Schema fields as JSON array
    pub fields: Vec<HlxcField>,
    /// Optional metadata
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<HashMap<String, serde_json::Value>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HlxcField {
    pub name: String,
    #[serde(rename = "type")]
    pub field_type: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}

impl HlxcHeader {
    pub fn new(schema: &Schema) -> Self {
        let fields: Vec<HlxcField> = schema.fields().iter().map(|field| {
            HlxcField {
                name: field.name().clone(),
                field_type: format!("{:?}", field.data_type()).to_lowercase(),
                description: None,
            }
        }).collect();

        Self {
            fields,
            metadata: None,
        }
    }

    pub fn with_metadata(mut self, metadata: HashMap<String, serde_json::Value>) -> Self {
        self.metadata = Some(metadata);
        self
    }

    /// Serialize header to JSON bytes
    pub fn to_json_bytes(&self) -> Result<Vec<u8>, HlxError> {
        serde_json::to_vec(self).map_err(|e| HlxError::json_error(e.to_string(), ""))
    }

    /// Deserialize header from JSON bytes
    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, HlxError> {
        serde_json::from_slice(bytes).map_err(|e| HlxError::json_error(e.to_string(), ""))
    }
}

/// HLXC Format Writer - implements the exact specification
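///
/// Illustrative usage sketch (the `batch` value stands in for any pre-built
/// Arrow `RecordBatch`; any `Write + Seek` sink works):
///
/// ```ignore
/// let sink = std::io::Cursor::new(Vec::new());
/// let mut writer = HlxcWriter::new(sink).with_preview(true, 10);
/// writer.add_batch(batch)?;
/// writer.finalize()?;
/// ```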
pub struct HlxcWriter<W: Write + Seek> {
    writer: W,
    schema: Option<Schema>,
    batches: Vec<RecordBatch>,
    include_preview: bool,
    preview_rows: usize,
}

impl<W: Write + Seek> HlxcWriter<W> {
    pub fn new(writer: W) -> Self {
        Self {
            writer,
            schema: None,
            batches: Vec::new(),
            include_preview: true,
            preview_rows: 10,
        }
    }

    pub fn with_preview(mut self, include: bool, rows: usize) -> Self {
        self.include_preview = include;
        self.preview_rows = rows;
        self
    }

    /// Set the schema for the output
    pub fn with_schema(mut self, schema: Schema) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Add a record batch to the output
    pub fn add_batch(&mut self, batch: RecordBatch) -> Result<(), HlxError> {
        // Update schema if not set
        if self.schema.is_none() {
            self.schema = Some(batch.schema().as_ref().clone());
        } else if self.schema.as_ref().unwrap() != batch.schema().as_ref() {
            return Err(HlxError::validation_error(
                "Schema mismatch between batches",
                "All batches must have the same schema"
            ));
        }

        self.batches.push(batch);
        Ok(())
    }

    /// Finalize and write the HLXC file with exact specification format
    pub fn finalize(&mut self) -> Result<(), HlxError> {
        if self.batches.is_empty() {
            return Err(HlxError::validation_error(
                "No data to write",
                "At least one record batch is required"
            ));
        }

        let schema = self.schema.as_ref().unwrap().clone();

        // Create header with schema
        let mut metadata = HashMap::new();
        metadata.insert("format_version".to_string(),
                       serde_json::Value::String("1.0".to_string()));
        metadata.insert("created_at".to_string(),
                       serde_json::Value::String(chrono::Utc::now().to_rfc3339()));

        let header = HlxcHeader::new(&schema).with_metadata(metadata);

        // Write magic number "HLXC"
        self.writer.write_all(HLXC_MAGIC)?;

        // Write version byte (0x01)
        self.writer.write_all(&[HLXC_VERSION])?;

        // Write flags byte (currently unused, set to 0)
        let flags: u8 = 0x00;
        self.writer.write_all(&[flags])?;

        // Write header JSON
        let header_json = header.to_json_bytes()?;
        let header_len = header_json.len() as u32;
        self.writer.write_all(&header_len.to_le_bytes())?;
        self.writer.write_all(&header_json)?;

        // Collect all preview rows if needed
        let preview_jsonl = if self.include_preview {
            self.extract_preview_jsonl()
        } else {
            None
        };

        // Write data as Arrow IPC stream
        let options = IpcWriteOptions::default();

        let mut stream_writer = StreamWriter::try_new_with_options(&mut self.writer, &schema, options)
            .map_err(|e| HlxError::io_error(format!("Failed to create Arrow IPC stream writer: {}", e), "Check Arrow IPC format support"))?;

        for batch in &self.batches {
            stream_writer.write(batch)
                .map_err(|e| HlxError::io_error(format!("Failed to write Arrow record batch: {}", e), "Check Arrow record batch format"))?;
        }

        stream_writer.finish()
            .map_err(|e| HlxError::io_error(format!("Failed to finalize Arrow IPC stream: {}", e), "Check stream finalization"))?;

        // Write preview footer if we have preview rows
        if let Some(jsonl) = preview_jsonl {
            eprintln!("DEBUG: Preview JSONL length: {}", jsonl.len());
            if !jsonl.is_empty() {
                eprintln!("DEBUG: Writing footer ({} bytes) at position {}", jsonl.len(), self.writer.stream_position().unwrap_or(0));
                // Write footer in reverse order so magic is at the end: content + length + magic

                // Write JSONL content first
                self.writer.write_all(jsonl.as_bytes())?;
                eprintln!("DEBUG: Wrote footer content, now at position {}", self.writer.stream_position().unwrap_or(0));

                // Write footer length
                let footer_len = jsonl.len() as u32;
                self.writer.write_all(&footer_len.to_le_bytes())?;
                eprintln!("DEBUG: Wrote footer length: {}, now at position {}", footer_len, self.writer.stream_position().unwrap_or(0));

                // Write footer magic last (so it's at the very end)
                self.writer.write_all(HLXC_FOOTER_MAGIC)?;
                eprintln!("DEBUG: Wrote footer magic, final position {}", self.writer.stream_position().unwrap_or(0));
            } else {
                eprintln!("DEBUG: Preview JSONL is empty, not writing footer");
            }
        } else {
            eprintln!("DEBUG: No preview JSONL generated");
        }

        Ok(())
    }

    /// Extract preview rows as JSONL string
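    ///
    /// Each preview row becomes one JSON object per line, e.g. (illustrative):
    /// `{"name":"alice","score":0.5,"count":3,"ok":true}`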
    fn extract_preview_jsonl(&self) -> Option<String> {
        if self.batches.is_empty() {
            return None;
        }

        let mut preview_lines = Vec::new();
        let mut rows_collected = 0;

        for batch in &self.batches {
            if rows_collected >= self.preview_rows {
                break;
            }

            let rows_in_batch = std::cmp::min(self.preview_rows - rows_collected, batch.num_rows());

            for row_idx in 0..rows_in_batch {
                let mut row_json = serde_json::Map::new();

                for (field_idx, field) in batch.schema().fields().iter().enumerate() {
                    if let Some(array) = batch.column(field_idx).as_any().downcast_ref::<StringArray>() {
                        if array.is_valid(row_idx) {
                            let value = array.value(row_idx);
                            row_json.insert(field.name().clone(),
                                           serde_json::Value::String(value.to_string()));
                        } else {
                            row_json.insert(field.name().clone(), serde_json::Value::Null);
                        }
                    } else if let Some(array) = batch.column(field_idx).as_any().downcast_ref::<Float64Array>() {
                        if array.is_valid(row_idx) {
                            let value = array.value(row_idx);
                            row_json.insert(field.name().clone(),
                                           serde_json::Value::Number(serde_json::Number::from_f64(value).unwrap_or(serde_json::Number::from(0))));
                        } else {
                            row_json.insert(field.name().clone(), serde_json::Value::Null);
                        }
                    } else if let Some(array) = batch.column(field_idx).as_any().downcast_ref::<Int64Array>() {
                        if array.is_valid(row_idx) {
                            let value = array.value(row_idx);
                            row_json.insert(field.name().clone(),
                                           serde_json::Value::Number(serde_json::Number::from(value)));
                        } else {
                            row_json.insert(field.name().clone(), serde_json::Value::Null);
                        }
                    } else if let Some(array) = batch.column(field_idx).as_any().downcast_ref::<BooleanArray>() {
                        if array.is_valid(row_idx) {
                            let value = array.value(row_idx);
                            row_json.insert(field.name().clone(),
                                           serde_json::Value::Bool(value));
                        } else {
                            row_json.insert(field.name().clone(), serde_json::Value::Null);
                        }
                    } else {
                        // For other array types, fall back to the Debug representation
                        // of just this row's one-element slice of the column
                        if batch.column(field_idx).is_valid(row_idx) {
                            let value_str = format!("{:?}", batch.column(field_idx).slice(row_idx, 1));
                            row_json.insert(field.name().clone(),
                                           serde_json::Value::String(value_str));
                        } else {
                            row_json.insert(field.name().clone(), serde_json::Value::Null);
                        }
                    }
                }

                let row_json_value = serde_json::Value::Object(row_json);
                if let Ok(json_str) = serde_json::to_string(&row_json_value) {
                    preview_lines.push(json_str);
                }

                rows_collected += 1;
                if rows_collected >= self.preview_rows {
                    break;
                }
            }
        }

        if preview_lines.is_empty() {
            None
        } else {
            Some(preview_lines.join("\n"))
        }
    }

}

/// HLXC Format Reader for preview functionality
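///
/// Illustrative usage sketch (any `Read + Seek` source works, such as a
/// `File` or an in-memory `Cursor`):
///
/// ```ignore
/// let mut reader = HlxcReader::new(std::fs::File::open("output_0000.hlxc")?);
/// let header = reader.read_header()?;
/// let preview = reader.get_preview()?; // Some(rows) when a preview footer exists
/// ```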
pub struct HlxcReader<R: std::io::Read + Seek> {
    reader: R,
}

impl<R: std::io::Read + Seek> HlxcReader<R> {
    pub fn new(reader: R) -> Self {
        Self { reader }
    }

    /// Read and validate the header
    pub fn read_header(&mut self) -> Result<HlxcHeader, HlxError> {
        // Read magic number
        let mut magic = [0u8; 4];
        self.reader.read_exact(&mut magic)?;
        if magic != *HLXC_MAGIC {
            return Err(HlxError::validation_error(
                "Invalid HLXC magic number",
                "File does not appear to be a valid HLXC file"
            ));
        }

        // Read version
        let mut version = [0u8; 1];
        self.reader.read_exact(&mut version)?;
        if version[0] != HLXC_VERSION {
            return Err(HlxError::validation_error(
                format!("Unsupported HLXC version: {}", version[0]),
                "Only version 1 is supported"
            ));
        }

        // Read flags
        let mut flags = [0u8; 1];
        self.reader.read_exact(&mut flags)?;

        // Read header length
        let mut header_len_bytes = [0u8; 4];
        self.reader.read_exact(&mut header_len_bytes)?;
        let header_len = u32::from_le_bytes(header_len_bytes) as usize;

        // Read header JSON
        let mut header_bytes = vec![0u8; header_len];
        self.reader.read_exact(&mut header_bytes)?;
        let header: HlxcHeader = HlxcHeader::from_json_bytes(&header_bytes)?;

        Ok(header)
    }

    /// Get preview rows if available
    pub fn get_preview(&mut self) -> Result<Option<Vec<serde_json::Value>>, HlxError> {
        // Try to read from footer
        self.read_footer()
    }

    /// Read footer preview if available
    fn read_footer(&mut self) -> Result<Option<Vec<serde_json::Value>>, HlxError> {
        // Seek to end and look for footer magic
        let current_pos = self.reader.stream_position()?;
        eprintln!("DEBUG Reader: Current position before seeking: {}", current_pos);
        let file_size = self.reader.seek(SeekFrom::End(0))?;
        eprintln!("DEBUG Reader: File size: {}", file_size);
        if file_size < 8 {
            return Ok(None); // File too small for footer
        }

        // The footer format is: content + length (4 bytes) + magic (4 bytes)
        // So the last 8 bytes are: length (4) + magic (4)

        // Read the last 8 bytes - this should be length + magic
        self.reader.seek(SeekFrom::End(-8))?;
        let pos_after_seek = self.reader.stream_position()?;
        eprintln!("DEBUG Reader: Position after seeking to -8: {}", pos_after_seek);
        let mut footer_header = [0u8; 8];
        self.reader.read_exact(&mut footer_header)?;

        let magic = &footer_header[4..8];
        eprintln!("DEBUG Reader: Footer magic: {:?}", magic);
        eprintln!("DEBUG Reader: Expected magic: {:?}", *HLXC_FOOTER_MAGIC);
        if magic != *HLXC_FOOTER_MAGIC {
            eprintln!("DEBUG Reader: Magic mismatch, no footer found");
            return Ok(None); // No footer
        }

        let footer_len = u32::from_le_bytes(footer_header[0..4].try_into().unwrap()) as usize;
        eprintln!("DEBUG Reader: Footer length: {}", footer_len);
        if footer_len as u64 + 8 > file_size {
            // Declared footer is larger than the file itself; treat as "no footer"
            // instead of underflowing in the subtraction below.
            return Ok(None);
        }

        // Now seek back to read the footer content
        // Footer content starts at (end - 8 - footer_len)
        let content_start = file_size - 8 - footer_len as u64;
        eprintln!("DEBUG Reader: Content should start at: {}", content_start);
        self.reader.seek(SeekFrom::Start(content_start))?;
        let mut footer_bytes = vec![0u8; footer_len];
        self.reader.read_exact(&mut footer_bytes)?;

        let footer_jsonl = String::from_utf8(footer_bytes)
            .map_err(|_| HlxError::validation_error("Invalid UTF-8 in footer", ""))?;

        let rows: Vec<serde_json::Value> = footer_jsonl
            .lines()
            .filter(|line| !line.trim().is_empty())
            .map(|line| serde_json::from_str(line).unwrap_or(serde_json::Value::Null))
            .collect();

        Ok(Some(rows))
    }

    /// Check if file is compressed
    pub fn is_compressed(&mut self) -> Result<bool, HlxError> {
        // Currently no compression support, always return false
        Ok(false)
    }

    /// Read Arrow IPC data as record batches
    pub fn read_batches(&mut self) -> Result<Vec<RecordBatch>, HlxError> {
        // Skip header first
        self.read_header()?;

        // Create a reader from the current position (after header)
        let reader = StreamReader::try_new(&mut self.reader, Default::default())
            .map_err(|e| HlxError::io_error(format!("Failed to create Arrow IPC stream reader: {}", e), "Check Arrow IPC stream format"))?;

        // Collect all batches
        let mut batches = Vec::new();
        for batch_result in reader {
            let batch = batch_result
                .map_err(|e| HlxError::io_error(format!("Failed to read Arrow record batch: {}", e), "Check Arrow IPC data integrity"))?;
            batches.push(batch);
        }

        Ok(batches)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_hlxc_header_serialization() {
        let schema = create_arrow_schema(vec![
            ("name", DataType::Utf8),
            ("age", DataType::Int64),
        ]);

        let header = HlxcHeader::new(&schema);

        let json_bytes = header.to_json_bytes().unwrap();
        let deserialized: HlxcHeader = HlxcHeader::from_json_bytes(&json_bytes).unwrap();

        assert_eq!(header.fields.len(), deserialized.fields.len());
        assert_eq!(header.fields[0].name, "name");
        assert_eq!(header.fields[0].field_type, "utf8");
    }

    #[test]
    fn test_hlxc_writer_basic() {
        let buffer = Vec::new();
        let cursor = Cursor::new(buffer);
        let writer = HlxcWriter::new(cursor);

        // Just test that the writer can be created
        assert!(writer.schema.is_none());
    }
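
    // Minimal round-trip sketch: write a single batch into an in-memory buffer,
    // then read the header and the preview footer back out. The column name and
    // values are illustrative only.
    #[test]
    fn test_hlxc_write_read_round_trip() {
        use std::sync::Arc;

        let schema = create_arrow_schema(vec![("name", DataType::Utf8)]);
        let array = StringArray::from(vec!["alice", "bob"]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])
            .expect("batch matches schema");

        let mut writer = HlxcWriter::new(Cursor::new(Vec::<u8>::new())).with_preview(true, 10);
        writer.add_batch(batch).unwrap();
        writer.finalize().unwrap();

        // The private `writer` field is reachable here because tests live in the same module.
        let buffer = writer.writer.into_inner();

        let mut reader = HlxcReader::new(Cursor::new(buffer));
        let header = reader.read_header().unwrap();
        assert_eq!(header.fields.len(), 1);

        let preview = reader.get_preview().unwrap();
        assert_eq!(preview.map(|rows| rows.len()), Some(2));
    }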

    #[test]
    fn test_hlxc_reader_magic() {
        let mut buffer = Vec::new();
        buffer.extend_from_slice(HLXC_MAGIC);
        buffer.push(HLXC_VERSION);
        buffer.push(0x00); // flags

        let header = HlxcHeader::new(&create_arrow_schema(vec![("test", DataType::Utf8)]));
        let header_json = header.to_json_bytes().unwrap();
        let header_len = header_json.len() as u32;
        buffer.extend_from_slice(&header_len.to_le_bytes());
        buffer.extend_from_slice(&header_json);

        let cursor = Cursor::new(buffer);
        let mut reader = HlxcReader::new(cursor);
        let read_header = reader.read_header().unwrap();

        assert_eq!(read_header.fields.len(), 1);
        assert_eq!(read_header.fields[0].name, "test");
    }
}

/// Create Arrow schema from field definitions
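///
/// e.g. `create_arrow_schema(vec![("name", DataType::Utf8), ("age", DataType::Int64)])`
/// builds a two-field schema in which every field is nullable.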
pub fn create_arrow_schema(fields: Vec<(&str, DataType)>) -> Schema {
    let arrow_fields: Vec<Field> = fields.into_iter()
        .map(|(name, data_type)| Field::new(name, data_type, true))
        .collect();

    Schema::new(arrow_fields)
}

/// HLXC Data Writer implementation for the OutputManager
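///
/// Illustrative sketch (assumes an `OutputConfig` that supplies `output_dir`,
/// `include_preview`, and `preview_rows`, as used by `ensure_writer` below):
///
/// ```ignore
/// let mut writer = HlxcDataWriter::new(config);
/// writer.write_batch(batch)?;
/// writer.finalize()?; // writes output_0000.hlxc into the configured directory
/// ```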
pub struct HlxcDataWriter {
    writer: Option<HlxcWriter<std::fs::File>>,
    config: OutputConfig,
    batch_count: usize,
}

impl HlxcDataWriter {
    pub fn new(config: OutputConfig) -> Self {
        Self {
            writer: None,
            config,
            batch_count: 0,
        }
    }

    fn ensure_writer(&mut self) -> Result<(), HlxError> {
        if self.writer.is_none() {
            let filename = format!("output_{:04}.hlxc", self.batch_count);
            let filepath = self.config.output_dir.join(filename);
            std::fs::create_dir_all(&self.config.output_dir)?;

            let file = std::fs::File::create(filepath)?;
            let writer = HlxcWriter::new(file)
                .with_preview(self.config.include_preview, self.config.preview_rows);

            self.writer = Some(writer);
        }
        Ok(())
    }
}

impl DataWriter for HlxcDataWriter {
    fn write_batch(&mut self, batch: RecordBatch) -> Result<(), HlxError> {
        self.ensure_writer()?;

        if let Some(writer) = &mut self.writer {
            writer.add_batch(batch)?;
        }

        Ok(())
    }

    fn finalize(&mut self) -> Result<(), HlxError> {
        if let Some(mut writer) = self.writer.take() {
            writer.finalize()?;
            self.batch_count += 1;
        }
        Ok(())
    }
}