Skip to main content

difi/
io.rs

1//! Parquet I/O for difi types.
2//!
3//! Reads observations and linkage members from Parquet files,
4//! handling string ID interning at the boundary. Writes output
5//! types back with string de-interning.
6//!
7//! Column projection is supported to skip unused columns at read time,
8//! which is critical at survey scale (166M+ rows).
9
10use std::path::Path;
11use std::sync::Arc;
12
13use arrow::array::{
14    Array, BooleanArray, Float64Array, Int64Array, LargeStringArray, RecordBatch, StringArray,
15};
16use arrow::datatypes::{DataType, Field, Schema};
17use parquet::arrow::ArrowWriter;
18use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
19use parquet::basic::Compression;
20use parquet::file::properties::WriterProperties;
21
22use crate::error::{Error, Result};
23use crate::partitions::PartitionSummary;
24use crate::types::{
25    AllLinkages, AllObjects, FindableObservations, IgnoredLinkages, LinkageMembers, Observations,
26    StringInterner,
27};
28
29// ---------------------------------------------------------------------------
30// Column extraction helpers
31// ---------------------------------------------------------------------------
32
33/// Extract a required large_string or utf8 column as Vec<String>.
34fn get_string_column(batch: &RecordBatch, name: &str) -> Result<Vec<String>> {
35    let col = batch
36        .column_by_name(name)
37        .ok_or_else(|| Error::InvalidInput(format!("Missing column: {name}")))?;
38
39    if let Some(arr) = col.as_any().downcast_ref::<LargeStringArray>() {
40        Ok((0..arr.len()).map(|i| arr.value(i).to_string()).collect())
41    } else if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
42        Ok((0..arr.len()).map(|i| arr.value(i).to_string()).collect())
43    } else {
44        Err(Error::InvalidInput(format!(
45            "Column {name} is not a string type"
46        )))
47    }
48}
49
50/// Extract an optional (nullable) large_string column as Vec<Option<String>>.
51fn get_optional_string_column(batch: &RecordBatch, name: &str) -> Result<Vec<Option<String>>> {
52    let col = match batch.column_by_name(name) {
53        Some(c) => c,
54        None => return Ok(vec![None; batch.num_rows()]),
55    };
56
57    if let Some(arr) = col.as_any().downcast_ref::<LargeStringArray>() {
58        Ok((0..arr.len())
59            .map(|i| {
60                if arr.is_null(i) {
61                    None
62                } else {
63                    Some(arr.value(i).to_string())
64                }
65            })
66            .collect())
67    } else if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
68        Ok((0..arr.len())
69            .map(|i| {
70                if arr.is_null(i) {
71                    None
72                } else {
73                    Some(arr.value(i).to_string())
74                }
75            })
76            .collect())
77    } else {
78        Err(Error::InvalidInput(format!(
79            "Column {name} is not a string type"
80        )))
81    }
82}
83
84/// Extract a required f64 column.
85fn get_f64_column(batch: &RecordBatch, name: &str) -> Result<Vec<f64>> {
86    let col = batch
87        .column_by_name(name)
88        .ok_or_else(|| Error::InvalidInput(format!("Missing column: {name}")))?;
89    let arr = col
90        .as_any()
91        .downcast_ref::<Float64Array>()
92        .ok_or_else(|| Error::InvalidInput(format!("Column {name} is not Float64")))?;
93    Ok(arr.values().to_vec())
94}
95
96/// Extract a required i64 column.
97fn get_i64_column(batch: &RecordBatch, name: &str) -> Result<Vec<i64>> {
98    let col = batch
99        .column_by_name(name)
100        .ok_or_else(|| Error::InvalidInput(format!("Missing column: {name}")))?;
101    let arr = col
102        .as_any()
103        .downcast_ref::<Int64Array>()
104        .ok_or_else(|| Error::InvalidInput(format!("Column {name} is not Int64")))?;
105    Ok(arr.values().to_vec())
106}
107
108/// Extract the `time` struct column (days: i64, nanos: i64) and convert to MJD.
109fn get_time_as_mjd(batch: &RecordBatch) -> Result<Vec<f64>> {
110    let col = batch
111        .column_by_name("time")
112        .ok_or_else(|| Error::InvalidInput("Missing column: time".to_string()))?;
113    let struct_arr = col
114        .as_any()
115        .downcast_ref::<arrow::array::StructArray>()
116        .ok_or_else(|| Error::InvalidInput("Column time is not a struct".to_string()))?;
117
118    let days = struct_arr
119        .column_by_name("days")
120        .ok_or_else(|| Error::InvalidInput("time struct missing 'days' field".to_string()))?
121        .as_any()
122        .downcast_ref::<Int64Array>()
123        .ok_or_else(|| Error::InvalidInput("time.days is not Int64".to_string()))?;
124
125    let nanos = struct_arr
126        .column_by_name("nanos")
127        .ok_or_else(|| Error::InvalidInput("time struct missing 'nanos' field".to_string()))?
128        .as_any()
129        .downcast_ref::<Int64Array>()
130        .ok_or_else(|| Error::InvalidInput("time.nanos is not Int64".to_string()))?;
131
132    let nanos_per_day: f64 = 86_400.0 * 1e9;
133    Ok((0..days.len())
134        .map(|i| days.value(i) as f64 + nanos.value(i) as f64 / nanos_per_day)
135        .collect())
136}
137
138// ---------------------------------------------------------------------------
139// Column projection
140// ---------------------------------------------------------------------------
141
142/// Build a column projection mask for the given column names.
143/// Returns indices into the Parquet schema for only the requested columns.
144fn build_projection_mask(
145    parquet_schema: &parquet::schema::types::SchemaDescriptor,
146    arrow_schema: &Schema,
147    columns: &[&str],
148) -> parquet::arrow::ProjectionMask {
149    let indices: Vec<usize> = columns
150        .iter()
151        .filter_map(|name| arrow_schema.fields().iter().position(|f| f.name() == *name))
152        .collect();
153    parquet::arrow::ProjectionMask::roots(parquet_schema, indices)
154}
155
156// ---------------------------------------------------------------------------
157// Readers
158// ---------------------------------------------------------------------------
159
160/// Read observations from a Parquet file.
161///
162/// Returns the observations, a string interner for obs/object IDs,
163/// and a separate interner for observatory codes.
164pub fn read_observations(path: &Path) -> Result<(Observations, StringInterner, StringInterner)> {
165    read_observations_projected(path, None)
166}
167
168/// Read observations with optional column projection.
169///
170/// If `columns` is Some, only those columns are read from Parquet.
171/// Missing projected columns get default/empty values.
172pub fn read_observations_projected(
173    path: &Path,
174    columns: Option<&[&str]>,
175) -> Result<(Observations, StringInterner, StringInterner)> {
176    let file = std::fs::File::open(path)?;
177    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
178
179    let reader = if let Some(cols) = columns {
180        let parquet_schema = builder.parquet_schema().clone();
181        let arrow_schema = builder.schema().clone();
182        let mask = build_projection_mask(&parquet_schema, &arrow_schema, cols);
183        builder.with_projection(mask).build()?
184    } else {
185        builder.build()?
186    };
187
188    let mut id_interner = StringInterner::new();
189    let mut obs_code_interner = StringInterner::new();
190
191    let mut all_id = Vec::new();
192    let mut all_time = Vec::new();
193    let mut all_ra = Vec::new();
194    let mut all_dec = Vec::new();
195    let mut all_obs_code = Vec::new();
196    let mut all_object_id = Vec::new();
197    let mut all_night = Vec::new();
198
199    for batch in reader {
200        let batch = batch?;
201        let n = batch.num_rows();
202
203        // Required: id, night
204        let ids_str = get_string_column(&batch, "id")?;
205        let night = get_i64_column(&batch, "night")?;
206
207        // Optional columns — fill with defaults if not projected
208        let time_mjd = if batch.column_by_name("time").is_some() {
209            get_time_as_mjd(&batch)?
210        } else {
211            vec![0.0; n]
212        };
213
214        let ra = if batch.column_by_name("ra").is_some() {
215            get_f64_column(&batch, "ra")?
216        } else {
217            vec![0.0; n]
218        };
219
220        let dec = if batch.column_by_name("dec").is_some() {
221            get_f64_column(&batch, "dec")?
222        } else {
223            vec![0.0; n]
224        };
225
226        let obs_codes_str = if batch.column_by_name("observatory_code").is_some() {
227            get_string_column(&batch, "observatory_code")?
228        } else {
229            vec![String::new(); n]
230        };
231
232        let object_ids_str = get_optional_string_column(&batch, "object_id")?;
233
234        for i in 0..n {
235            all_id.push(id_interner.intern(&ids_str[i]));
236            all_time.push(time_mjd[i]);
237            all_ra.push(ra[i]);
238            all_dec.push(dec[i]);
239            all_obs_code.push(obs_code_interner.intern(&obs_codes_str[i]) as u32);
240            all_object_id.push(
241                object_ids_str[i]
242                    .as_ref()
243                    .map(|s| id_interner.intern(s))
244                    .unwrap_or(crate::types::NO_OBJECT),
245            );
246            all_night.push(night[i]);
247        }
248    }
249
250    let observations = Observations::new(
251        all_id,
252        all_time,
253        all_ra,
254        all_dec,
255        all_obs_code,
256        all_object_id,
257        all_night,
258    );
259
260    Ok((observations, id_interner, obs_code_interner))
261}
262
263/// Read linkage members from a Parquet file.
264///
265/// Uses the provided `id_interner` to map string IDs to the same
266/// integer space as the observations.
267pub fn read_linkage_members(
268    path: &Path,
269    id_interner: &mut StringInterner,
270) -> Result<LinkageMembers> {
271    let file = std::fs::File::open(path)?;
272    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
273
274    let mut all_linkage_id = Vec::new();
275    let mut all_obs_id = Vec::new();
276
277    for batch in reader {
278        let batch = batch?;
279
280        let linkage_ids_str = get_string_column(&batch, "linkage_id")?;
281        let obs_ids_str = get_string_column(&batch, "obs_id")?;
282
283        for i in 0..batch.num_rows() {
284            all_linkage_id.push(id_interner.intern(&linkage_ids_str[i]));
285            all_obs_id.push(id_interner.intern(&obs_ids_str[i]));
286        }
287    }
288
289    Ok(LinkageMembers {
290        linkage_id: all_linkage_id,
291        obs_id: all_obs_id,
292    })
293}
294
295/// Parse a `LargeUtf8` / `Utf8` column of stringified `u64`s into a `Vec<u64>`.
296fn get_u64_string_column(batch: &RecordBatch, name: &str) -> Result<Vec<u64>> {
297    let strings = get_string_column(batch, name)?;
298    strings
299        .iter()
300        .map(|s| {
301            s.parse::<u64>().map_err(|e| {
302                Error::InvalidInput(format!("Column {name}: could not parse {s:?} as u64: {e}"))
303            })
304        })
305        .collect()
306}
307
308/// Extract a nullable Int64 column as `Vec<Option<i64>>`.
309fn get_optional_i64_column(batch: &RecordBatch, name: &str) -> Result<Vec<Option<i64>>> {
310    let col = batch
311        .column_by_name(name)
312        .ok_or_else(|| Error::InvalidInput(format!("Missing column: {name}")))?;
313    let arr = col
314        .as_any()
315        .downcast_ref::<Int64Array>()
316        .ok_or_else(|| Error::InvalidInput(format!("Column {name} is not Int64")))?;
317    Ok((0..arr.len())
318        .map(|i| {
319            if arr.is_null(i) {
320                None
321            } else {
322                Some(arr.value(i))
323            }
324        })
325        .collect())
326}
327
328/// Extract a nullable Float64 column as `Vec<Option<f64>>`.
329fn get_optional_f64_column(batch: &RecordBatch, name: &str) -> Result<Vec<Option<f64>>> {
330    let col = batch
331        .column_by_name(name)
332        .ok_or_else(|| Error::InvalidInput(format!("Missing column: {name}")))?;
333    let arr = col
334        .as_any()
335        .downcast_ref::<Float64Array>()
336        .ok_or_else(|| Error::InvalidInput(format!("Column {name} is not Float64")))?;
337    Ok((0..arr.len())
338        .map(|i| {
339            if arr.is_null(i) {
340                None
341            } else {
342                Some(arr.value(i))
343            }
344        })
345        .collect())
346}
347
348/// Extract a nullable Boolean column as `Vec<Option<bool>>`.
349fn get_optional_bool_column(batch: &RecordBatch, name: &str) -> Result<Vec<Option<bool>>> {
350    let col = batch
351        .column_by_name(name)
352        .ok_or_else(|| Error::InvalidInput(format!("Missing column: {name}")))?;
353    let arr = col
354        .as_any()
355        .downcast_ref::<BooleanArray>()
356        .ok_or_else(|| Error::InvalidInput(format!("Column {name} is not Boolean")))?;
357    Ok((0..arr.len())
358        .map(|i| {
359            if arr.is_null(i) {
360                None
361            } else {
362                Some(arr.value(i))
363            }
364        })
365        .collect())
366}
367
368/// Read an `AllObjects` table from a Parquet file written by `write_all_objects`.
369///
370/// **Interner ordering contract:** callers must intern observations first
371/// (via `read_observations` / `read_observations_projected`), then pass the
372/// returned `&mut StringInterner` here. This re-interns `object_id` strings
373/// so the `u64` IDs align with the observations in the current session. Using
374/// a fresh interner, or interning additional strings between calls, will
375/// silently produce misaligned IDs.
376pub fn read_all_objects(path: &Path, id_interner: &mut StringInterner) -> Result<AllObjects> {
377    let file = std::fs::File::open(path)?;
378    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
379
380    let mut out = AllObjects::default();
381
382    for batch in reader {
383        let batch = batch?;
384
385        let object_ids_str = get_string_column(&batch, "object_id")?;
386        let partition_ids = get_u64_string_column(&batch, "partition_id")?;
387        let mjd_min = get_f64_column(&batch, "mjd_min")?;
388        let mjd_max = get_f64_column(&batch, "mjd_max")?;
389        let arc_length = get_f64_column(&batch, "arc_length")?;
390        let num_obs = get_i64_column(&batch, "num_obs")?;
391        let num_observatories = get_i64_column(&batch, "num_observatories")?;
392        let findable = get_optional_bool_column(&batch, "findable")?;
393        let found_pure = get_i64_column(&batch, "found_pure")?;
394        let found_contaminated = get_i64_column(&batch, "found_contaminated")?;
395        let pure = get_i64_column(&batch, "pure")?;
396        let pure_complete = get_i64_column(&batch, "pure_complete")?;
397        let contaminated = get_i64_column(&batch, "contaminated")?;
398        let contaminant = get_i64_column(&batch, "contaminant")?;
399        let mixed = get_i64_column(&batch, "mixed")?;
400        let obs_in_pure = get_i64_column(&batch, "obs_in_pure")?;
401        let obs_in_pure_complete = get_i64_column(&batch, "obs_in_pure_complete")?;
402        let obs_in_contaminated = get_i64_column(&batch, "obs_in_contaminated")?;
403        let obs_as_contaminant = get_i64_column(&batch, "obs_as_contaminant")?;
404        let obs_in_mixed = get_i64_column(&batch, "obs_in_mixed")?;
405
406        for s in &object_ids_str {
407            out.object_id.push(id_interner.intern(s));
408        }
409        out.partition_id.extend(partition_ids);
410        out.mjd_min.extend(mjd_min);
411        out.mjd_max.extend(mjd_max);
412        out.arc_length.extend(arc_length);
413        out.num_obs.extend(num_obs);
414        out.num_observatories.extend(num_observatories);
415        out.findable.extend(findable);
416        out.found_pure.extend(found_pure);
417        out.found_contaminated.extend(found_contaminated);
418        out.pure.extend(pure);
419        out.pure_complete.extend(pure_complete);
420        out.contaminated.extend(contaminated);
421        out.contaminant.extend(contaminant);
422        out.mixed.extend(mixed);
423        out.obs_in_pure.extend(obs_in_pure);
424        out.obs_in_pure_complete.extend(obs_in_pure_complete);
425        out.obs_in_contaminated.extend(obs_in_contaminated);
426        out.obs_as_contaminant.extend(obs_as_contaminant);
427        out.obs_in_mixed.extend(obs_in_mixed);
428    }
429
430    Ok(out)
431}
432
433/// Read a `Vec<PartitionSummary>` from a Parquet file written by
434/// `write_partition_summaries`. `id` is parsed from its `LargeUtf8`
435/// representation back to `u64`.
436pub fn read_partition_summaries(path: &Path) -> Result<Vec<PartitionSummary>> {
437    let file = std::fs::File::open(path)?;
438    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
439
440    let mut out = Vec::new();
441
442    for batch in reader {
443        let batch = batch?;
444
445        let ids = get_u64_string_column(&batch, "id")?;
446        let start_night = get_i64_column(&batch, "start_night")?;
447        let end_night = get_i64_column(&batch, "end_night")?;
448        let observations = get_i64_column(&batch, "observations")?;
449        let findable = get_optional_i64_column(&batch, "findable")?;
450        let found = get_optional_i64_column(&batch, "found")?;
451        let completeness = get_optional_f64_column(&batch, "completeness")?;
452        let pure_known = get_optional_i64_column(&batch, "pure_known")?;
453        let pure_unknown = get_optional_i64_column(&batch, "pure_unknown")?;
454        let contaminated = get_optional_i64_column(&batch, "contaminated")?;
455        let mixed = get_optional_i64_column(&batch, "mixed")?;
456
457        for i in 0..batch.num_rows() {
458            out.push(PartitionSummary {
459                id: ids[i],
460                start_night: start_night[i],
461                end_night: end_night[i],
462                observations: observations[i],
463                findable: findable[i],
464                found: found[i],
465                completeness: completeness[i],
466                pure_known: pure_known[i],
467                pure_unknown: pure_unknown[i],
468                contaminated: contaminated[i],
469                mixed: mixed[i],
470            });
471        }
472    }
473
474    Ok(out)
475}
476
477/// Read `FindableObservations` from a Parquet file written by
478/// `write_findable_observations`.
479///
480/// Note: the writer does not persist the `obs_ids` field, so the returned
481/// `FindableObservations.obs_ids` is filled with `None` for every row. The
482/// DIFI phase does not consume `obs_ids`, so this is sufficient for CIFI-output
483/// reuse today.
484///
485/// Same interner ordering contract as `read_all_objects`.
486pub fn read_findable_observations(
487    path: &Path,
488    id_interner: &mut StringInterner,
489) -> Result<FindableObservations> {
490    let file = std::fs::File::open(path)?;
491    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
492
493    let mut out = FindableObservations::default();
494
495    for batch in reader {
496        let batch = batch?;
497
498        let partition_ids = get_u64_string_column(&batch, "partition_id")?;
499        let object_ids_str = get_string_column(&batch, "object_id")?;
500        let discovery_night = get_optional_i64_column(&batch, "discovery_night")?;
501
502        for i in 0..batch.num_rows() {
503            out.partition_id.push(partition_ids[i]);
504            out.object_id.push(id_interner.intern(&object_ids_str[i]));
505            out.discovery_night.push(discovery_night[i]);
506            out.obs_ids.push(None);
507        }
508    }
509
510    Ok(out)
511}
512
513// ---------------------------------------------------------------------------
514// Writers
515// ---------------------------------------------------------------------------
516
517fn write_props() -> WriterProperties {
518    WriterProperties::builder()
519        .set_compression(Compression::SNAPPY)
520        .build()
521}
522
523/// Write AllObjects to a Parquet file, de-interning IDs back to strings.
524pub fn write_all_objects(
525    path: &Path,
526    all_objects: &AllObjects,
527    id_interner: &StringInterner,
528) -> Result<()> {
529    let schema = Arc::new(Schema::new(vec![
530        Field::new("object_id", DataType::LargeUtf8, false),
531        Field::new("partition_id", DataType::LargeUtf8, false),
532        Field::new("mjd_min", DataType::Float64, false),
533        Field::new("mjd_max", DataType::Float64, false),
534        Field::new("arc_length", DataType::Float64, false),
535        Field::new("num_obs", DataType::Int64, false),
536        Field::new("num_observatories", DataType::Int64, false),
537        Field::new("findable", DataType::Boolean, true),
538        Field::new("found_pure", DataType::Int64, false),
539        Field::new("found_contaminated", DataType::Int64, false),
540        Field::new("pure", DataType::Int64, false),
541        Field::new("pure_complete", DataType::Int64, false),
542        Field::new("contaminated", DataType::Int64, false),
543        Field::new("contaminant", DataType::Int64, false),
544        Field::new("mixed", DataType::Int64, false),
545        Field::new("obs_in_pure", DataType::Int64, false),
546        Field::new("obs_in_pure_complete", DataType::Int64, false),
547        Field::new("obs_in_contaminated", DataType::Int64, false),
548        Field::new("obs_as_contaminant", DataType::Int64, false),
549        Field::new("obs_in_mixed", DataType::Int64, false),
550    ]));
551
552    let object_ids: Vec<&str> = all_objects
553        .object_id
554        .iter()
555        .map(|&id| id_interner.resolve(id).unwrap_or(""))
556        .collect();
557    let partition_ids: Vec<String> = all_objects
558        .partition_id
559        .iter()
560        .map(|id| id.to_string())
561        .collect();
562    let partition_id_refs: Vec<&str> = partition_ids.iter().map(|s| s.as_str()).collect();
563
564    let columns: Vec<Arc<dyn Array>> = vec![
565        Arc::new(LargeStringArray::from(object_ids)),
566        Arc::new(LargeStringArray::from(partition_id_refs)),
567        Arc::new(Float64Array::from(all_objects.mjd_min.clone())),
568        Arc::new(Float64Array::from(all_objects.mjd_max.clone())),
569        Arc::new(Float64Array::from(all_objects.arc_length.clone())),
570        Arc::new(Int64Array::from(all_objects.num_obs.clone())),
571        Arc::new(Int64Array::from(all_objects.num_observatories.clone())),
572        Arc::new(BooleanArray::from(all_objects.findable.clone())),
573        Arc::new(Int64Array::from(all_objects.found_pure.clone())),
574        Arc::new(Int64Array::from(all_objects.found_contaminated.clone())),
575        Arc::new(Int64Array::from(all_objects.pure.clone())),
576        Arc::new(Int64Array::from(all_objects.pure_complete.clone())),
577        Arc::new(Int64Array::from(all_objects.contaminated.clone())),
578        Arc::new(Int64Array::from(all_objects.contaminant.clone())),
579        Arc::new(Int64Array::from(all_objects.mixed.clone())),
580        Arc::new(Int64Array::from(all_objects.obs_in_pure.clone())),
581        Arc::new(Int64Array::from(all_objects.obs_in_pure_complete.clone())),
582        Arc::new(Int64Array::from(all_objects.obs_in_contaminated.clone())),
583        Arc::new(Int64Array::from(all_objects.obs_as_contaminant.clone())),
584        Arc::new(Int64Array::from(all_objects.obs_in_mixed.clone())),
585    ];
586
587    let batch = RecordBatch::try_new(schema.clone(), columns)?;
588    let file = std::fs::File::create(path)?;
589    let mut writer = ArrowWriter::try_new(file, schema, Some(write_props()))?;
590    writer.write(&batch)?;
591    writer.close()?;
592    Ok(())
593}
594
595/// Write AllLinkages to a Parquet file, de-interning IDs back to strings.
596pub fn write_all_linkages(
597    path: &Path,
598    all_linkages: &AllLinkages,
599    id_interner: &StringInterner,
600) -> Result<()> {
601    let schema = Arc::new(Schema::new(vec![
602        Field::new("linkage_id", DataType::LargeUtf8, false),
603        Field::new("partition_id", DataType::LargeUtf8, false),
604        Field::new("linked_object_id", DataType::LargeUtf8, true),
605        Field::new("num_obs", DataType::Int64, false),
606        Field::new("num_obs_outside_partition", DataType::Int64, false),
607        Field::new("num_members", DataType::Int64, false),
608        Field::new("pure", DataType::Boolean, false),
609        Field::new("pure_complete", DataType::Boolean, false),
610        Field::new("contaminated", DataType::Boolean, false),
611        Field::new("contamination", DataType::Float64, false),
612        Field::new("mixed", DataType::Boolean, false),
613        Field::new("found_pure", DataType::Boolean, false),
614        Field::new("found_contaminated", DataType::Boolean, false),
615    ]));
616
617    let linkage_ids: Vec<&str> = all_linkages
618        .linkage_id
619        .iter()
620        .map(|&id| id_interner.resolve(id).unwrap_or(""))
621        .collect();
622    let partition_ids: Vec<String> = all_linkages
623        .partition_id
624        .iter()
625        .map(|id| id.to_string())
626        .collect();
627    let partition_id_refs: Vec<&str> = partition_ids.iter().map(|s| s.as_str()).collect();
628    let linked_obj_ids: Vec<Option<&str>> = all_linkages
629        .linked_object_id
630        .iter()
631        .map(|&id| id_interner.resolve(id))
632        .collect();
633
634    let columns: Vec<Arc<dyn Array>> = vec![
635        Arc::new(LargeStringArray::from(linkage_ids)),
636        Arc::new(LargeStringArray::from(partition_id_refs)),
637        Arc::new(LargeStringArray::from(linked_obj_ids)),
638        Arc::new(Int64Array::from(all_linkages.num_obs.clone())),
639        Arc::new(Int64Array::from(
640            all_linkages.num_obs_outside_partition.clone(),
641        )),
642        Arc::new(Int64Array::from(all_linkages.num_members.clone())),
643        Arc::new(BooleanArray::from(all_linkages.pure.clone())),
644        Arc::new(BooleanArray::from(all_linkages.pure_complete.clone())),
645        Arc::new(BooleanArray::from(all_linkages.contaminated.clone())),
646        Arc::new(Float64Array::from(all_linkages.contamination.clone())),
647        Arc::new(BooleanArray::from(all_linkages.mixed.clone())),
648        Arc::new(BooleanArray::from(all_linkages.found_pure.clone())),
649        Arc::new(BooleanArray::from(all_linkages.found_contaminated.clone())),
650    ];
651
652    let batch = RecordBatch::try_new(schema.clone(), columns)?;
653    let file = std::fs::File::create(path)?;
654    let mut writer = ArrowWriter::try_new(file, schema, Some(write_props()))?;
655    writer.write(&batch)?;
656    writer.close()?;
657    Ok(())
658}
659
660/// Write partition summaries to a Parquet file.
661pub fn write_partition_summaries(path: &Path, summaries: &[PartitionSummary]) -> Result<()> {
662    let schema = Arc::new(Schema::new(vec![
663        Field::new("id", DataType::LargeUtf8, false),
664        Field::new("start_night", DataType::Int64, false),
665        Field::new("end_night", DataType::Int64, false),
666        Field::new("observations", DataType::Int64, false),
667        Field::new("findable", DataType::Int64, true),
668        Field::new("found", DataType::Int64, true),
669        Field::new("completeness", DataType::Float64, true),
670        Field::new("pure_known", DataType::Int64, true),
671        Field::new("pure_unknown", DataType::Int64, true),
672        Field::new("contaminated", DataType::Int64, true),
673        Field::new("mixed", DataType::Int64, true),
674    ]));
675
676    let ids: Vec<String> = summaries.iter().map(|s| s.id.to_string()).collect();
677    let id_refs: Vec<&str> = ids.iter().map(|s| s.as_str()).collect();
678
679    let columns: Vec<Arc<dyn Array>> = vec![
680        Arc::new(LargeStringArray::from(id_refs)),
681        Arc::new(Int64Array::from(
682            summaries.iter().map(|s| s.start_night).collect::<Vec<_>>(),
683        )),
684        Arc::new(Int64Array::from(
685            summaries.iter().map(|s| s.end_night).collect::<Vec<_>>(),
686        )),
687        Arc::new(Int64Array::from(
688            summaries.iter().map(|s| s.observations).collect::<Vec<_>>(),
689        )),
690        Arc::new(Int64Array::from(
691            summaries.iter().map(|s| s.findable).collect::<Vec<_>>(),
692        )),
693        Arc::new(Int64Array::from(
694            summaries.iter().map(|s| s.found).collect::<Vec<_>>(),
695        )),
696        Arc::new(Float64Array::from(
697            summaries.iter().map(|s| s.completeness).collect::<Vec<_>>(),
698        )),
699        Arc::new(Int64Array::from(
700            summaries.iter().map(|s| s.pure_known).collect::<Vec<_>>(),
701        )),
702        Arc::new(Int64Array::from(
703            summaries.iter().map(|s| s.pure_unknown).collect::<Vec<_>>(),
704        )),
705        Arc::new(Int64Array::from(
706            summaries.iter().map(|s| s.contaminated).collect::<Vec<_>>(),
707        )),
708        Arc::new(Int64Array::from(
709            summaries.iter().map(|s| s.mixed).collect::<Vec<_>>(),
710        )),
711    ];
712
713    let batch = RecordBatch::try_new(schema.clone(), columns)?;
714    let file = std::fs::File::create(path)?;
715    let mut writer = ArrowWriter::try_new(file, schema, Some(write_props()))?;
716    writer.write(&batch)?;
717    writer.close()?;
718    Ok(())
719}
720
721/// Write findable observations to a Parquet file.
722pub fn write_findable_observations(
723    path: &Path,
724    findable: &FindableObservations,
725    id_interner: &StringInterner,
726) -> Result<()> {
727    let schema = Arc::new(Schema::new(vec![
728        Field::new("partition_id", DataType::LargeUtf8, false),
729        Field::new("object_id", DataType::LargeUtf8, false),
730        Field::new("discovery_night", DataType::Int64, true),
731    ]));
732
733    let partition_ids: Vec<String> = findable
734        .partition_id
735        .iter()
736        .map(|id| id.to_string())
737        .collect();
738    let partition_id_refs: Vec<&str> = partition_ids.iter().map(|s| s.as_str()).collect();
739    let object_ids: Vec<&str> = findable
740        .object_id
741        .iter()
742        .map(|&id| id_interner.resolve(id).unwrap_or(""))
743        .collect();
744
745    let columns: Vec<Arc<dyn Array>> = vec![
746        Arc::new(LargeStringArray::from(partition_id_refs)),
747        Arc::new(LargeStringArray::from(object_ids)),
748        Arc::new(Int64Array::from(findable.discovery_night.clone())),
749    ];
750
751    let batch = RecordBatch::try_new(schema.clone(), columns)?;
752    let file = std::fs::File::create(path)?;
753    let mut writer = ArrowWriter::try_new(file, schema, Some(write_props()))?;
754    writer.write(&batch)?;
755    writer.close()?;
756    Ok(())
757}
758
759/// Write ignored-linkage records to a Parquet file, de-interning IDs back to
760/// strings. Consumers can union with `all_linkages.parquet` by
761/// `(linkage_id, partition_id)`.
762pub fn write_ignored_linkages(
763    path: &Path,
764    ignored: &IgnoredLinkages,
765    id_interner: &StringInterner,
766) -> Result<()> {
767    let schema = Arc::new(Schema::new(vec![
768        Field::new("linkage_id", DataType::LargeUtf8, false),
769        Field::new("partition_id", DataType::LargeUtf8, false),
770        Field::new("reason", DataType::LargeUtf8, false),
771        Field::new("num_obs", DataType::Int64, false),
772        Field::new("num_members", DataType::Int64, false),
773    ]));
774
775    let linkage_ids: Vec<&str> = ignored
776        .linkage_id
777        .iter()
778        .map(|&id| id_interner.resolve(id).unwrap_or(""))
779        .collect();
780    let partition_ids: Vec<String> = ignored
781        .partition_id
782        .iter()
783        .map(|id| id.to_string())
784        .collect();
785    let partition_id_refs: Vec<&str> = partition_ids.iter().map(|s| s.as_str()).collect();
786    let reasons: Vec<&str> = ignored.reason.iter().map(|r| r.as_str()).collect();
787
788    let columns: Vec<Arc<dyn Array>> = vec![
789        Arc::new(LargeStringArray::from(linkage_ids)),
790        Arc::new(LargeStringArray::from(partition_id_refs)),
791        Arc::new(LargeStringArray::from(reasons)),
792        Arc::new(Int64Array::from(ignored.num_obs.clone())),
793        Arc::new(Int64Array::from(ignored.num_members.clone())),
794    ];
795
796    let batch = RecordBatch::try_new(schema.clone(), columns)?;
797    let file = std::fs::File::create(path)?;
798    let mut writer = ArrowWriter::try_new(file, schema, Some(write_props()))?;
799    writer.write(&batch)?;
800    writer.close()?;
801    Ok(())
802}
803
804/// Predefined column sets for common use cases.
805pub mod columns {
806    /// Columns needed for CIFI analysis (both singleton and tracklet metrics).
807    pub const CIFI: &[&str] = &[
808        "id",
809        "night",
810        "object_id",
811        "time",
812        "ra",
813        "dec",
814        "observatory_code",
815    ];
816
817    /// Minimal columns for DIFI linkage classification.
818    pub const DIFI: &[&str] = &["id", "night", "object_id"];
819}