1use arrow::array::*;
2use arrow::datatypes::*;
3use arrow::record_batch::RecordBatch;
4use arrow::ipc::writer::*;
5use arrow::ipc::reader::*;
6use std::io::{Write, Seek, SeekFrom};
7use std::collections::HashMap;
8use std::sync::Arc;
9use serde::{Serialize, Deserialize};
10pub use crate::dna::hel::error::HlxError;
11
/// Magic bytes at the very start of every .helix data file ("HLX" + 0x01).
pub const HELIX_DATA_MAGIC: &[u8; 4] = b"HLX\x01";
/// Marker identifying the optional preview footer at the end of the file.
pub const HELIX_DATA_FOOTER_MAGIC: &[u8; 4] = b"\xFF\xFF\xFF\xFF";

/// On-disk format version recorded by writers.
pub const HELIX_DATA_VERSION: u8 = 1;
/// JSON header stored at the start of a .helix data file, right after the
/// 4-byte magic and a little-endian u32 length prefix.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HlxHeader {
    /// One entry per Arrow schema field (name plus lowercased type name).
    pub fields: Vec<HlxField>,
    /// Free-form key/value metadata (e.g. "format_version", "created_at").
    pub metadata: HashMap<String, serde_json::Value>,
    /// Bit flags; bit 0 (0x01) marks compression.
    pub flags: u8,
    /// Total number of rows across all record batches in the file.
    pub row_count: u64,
    /// Optional sample rows embedded for quick inspection without
    /// decoding the Arrow stream.
    pub preview_rows: Option<Vec<serde_json::Value>>,
}
35
/// Name/type pair describing one column in the header's field list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HlxField {
    /// Column name taken from the Arrow schema.
    pub name: String,
    /// Lowercased `Debug` rendering of the Arrow data type;
    /// serialized under the JSON key "type".
    #[serde(rename = "type")]
    pub field_type: String,
}
43
44impl HlxHeader {
45 pub fn new(schema: &Schema, metadata: HashMap<String, serde_json::Value>) -> Self {
46 let fields: Vec<HlxField> = schema.fields().iter().map(|field| {
47 HlxField {
48 name: field.name().clone(),
49 field_type: format!("{:?}", field.data_type()).to_lowercase(),
50 }
51 }).collect();
52
53 Self {
54 fields,
55 metadata,
56 flags: 0,
57 row_count: 0,
58 preview_rows: None,
59 }
60 }
61
62 pub fn with_compression(mut self, compressed: bool) -> Self {
64 if compressed {
65 self.flags |= 0x01;
66 } else {
67 self.flags &= !0x01;
68 }
69 self
70 }
71
72 pub fn with_row_count(mut self, count: u64) -> Self {
74 self.row_count = count;
75 self
76 }
77
78 pub fn with_preview(mut self, preview: Vec<serde_json::Value>) -> Self {
80 self.preview_rows = Some(preview);
81 self
82 }
83
84 pub fn is_compressed(&self) -> bool {
86 (self.flags & 0x01) != 0
87 }
88
89 pub fn to_json_bytes(&self) -> Result<Vec<u8>, HlxError> {
91 serde_json::to_vec(self).map_err(|e| HlxError::json_error(e.to_string(), ""))
92 }
93
94 pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, HlxError> {
96 serde_json::from_slice(bytes).map_err(|e| HlxError::json_error(e.to_string(), ""))
97 }
98}
99
/// Streaming writer for the .helix container format.
///
/// Batches are queued with `add_batch` and nothing is written until
/// `finalize` is called.
pub struct HlxWriter<W: Write + Seek> {
    // Destination for all output bytes.
    writer: W,
    // Header built during finalize; None until then.
    header: Option<HlxHeader>,
    // Schema shared by all queued batches; fixed by with_schema or first batch.
    schema: Option<Arc<Schema>>,
    // Record batches queued for writing.
    batches: Vec<RecordBatch>,
    // Whether the compression flag is set in the header.
    compression_enabled: bool,
}
108
109impl<W: Write + Seek> HlxWriter<W> {
110 pub fn new(writer: W) -> Self {
111 Self {
112 writer,
113 header: None,
114 schema: None,
115 batches: Vec::new(),
116 compression_enabled: false,
117 }
118 }
119
120 pub fn with_compression(mut self, enabled: bool) -> Self {
121 self.compression_enabled = enabled;
122 self
123 }
124
125 pub fn with_schema(mut self, schema: Arc<Schema>) -> Self {
127 self.schema = Some(schema);
128 self
129 }
130
131 pub fn add_batch(&mut self, batch: RecordBatch) -> Result<(), HlxError> {
133 if self.schema.is_none() {
135 self.schema = Some(batch.schema().clone());
136 } else if self.schema.as_ref().unwrap().as_ref() != batch.schema().as_ref() {
137 return Err(HlxError::validation_error(
138 "Schema mismatch between batches",
139 "All batches must have the same schema"
140 ));
141 }
142
143 self.batches.push(batch);
144 Ok(())
145 }
146
147 pub fn finalize(&mut self) -> Result<(), HlxError> {
149 if self.batches.is_empty() {
150 return Err(HlxError::validation_error(
151 "No data to write",
152 "At least one record batch is required"
153 ));
154 }
155
156 let schema = self.schema.as_ref().unwrap().clone();
157 let total_rows = self.batches.iter().map(|b| b.num_rows()).sum::<usize>() as u64;
158
159 let mut metadata = HashMap::new();
161 metadata.insert("format_version".to_string(),
162 serde_json::Value::String("1.0".to_string()));
163 metadata.insert("created_at".to_string(),
164 serde_json::Value::String(chrono::Utc::now().to_rfc3339()));
165
166 let mut header = HlxHeader::new(&schema, metadata)
167 .with_compression(self.compression_enabled)
168 .with_row_count(total_rows);
169
170 if let Some(first_batch) = self.batches.first() {
172 let preview_count = std::cmp::min(10, first_batch.num_rows());
173 let preview_rows = extract_preview_rows(first_batch, preview_count);
174 header = header.with_preview(preview_rows);
175 }
176
177 self.writer.write_all(HELIX_DATA_MAGIC)?;
179 let header_bytes = header.to_json_bytes()?;
180 let header_len = header_bytes.len() as u32;
181 self.writer.write_all(&header_len.to_le_bytes())?;
182 self.writer.write_all(&header_bytes)?;
183
184 let mut options = IpcWriteOptions::default();
186 if self.compression_enabled {
187 eprintln!("Warning: Compression enabled but not yet implemented for this Arrow version");
191 }
192
193 let mut stream_writer = StreamWriter::try_new_with_options(&mut self.writer, &schema, options)
194 .map_err(|e| HlxError::io_error(format!("Failed to create Arrow IPC stream writer: {}", e), "Check Arrow IPC format support"))?;
195
196 for batch in &self.batches {
197 stream_writer.write(batch)
198 .map_err(|e| HlxError::io_error(format!("Failed to write Arrow record batch: {}", e), "Check Arrow record batch format"))?;
199 }
200
201 stream_writer.finish()
202 .map_err(|e| HlxError::io_error(format!("Failed to finalize Arrow IPC stream: {}", e), "Check stream finalization"))?;
203
204 if let Some(preview_rows) = &header.preview_rows {
206 if !preview_rows.is_empty() {
207 self.writer.write_all(HELIX_DATA_FOOTER_MAGIC)?;
209
210 let preview_jsonl = preview_rows.iter()
212 .map(|row| serde_json::to_string(row).unwrap_or_default())
213 .collect::<Vec<_>>()
214 .join("\n");
215
216 let footer_len = preview_jsonl.len() as u32;
217 self.writer.write_all(&footer_len.to_le_bytes())?;
218 self.writer.write_all(preview_jsonl.as_bytes())?;
219 }
220 }
221
222 Ok(())
223 }
224}
225
226fn extract_preview_rows(batch: &RecordBatch, count: usize) -> Vec<serde_json::Value> {
228 let mut preview_rows = Vec::new();
229
230 for row_idx in 0..count {
231 let mut row_json = serde_json::Map::new();
232
233 for (field_idx, field) in batch.schema().fields().iter().enumerate() {
234 if let Some(array) = batch.column(field_idx).as_any().downcast_ref::<StringArray>() {
235 if array.is_valid(row_idx) {
236 let value = array.value(row_idx);
237 row_json.insert(field.name().clone(),
238 serde_json::Value::String(value.to_string()));
239 } else {
240 row_json.insert(field.name().clone(), serde_json::Value::Null);
241 }
242 } else if let Some(array) = batch.column(field_idx).as_any().downcast_ref::<Float64Array>() {
243 if array.is_valid(row_idx) {
244 let value = array.value(row_idx);
245 row_json.insert(field.name().clone(),
246 serde_json::Value::Number(serde_json::Number::from_f64(value).unwrap_or(serde_json::Number::from(0))));
247 } else {
248 row_json.insert(field.name().clone(), serde_json::Value::Null);
249 }
250 } else if let Some(array) = batch.column(field_idx).as_any().downcast_ref::<Int64Array>() {
251 if array.is_valid(row_idx) {
252 let value = array.value(row_idx);
253 row_json.insert(field.name().clone(),
254 serde_json::Value::Number(serde_json::Number::from(value)));
255 } else {
256 row_json.insert(field.name().clone(), serde_json::Value::Null);
257 }
258 } else {
259 if batch.column(field_idx).is_valid(row_idx) {
261 let value_str = format!("{:?}", batch.column(field_idx));
262 row_json.insert(field.name().clone(),
263 serde_json::Value::String(value_str));
264 } else {
265 row_json.insert(field.name().clone(), serde_json::Value::Null);
266 }
267 }
268 }
269
270 preview_rows.push(serde_json::Value::Object(row_json));
271 }
272
273 preview_rows
274}
275
/// Reader for the .helix container format.
///
/// Parses the header lazily on first use and caches it.
pub struct HlxReader<R: std::io::Read + Seek> {
    // Source of all input bytes.
    reader: R,
    // Cached header; None until read_header succeeds once.
    header: Option<HlxHeader>,
}
281
282impl<R: std::io::Read + Seek> HlxReader<R> {
283 pub fn new(reader: R) -> Self {
284 Self {
285 reader,
286 header: None,
287 }
288 }
289
290 pub fn read_header(&mut self) -> Result<&HlxHeader, HlxError> {
292 if self.header.is_some() {
293 return Ok(self.header.as_ref().unwrap());
294 }
295
296 let mut magic = [0u8; 4];
298 self.reader.read_exact(&mut magic)?;
299 if magic != *HELIX_DATA_MAGIC {
300 return Err(HlxError::validation_error(
301 "Invalid Helix data magic number",
302 "File does not appear to be a valid .helix data file"
303 ));
304 }
305
306 let mut header_len_bytes = [0u8; 4];
308 self.reader.read_exact(&mut header_len_bytes)?;
309 let header_len = u32::from_le_bytes(header_len_bytes) as usize;
310
311 let mut header_bytes = vec![0u8; header_len];
313 self.reader.read_exact(&mut header_bytes)?;
314 let header: HlxHeader = HlxHeader::from_json_bytes(&header_bytes)?;
315
316 self.header = Some(header);
317 Ok(self.header.as_ref().unwrap())
318 }
319
320 pub fn get_preview(&mut self) -> Result<Option<Vec<serde_json::Value>>, HlxError> {
322 let header = self.read_header()?;
323
324 if let Some(preview_rows) = &header.preview_rows {
325 return Ok(Some(preview_rows.clone()));
326 }
327
328 if let Some(footer_rows) = self.read_footer()? {
330 return Ok(Some(footer_rows));
331 }
332
333 Ok(None)
334 }
335
336 fn read_footer(&mut self) -> Result<Option<Vec<serde_json::Value>>, HlxError> {
338 let file_size = self.reader.seek(SeekFrom::End(0))?;
340 if file_size < 8 {
341 return Ok(None); }
343
344 self.reader.seek(SeekFrom::End(-8))?;
346 let mut footer_header = [0u8; 8];
347 self.reader.read_exact(&mut footer_header)?;
348
349 let magic = &footer_header[0..4];
350 if magic != *HELIX_DATA_FOOTER_MAGIC {
351 return Ok(None); }
353
354 let footer_len = u32::from_le_bytes(footer_header[4..8].try_into().unwrap()) as usize;
355
356 self.reader.seek(SeekFrom::End(-8 - footer_len as i64))?;
358 let mut footer_bytes = vec![0u8; footer_len];
359 self.reader.read_exact(&mut footer_bytes)?;
360
361 let footer_jsonl = String::from_utf8(footer_bytes)
362 .map_err(|_| HlxError::validation_error("Invalid UTF-8 in footer", ""))?;
363
364 let rows: Vec<serde_json::Value> = footer_jsonl
365 .lines()
366 .filter(|line| !line.trim().is_empty())
367 .map(|line| serde_json::from_str(line).unwrap_or(serde_json::Value::Null))
368 .collect();
369
370 Ok(Some(rows))
371 }
372
373 pub fn get_schema(&mut self) -> Result<&HlxHeader, HlxError> {
375 self.read_header()
376 }
377
378 pub fn read_batches(&mut self) -> Result<Vec<RecordBatch>, HlxError> {
380 self.read_header()?;
382
383 let reader = StreamReader::try_new(&mut self.reader, Default::default())
385 .map_err(|e| HlxError::io_error(format!("Failed to create Arrow IPC stream reader: {}", e), "Check Arrow IPC stream format"))?;
386
387 let mut batches = Vec::new();
389 for batch_result in reader {
390 let batch = batch_result
391 .map_err(|e| HlxError::io_error(format!("Failed to read Arrow record batch: {}", e), "Check Arrow IPC data integrity"))?;
392 batches.push(batch);
393 }
394
395 Ok(batches)
396 }
397}
398
399pub fn value_to_arrow_array(field: &Field, values: Vec<crate::dna::atp::value::Value>) -> Result<ArrayRef, HlxError> {
401 match field.data_type() {
402 DataType::Utf8 => {
403 let string_values: Vec<Option<String>> = values.into_iter()
404 .map(|v| match v {
405 crate::dna::atp::value::Value::String(s) => Some(s),
406 _ => Some(v.to_string()),
407 })
408 .collect();
409
410 let array = StringArray::from(string_values);
411 Ok(Arc::new(array))
412 }
413 DataType::Float64 => {
414 let float_values: Vec<Option<f64>> = values.into_iter()
415 .map(|v| match v {
416 crate::dna::atp::value::Value::Number(n) => Some(n),
417 _ => v.to_string().parse().ok(),
418 })
419 .collect();
420
421 let array = Float64Array::from(float_values);
422 Ok(Arc::new(array))
423 }
424 DataType::Int64 => {
425 let int_values: Vec<Option<i64>> = values.into_iter()
426 .map(|v| match v {
427 crate::dna::atp::value::Value::Number(n) => Some(n as i64),
428 _ => v.to_string().parse().ok(),
429 })
430 .collect();
431
432 let array = Int64Array::from(int_values);
433 Ok(Arc::new(array))
434 }
435 _ => {
436 let string_values: Vec<Option<String>> = values.into_iter()
438 .map(|v| Some(v.to_string()))
439 .collect();
440
441 let array = StringArray::from(string_values);
442 Ok(Arc::new(array))
443 }
444 }
445}
446
447pub fn create_arrow_schema(fields: Vec<(&str, DataType)>) -> Schema {
449 let arrow_fields: Vec<Field> = fields.into_iter()
450 .map(|(name, data_type)| Field::new(name, data_type, true))
451 .collect();
452
453 Schema::new(arrow_fields)
454}
455
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // Round-trips a header through JSON bytes and checks that the row count
    // and the compression flag survive serialization.
    #[test]
    fn test_hlx_header_serialization() {
        let mut metadata = HashMap::new();
        metadata.insert("test".to_string(), serde_json::Value::String("value".to_string()));

        let schema = create_arrow_schema(vec![
            ("name", DataType::Utf8),
            ("age", DataType::Int64),
        ]);

        let header = HlxHeader::new(&schema, metadata)
            .with_compression(true)
            .with_row_count(100);

        let json_bytes = header.to_json_bytes().unwrap();
        let deserialized: HlxHeader = HlxHeader::from_json_bytes(&json_bytes).unwrap();

        assert_eq!(header.row_count, deserialized.row_count);
        assert!(header.is_compressed());
    }

    // A freshly constructed writer has no cached header.
    #[test]
    fn test_hlx_writer_basic() {
        let buffer = Vec::new();
        let cursor = Cursor::new(buffer);
        let mut writer = HlxWriter::new(cursor);

        assert!(writer.header.is_none());
    }
}