//! helix/dna/out/helix_format.rs

use std::collections::HashMap;
use std::io::{Write, Seek, SeekFrom};
use std::sync::Arc;

use arrow::array::*;
use arrow::datatypes::*;
use arrow::ipc::reader::*;
use arrow::ipc::writer::*;
use arrow::record_batch::RecordBatch;
use serde::{Serialize, Deserialize};

pub use crate::dna::hel::error::HlxError;
/// Magic header identifying a Helix binary DATA file (.helix): "HLX" + version byte 0x01.
pub const HELIX_DATA_MAGIC: &[u8; 4] = b"HLX\x01";
/// Sentinel that precedes the length word in the optional preview trailer.
pub const HELIX_DATA_FOOTER_MAGIC: &[u8; 4] = b"\xFF\xFF\xFF\xFF";

/// On-disk format version for .helix data files.
pub const HELIX_DATA_VERSION: u8 = 1;
19/// Helix Data File (.helix) Header Structure
20/// This is for binary DATA output files, not configuration files.
21/// For configuration files, see hlx_config_format.rs (.hlx) and hlxb_config_format.rs (.hlxb)
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct HlxHeader {
24    /// Schema fields (simplified representation)
25    pub fields: Vec<HlxField>,
26    /// Optional metadata about the run
27    pub metadata: HashMap<String, serde_json::Value>,
28    /// Compression flags
29    pub flags: u8,
30    /// Number of rows in this file
31    pub row_count: u64,
32    /// Optional preview rows (first N rows as JSONL)
33    pub preview_rows: Option<Vec<serde_json::Value>>,
34}
35
36/// Simplified field representation for HLX header
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct HlxField {
39    pub name: String,
40    #[serde(rename = "type")]
41    pub field_type: String,
42}
43
44impl HlxHeader {
45    pub fn new(schema: &Schema, metadata: HashMap<String, serde_json::Value>) -> Self {
46        let fields: Vec<HlxField> = schema.fields().iter().map(|field| {
47            HlxField {
48                name: field.name().clone(),
49                field_type: format!("{:?}", field.data_type()).to_lowercase(),
50            }
51        }).collect();
52
53        Self {
54            fields,
55            metadata,
56            flags: 0,
57            row_count: 0,
58            preview_rows: None,
59        }
60    }
61
62    /// Set compression flag
63    pub fn with_compression(mut self, compressed: bool) -> Self {
64        if compressed {
65            self.flags |= 0x01;
66        } else {
67            self.flags &= !0x01;
68        }
69        self
70    }
71
72    /// Set row count
73    pub fn with_row_count(mut self, count: u64) -> Self {
74        self.row_count = count;
75        self
76    }
77
78    /// Set preview rows (first N rows as JSON)
79    pub fn with_preview(mut self, preview: Vec<serde_json::Value>) -> Self {
80        self.preview_rows = Some(preview);
81        self
82    }
83
84    /// Check if compression is enabled
85    pub fn is_compressed(&self) -> bool {
86        (self.flags & 0x01) != 0
87    }
88
89    /// Serialize header to JSON bytes
90    pub fn to_json_bytes(&self) -> Result<Vec<u8>, HlxError> {
91        serde_json::to_vec(self).map_err(|e| HlxError::json_error(e.to_string(), ""))
92    }
93
94    /// Deserialize header from JSON bytes
95    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, HlxError> {
96        serde_json::from_slice(bytes).map_err(|e| HlxError::json_error(e.to_string(), ""))
97    }
98}
99
100/// HLX Format Writer
101pub struct HlxWriter<W: Write + Seek> {
102    writer: W,
103    header: Option<HlxHeader>,
104    schema: Option<Arc<Schema>>,
105    batches: Vec<RecordBatch>,
106    compression_enabled: bool,
107}
108
109impl<W: Write + Seek> HlxWriter<W> {
110    pub fn new(writer: W) -> Self {
111        Self {
112            writer,
113            header: None,
114            schema: None,
115            batches: Vec::new(),
116            compression_enabled: false,
117        }
118    }
119
120    pub fn with_compression(mut self, enabled: bool) -> Self {
121        self.compression_enabled = enabled;
122        self
123    }
124
125    /// Set the schema for the output
126    pub fn with_schema(mut self, schema: Arc<Schema>) -> Self {
127        self.schema = Some(schema);
128        self
129    }
130
131    /// Add a record batch to the output
132    pub fn add_batch(&mut self, batch: RecordBatch) -> Result<(), HlxError> {
133        // Update schema if not set
134        if self.schema.is_none() {
135            self.schema = Some(batch.schema().clone());
136        } else if self.schema.as_ref().unwrap().as_ref() != batch.schema().as_ref() {
137            return Err(HlxError::validation_error(
138                "Schema mismatch between batches",
139                "All batches must have the same schema"
140            ));
141        }
142
143        self.batches.push(batch);
144        Ok(())
145    }
146
147    /// Finalize and write the HLX file
148    pub fn finalize(&mut self) -> Result<(), HlxError> {
149        if self.batches.is_empty() {
150            return Err(HlxError::validation_error(
151                "No data to write",
152                "At least one record batch is required"
153            ));
154        }
155
156        let schema = self.schema.as_ref().unwrap().clone();
157        let total_rows = self.batches.iter().map(|b| b.num_rows()).sum::<usize>() as u64;
158
159        // Create header with metadata
160        let mut metadata = HashMap::new();
161        metadata.insert("format_version".to_string(),
162                       serde_json::Value::String("1.0".to_string()));
163        metadata.insert("created_at".to_string(),
164                       serde_json::Value::String(chrono::Utc::now().to_rfc3339()));
165
166        let mut header = HlxHeader::new(&schema, metadata)
167            .with_compression(self.compression_enabled)
168            .with_row_count(total_rows);
169
170        // Add preview rows (first 10 rows)
171        if let Some(first_batch) = self.batches.first() {
172            let preview_count = std::cmp::min(10, first_batch.num_rows());
173            let preview_rows = extract_preview_rows(first_batch, preview_count);
174            header = header.with_preview(preview_rows);
175        }
176
177        // Write magic number and header
178        self.writer.write_all(HELIX_DATA_MAGIC)?;
179        let header_bytes = header.to_json_bytes()?;
180        let header_len = header_bytes.len() as u32;
181        self.writer.write_all(&header_len.to_le_bytes())?;
182        self.writer.write_all(&header_bytes)?;
183
184        // Write data as Arrow IPC stream
185        let mut options = IpcWriteOptions::default();
186        if self.compression_enabled {
187            // Try to enable ZSTD compression - this API may vary by Arrow version
188            // For now, we'll use the default options and note that compression
189            // needs to be properly configured for the specific Arrow version
190            eprintln!("Warning: Compression enabled but not yet implemented for this Arrow version");
191        }
192
193        let mut stream_writer = StreamWriter::try_new_with_options(&mut self.writer, &schema, options)
194            .map_err(|e| HlxError::io_error(format!("Failed to create Arrow IPC stream writer: {}", e), "Check Arrow IPC format support"))?;
195
196        for batch in &self.batches {
197            stream_writer.write(batch)
198                .map_err(|e| HlxError::io_error(format!("Failed to write Arrow record batch: {}", e), "Check Arrow record batch format"))?;
199        }
200
201        stream_writer.finish()
202            .map_err(|e| HlxError::io_error(format!("Failed to finalize Arrow IPC stream: {}", e), "Check stream finalization"))?;
203
204        // Write preview footer if we have preview rows
205        if let Some(preview_rows) = &header.preview_rows {
206            if !preview_rows.is_empty() {
207                // Write footer magic
208                self.writer.write_all(HELIX_DATA_FOOTER_MAGIC)?;
209
210                // Write preview as JSONL
211                let preview_jsonl = preview_rows.iter()
212                    .map(|row| serde_json::to_string(row).unwrap_or_default())
213                    .collect::<Vec<_>>()
214                    .join("\n");
215
216                let footer_len = preview_jsonl.len() as u32;
217                self.writer.write_all(&footer_len.to_le_bytes())?;
218                self.writer.write_all(preview_jsonl.as_bytes())?;
219            }
220        }
221
222        Ok(())
223    }
224}
225
226/// Extract preview rows from a record batch
227fn extract_preview_rows(batch: &RecordBatch, count: usize) -> Vec<serde_json::Value> {
228    let mut preview_rows = Vec::new();
229
230    for row_idx in 0..count {
231        let mut row_json = serde_json::Map::new();
232
233        for (field_idx, field) in batch.schema().fields().iter().enumerate() {
234            if let Some(array) = batch.column(field_idx).as_any().downcast_ref::<StringArray>() {
235                if array.is_valid(row_idx) {
236                    let value = array.value(row_idx);
237                    row_json.insert(field.name().clone(),
238                                   serde_json::Value::String(value.to_string()));
239                } else {
240                    row_json.insert(field.name().clone(), serde_json::Value::Null);
241                }
242            } else if let Some(array) = batch.column(field_idx).as_any().downcast_ref::<Float64Array>() {
243                if array.is_valid(row_idx) {
244                    let value = array.value(row_idx);
245                    row_json.insert(field.name().clone(),
246                                   serde_json::Value::Number(serde_json::Number::from_f64(value).unwrap_or(serde_json::Number::from(0))));
247                } else {
248                    row_json.insert(field.name().clone(), serde_json::Value::Null);
249                }
250            } else if let Some(array) = batch.column(field_idx).as_any().downcast_ref::<Int64Array>() {
251                if array.is_valid(row_idx) {
252                    let value = array.value(row_idx);
253                    row_json.insert(field.name().clone(),
254                                   serde_json::Value::Number(serde_json::Number::from(value)));
255                } else {
256                    row_json.insert(field.name().clone(), serde_json::Value::Null);
257                }
258            } else {
259                // For other array types, check if valid and convert to string
260                if batch.column(field_idx).is_valid(row_idx) {
261                    let value_str = format!("{:?}", batch.column(field_idx));
262                    row_json.insert(field.name().clone(),
263                                   serde_json::Value::String(value_str));
264                } else {
265                    row_json.insert(field.name().clone(), serde_json::Value::Null);
266                }
267            }
268        }
269
270        preview_rows.push(serde_json::Value::Object(row_json));
271    }
272
273    preview_rows
274}
275
276/// HLX Format Reader for preview functionality
277pub struct HlxReader<R: std::io::Read + Seek> {
278    reader: R,
279    header: Option<HlxHeader>,
280}
281
282impl<R: std::io::Read + Seek> HlxReader<R> {
283    pub fn new(reader: R) -> Self {
284        Self {
285            reader,
286            header: None,
287        }
288    }
289
290    /// Read and validate the header
291    pub fn read_header(&mut self) -> Result<&HlxHeader, HlxError> {
292        if self.header.is_some() {
293            return Ok(self.header.as_ref().unwrap());
294        }
295
296        // Read magic number
297        let mut magic = [0u8; 4];
298        self.reader.read_exact(&mut magic)?;
299        if magic != *HELIX_DATA_MAGIC {
300            return Err(HlxError::validation_error(
301                "Invalid Helix data magic number",
302                "File does not appear to be a valid .helix data file"
303            ));
304        }
305
306        // Read header length
307        let mut header_len_bytes = [0u8; 4];
308        self.reader.read_exact(&mut header_len_bytes)?;
309        let header_len = u32::from_le_bytes(header_len_bytes) as usize;
310
311        // Read header JSON
312        let mut header_bytes = vec![0u8; header_len];
313        self.reader.read_exact(&mut header_bytes)?;
314        let header: HlxHeader = HlxHeader::from_json_bytes(&header_bytes)?;
315
316        self.header = Some(header);
317        Ok(self.header.as_ref().unwrap())
318    }
319
320    /// Get preview rows if available
321    pub fn get_preview(&mut self) -> Result<Option<Vec<serde_json::Value>>, HlxError> {
322        let header = self.read_header()?;
323
324        if let Some(preview_rows) = &header.preview_rows {
325            return Ok(Some(preview_rows.clone()));
326        }
327
328        // Try to read from footer
329        if let Some(footer_rows) = self.read_footer()? {
330            return Ok(Some(footer_rows));
331        }
332
333        Ok(None)
334    }
335
336    /// Read footer preview if available
337    fn read_footer(&mut self) -> Result<Option<Vec<serde_json::Value>>, HlxError> {
338        // Seek to end and look for footer magic
339        let file_size = self.reader.seek(SeekFrom::End(0))?;
340        if file_size < 8 {
341            return Ok(None); // File too small for footer
342        }
343
344        // Read last 8 bytes (footer magic + length)
345        self.reader.seek(SeekFrom::End(-8))?;
346        let mut footer_header = [0u8; 8];
347        self.reader.read_exact(&mut footer_header)?;
348
349        let magic = &footer_header[0..4];
350        if magic != *HELIX_DATA_FOOTER_MAGIC {
351            return Ok(None); // No footer
352        }
353
354        let footer_len = u32::from_le_bytes(footer_header[4..8].try_into().unwrap()) as usize;
355
356        // Read footer content
357        self.reader.seek(SeekFrom::End(-8 - footer_len as i64))?;
358        let mut footer_bytes = vec![0u8; footer_len];
359        self.reader.read_exact(&mut footer_bytes)?;
360
361        let footer_jsonl = String::from_utf8(footer_bytes)
362            .map_err(|_| HlxError::validation_error("Invalid UTF-8 in footer", ""))?;
363
364        let rows: Vec<serde_json::Value> = footer_jsonl
365            .lines()
366            .filter(|line| !line.trim().is_empty())
367            .map(|line| serde_json::from_str(line).unwrap_or(serde_json::Value::Null))
368            .collect();
369
370        Ok(Some(rows))
371    }
372
373    /// Get schema information
374    pub fn get_schema(&mut self) -> Result<&HlxHeader, HlxError> {
375        self.read_header()
376    }
377
378    /// Read Arrow IPC data as record batches
379    pub fn read_batches(&mut self) -> Result<Vec<RecordBatch>, HlxError> {
380        // Skip header first
381        self.read_header()?;
382
383        // Create a reader from the current position (after header)
384        let reader = StreamReader::try_new(&mut self.reader, Default::default())
385            .map_err(|e| HlxError::io_error(format!("Failed to create Arrow IPC stream reader: {}", e), "Check Arrow IPC stream format"))?;
386
387        // Collect all batches
388        let mut batches = Vec::new();
389        for batch_result in reader {
390            let batch = batch_result
391                .map_err(|e| HlxError::io_error(format!("Failed to read Arrow record batch: {}", e), "Check Arrow IPC data integrity"))?;
392            batches.push(batch);
393        }
394
395        Ok(batches)
396    }
397}
398
399/// Convert Helix Value to Arrow Array
400pub fn value_to_arrow_array(field: &Field, values: Vec<crate::dna::atp::value::Value>) -> Result<ArrayRef, HlxError> {
401    match field.data_type() {
402        DataType::Utf8 => {
403            let string_values: Vec<Option<String>> = values.into_iter()
404                .map(|v| match v {
405                    crate::dna::atp::value::Value::String(s) => Some(s),
406                    _ => Some(v.to_string()),
407                })
408                .collect();
409
410            let array = StringArray::from(string_values);
411            Ok(Arc::new(array))
412        }
413        DataType::Float64 => {
414            let float_values: Vec<Option<f64>> = values.into_iter()
415                .map(|v| match v {
416                    crate::dna::atp::value::Value::Number(n) => Some(n),
417                    _ => v.to_string().parse().ok(),
418                })
419                .collect();
420
421            let array = Float64Array::from(float_values);
422            Ok(Arc::new(array))
423        }
424        DataType::Int64 => {
425            let int_values: Vec<Option<i64>> = values.into_iter()
426                .map(|v| match v {
427                    crate::dna::atp::value::Value::Number(n) => Some(n as i64),
428                    _ => v.to_string().parse().ok(),
429                })
430                .collect();
431
432            let array = Int64Array::from(int_values);
433            Ok(Arc::new(array))
434        }
435        _ => {
436            // Default to string representation
437            let string_values: Vec<Option<String>> = values.into_iter()
438                .map(|v| Some(v.to_string()))
439                .collect();
440
441            let array = StringArray::from(string_values);
442            Ok(Arc::new(array))
443        }
444    }
445}
446
447/// Create Arrow schema from field definitions
448pub fn create_arrow_schema(fields: Vec<(&str, DataType)>) -> Schema {
449    let arrow_fields: Vec<Field> = fields.into_iter()
450        .map(|(name, data_type)| Field::new(name, data_type, true))
451        .collect();
452
453    Schema::new(arrow_fields)
454}
455
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// The header survives a JSON round-trip and keeps its flags/row count.
    #[test]
    fn test_hlx_header_serialization() {
        let mut metadata = HashMap::new();
        metadata.insert("test".to_string(), serde_json::Value::String("value".to_string()));

        let schema = create_arrow_schema(vec![
            ("name", DataType::Utf8),
            ("age", DataType::Int64),
        ]);

        let header = HlxHeader::new(&schema, metadata)
            .with_compression(true)
            .with_row_count(100);

        let bytes = header.to_json_bytes().unwrap();
        let roundtripped = HlxHeader::from_json_bytes(&bytes).unwrap();

        assert_eq!(header.row_count, roundtripped.row_count);
        assert!(header.is_compressed());
    }

    /// A freshly constructed writer starts with no header.
    #[test]
    fn test_hlx_writer_basic() {
        // A real write test needs actual record batches; for now just verify
        // construction leaves the writer in its initial state.
        let writer = HlxWriter::new(Cursor::new(Vec::new()));
        assert!(writer.header.is_none());
    }
}