1use std::path::Path;
11use std::sync::Arc;
12
13use arrow::array::{
14 Array, BooleanArray, Float64Array, Int64Array, LargeStringArray, RecordBatch, StringArray,
15};
16use arrow::datatypes::{DataType, Field, Schema};
17use parquet::arrow::ArrowWriter;
18use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
19use parquet::basic::Compression;
20use parquet::file::properties::WriterProperties;
21
22use crate::error::{Error, Result};
23use crate::partitions::PartitionSummary;
24use crate::types::{
25 AllLinkages, AllObjects, FindableObservations, IgnoredLinkages, LinkageMembers, Observations,
26 StringInterner,
27};
28
29fn get_string_column(batch: &RecordBatch, name: &str) -> Result<Vec<String>> {
35 let col = batch
36 .column_by_name(name)
37 .ok_or_else(|| Error::InvalidInput(format!("Missing column: {name}")))?;
38
39 if let Some(arr) = col.as_any().downcast_ref::<LargeStringArray>() {
40 Ok((0..arr.len()).map(|i| arr.value(i).to_string()).collect())
41 } else if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
42 Ok((0..arr.len()).map(|i| arr.value(i).to_string()).collect())
43 } else {
44 Err(Error::InvalidInput(format!(
45 "Column {name} is not a string type"
46 )))
47 }
48}
49
50fn get_optional_string_column(batch: &RecordBatch, name: &str) -> Result<Vec<Option<String>>> {
52 let col = match batch.column_by_name(name) {
53 Some(c) => c,
54 None => return Ok(vec![None; batch.num_rows()]),
55 };
56
57 if let Some(arr) = col.as_any().downcast_ref::<LargeStringArray>() {
58 Ok((0..arr.len())
59 .map(|i| {
60 if arr.is_null(i) {
61 None
62 } else {
63 Some(arr.value(i).to_string())
64 }
65 })
66 .collect())
67 } else if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
68 Ok((0..arr.len())
69 .map(|i| {
70 if arr.is_null(i) {
71 None
72 } else {
73 Some(arr.value(i).to_string())
74 }
75 })
76 .collect())
77 } else {
78 Err(Error::InvalidInput(format!(
79 "Column {name} is not a string type"
80 )))
81 }
82}
83
84fn get_f64_column(batch: &RecordBatch, name: &str) -> Result<Vec<f64>> {
86 let col = batch
87 .column_by_name(name)
88 .ok_or_else(|| Error::InvalidInput(format!("Missing column: {name}")))?;
89 let arr = col
90 .as_any()
91 .downcast_ref::<Float64Array>()
92 .ok_or_else(|| Error::InvalidInput(format!("Column {name} is not Float64")))?;
93 Ok(arr.values().to_vec())
94}
95
96fn get_i64_column(batch: &RecordBatch, name: &str) -> Result<Vec<i64>> {
98 let col = batch
99 .column_by_name(name)
100 .ok_or_else(|| Error::InvalidInput(format!("Missing column: {name}")))?;
101 let arr = col
102 .as_any()
103 .downcast_ref::<Int64Array>()
104 .ok_or_else(|| Error::InvalidInput(format!("Column {name} is not Int64")))?;
105 Ok(arr.values().to_vec())
106}
107
108fn get_time_as_mjd(batch: &RecordBatch) -> Result<Vec<f64>> {
110 let col = batch
111 .column_by_name("time")
112 .ok_or_else(|| Error::InvalidInput("Missing column: time".to_string()))?;
113 let struct_arr = col
114 .as_any()
115 .downcast_ref::<arrow::array::StructArray>()
116 .ok_or_else(|| Error::InvalidInput("Column time is not a struct".to_string()))?;
117
118 let days = struct_arr
119 .column_by_name("days")
120 .ok_or_else(|| Error::InvalidInput("time struct missing 'days' field".to_string()))?
121 .as_any()
122 .downcast_ref::<Int64Array>()
123 .ok_or_else(|| Error::InvalidInput("time.days is not Int64".to_string()))?;
124
125 let nanos = struct_arr
126 .column_by_name("nanos")
127 .ok_or_else(|| Error::InvalidInput("time struct missing 'nanos' field".to_string()))?
128 .as_any()
129 .downcast_ref::<Int64Array>()
130 .ok_or_else(|| Error::InvalidInput("time.nanos is not Int64".to_string()))?;
131
132 let nanos_per_day: f64 = 86_400.0 * 1e9;
133 Ok((0..days.len())
134 .map(|i| days.value(i) as f64 + nanos.value(i) as f64 / nanos_per_day)
135 .collect())
136}
137
138fn build_projection_mask(
145 parquet_schema: &parquet::schema::types::SchemaDescriptor,
146 arrow_schema: &Schema,
147 columns: &[&str],
148) -> parquet::arrow::ProjectionMask {
149 let indices: Vec<usize> = columns
150 .iter()
151 .filter_map(|name| arrow_schema.fields().iter().position(|f| f.name() == *name))
152 .collect();
153 parquet::arrow::ProjectionMask::roots(parquet_schema, indices)
154}
155
156pub fn read_observations(path: &Path) -> Result<(Observations, StringInterner, StringInterner)> {
165 read_observations_projected(path, None)
166}
167
168pub fn read_observations_projected(
173 path: &Path,
174 columns: Option<&[&str]>,
175) -> Result<(Observations, StringInterner, StringInterner)> {
176 let file = std::fs::File::open(path)?;
177 let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
178
179 let reader = if let Some(cols) = columns {
180 let parquet_schema = builder.parquet_schema().clone();
181 let arrow_schema = builder.schema().clone();
182 let mask = build_projection_mask(&parquet_schema, &arrow_schema, cols);
183 builder.with_projection(mask).build()?
184 } else {
185 builder.build()?
186 };
187
188 let mut id_interner = StringInterner::new();
189 let mut obs_code_interner = StringInterner::new();
190
191 let mut all_id = Vec::new();
192 let mut all_time = Vec::new();
193 let mut all_ra = Vec::new();
194 let mut all_dec = Vec::new();
195 let mut all_obs_code = Vec::new();
196 let mut all_object_id = Vec::new();
197 let mut all_night = Vec::new();
198
199 for batch in reader {
200 let batch = batch?;
201 let n = batch.num_rows();
202
203 let ids_str = get_string_column(&batch, "id")?;
205 let night = get_i64_column(&batch, "night")?;
206
207 let time_mjd = if batch.column_by_name("time").is_some() {
209 get_time_as_mjd(&batch)?
210 } else {
211 vec![0.0; n]
212 };
213
214 let ra = if batch.column_by_name("ra").is_some() {
215 get_f64_column(&batch, "ra")?
216 } else {
217 vec![0.0; n]
218 };
219
220 let dec = if batch.column_by_name("dec").is_some() {
221 get_f64_column(&batch, "dec")?
222 } else {
223 vec![0.0; n]
224 };
225
226 let obs_codes_str = if batch.column_by_name("observatory_code").is_some() {
227 get_string_column(&batch, "observatory_code")?
228 } else {
229 vec![String::new(); n]
230 };
231
232 let object_ids_str = get_optional_string_column(&batch, "object_id")?;
233
234 for i in 0..n {
235 all_id.push(id_interner.intern(&ids_str[i]));
236 all_time.push(time_mjd[i]);
237 all_ra.push(ra[i]);
238 all_dec.push(dec[i]);
239 all_obs_code.push(obs_code_interner.intern(&obs_codes_str[i]) as u32);
240 all_object_id.push(
241 object_ids_str[i]
242 .as_ref()
243 .map(|s| id_interner.intern(s))
244 .unwrap_or(crate::types::NO_OBJECT),
245 );
246 all_night.push(night[i]);
247 }
248 }
249
250 let observations = Observations::new(
251 all_id,
252 all_time,
253 all_ra,
254 all_dec,
255 all_obs_code,
256 all_object_id,
257 all_night,
258 );
259
260 Ok((observations, id_interner, obs_code_interner))
261}
262
263pub fn read_linkage_members(
268 path: &Path,
269 id_interner: &mut StringInterner,
270) -> Result<LinkageMembers> {
271 let file = std::fs::File::open(path)?;
272 let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
273
274 let mut all_linkage_id = Vec::new();
275 let mut all_obs_id = Vec::new();
276
277 for batch in reader {
278 let batch = batch?;
279
280 let linkage_ids_str = get_string_column(&batch, "linkage_id")?;
281 let obs_ids_str = get_string_column(&batch, "obs_id")?;
282
283 for i in 0..batch.num_rows() {
284 all_linkage_id.push(id_interner.intern(&linkage_ids_str[i]));
285 all_obs_id.push(id_interner.intern(&obs_ids_str[i]));
286 }
287 }
288
289 Ok(LinkageMembers {
290 linkage_id: all_linkage_id,
291 obs_id: all_obs_id,
292 })
293}
294
295fn get_u64_string_column(batch: &RecordBatch, name: &str) -> Result<Vec<u64>> {
297 let strings = get_string_column(batch, name)?;
298 strings
299 .iter()
300 .map(|s| {
301 s.parse::<u64>().map_err(|e| {
302 Error::InvalidInput(format!("Column {name}: could not parse {s:?} as u64: {e}"))
303 })
304 })
305 .collect()
306}
307
308fn get_optional_i64_column(batch: &RecordBatch, name: &str) -> Result<Vec<Option<i64>>> {
310 let col = batch
311 .column_by_name(name)
312 .ok_or_else(|| Error::InvalidInput(format!("Missing column: {name}")))?;
313 let arr = col
314 .as_any()
315 .downcast_ref::<Int64Array>()
316 .ok_or_else(|| Error::InvalidInput(format!("Column {name} is not Int64")))?;
317 Ok((0..arr.len())
318 .map(|i| {
319 if arr.is_null(i) {
320 None
321 } else {
322 Some(arr.value(i))
323 }
324 })
325 .collect())
326}
327
328fn get_optional_f64_column(batch: &RecordBatch, name: &str) -> Result<Vec<Option<f64>>> {
330 let col = batch
331 .column_by_name(name)
332 .ok_or_else(|| Error::InvalidInput(format!("Missing column: {name}")))?;
333 let arr = col
334 .as_any()
335 .downcast_ref::<Float64Array>()
336 .ok_or_else(|| Error::InvalidInput(format!("Column {name} is not Float64")))?;
337 Ok((0..arr.len())
338 .map(|i| {
339 if arr.is_null(i) {
340 None
341 } else {
342 Some(arr.value(i))
343 }
344 })
345 .collect())
346}
347
348fn get_optional_bool_column(batch: &RecordBatch, name: &str) -> Result<Vec<Option<bool>>> {
350 let col = batch
351 .column_by_name(name)
352 .ok_or_else(|| Error::InvalidInput(format!("Missing column: {name}")))?;
353 let arr = col
354 .as_any()
355 .downcast_ref::<BooleanArray>()
356 .ok_or_else(|| Error::InvalidInput(format!("Column {name} is not Boolean")))?;
357 Ok((0..arr.len())
358 .map(|i| {
359 if arr.is_null(i) {
360 None
361 } else {
362 Some(arr.value(i))
363 }
364 })
365 .collect())
366}
367
368pub fn read_all_objects(path: &Path, id_interner: &mut StringInterner) -> Result<AllObjects> {
377 let file = std::fs::File::open(path)?;
378 let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
379
380 let mut out = AllObjects::default();
381
382 for batch in reader {
383 let batch = batch?;
384
385 let object_ids_str = get_string_column(&batch, "object_id")?;
386 let partition_ids = get_u64_string_column(&batch, "partition_id")?;
387 let mjd_min = get_f64_column(&batch, "mjd_min")?;
388 let mjd_max = get_f64_column(&batch, "mjd_max")?;
389 let arc_length = get_f64_column(&batch, "arc_length")?;
390 let num_obs = get_i64_column(&batch, "num_obs")?;
391 let num_observatories = get_i64_column(&batch, "num_observatories")?;
392 let findable = get_optional_bool_column(&batch, "findable")?;
393 let found_pure = get_i64_column(&batch, "found_pure")?;
394 let found_contaminated = get_i64_column(&batch, "found_contaminated")?;
395 let pure = get_i64_column(&batch, "pure")?;
396 let pure_complete = get_i64_column(&batch, "pure_complete")?;
397 let contaminated = get_i64_column(&batch, "contaminated")?;
398 let contaminant = get_i64_column(&batch, "contaminant")?;
399 let mixed = get_i64_column(&batch, "mixed")?;
400 let obs_in_pure = get_i64_column(&batch, "obs_in_pure")?;
401 let obs_in_pure_complete = get_i64_column(&batch, "obs_in_pure_complete")?;
402 let obs_in_contaminated = get_i64_column(&batch, "obs_in_contaminated")?;
403 let obs_as_contaminant = get_i64_column(&batch, "obs_as_contaminant")?;
404 let obs_in_mixed = get_i64_column(&batch, "obs_in_mixed")?;
405
406 for s in &object_ids_str {
407 out.object_id.push(id_interner.intern(s));
408 }
409 out.partition_id.extend(partition_ids);
410 out.mjd_min.extend(mjd_min);
411 out.mjd_max.extend(mjd_max);
412 out.arc_length.extend(arc_length);
413 out.num_obs.extend(num_obs);
414 out.num_observatories.extend(num_observatories);
415 out.findable.extend(findable);
416 out.found_pure.extend(found_pure);
417 out.found_contaminated.extend(found_contaminated);
418 out.pure.extend(pure);
419 out.pure_complete.extend(pure_complete);
420 out.contaminated.extend(contaminated);
421 out.contaminant.extend(contaminant);
422 out.mixed.extend(mixed);
423 out.obs_in_pure.extend(obs_in_pure);
424 out.obs_in_pure_complete.extend(obs_in_pure_complete);
425 out.obs_in_contaminated.extend(obs_in_contaminated);
426 out.obs_as_contaminant.extend(obs_as_contaminant);
427 out.obs_in_mixed.extend(obs_in_mixed);
428 }
429
430 Ok(out)
431}
432
433pub fn read_partition_summaries(path: &Path) -> Result<Vec<PartitionSummary>> {
437 let file = std::fs::File::open(path)?;
438 let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
439
440 let mut out = Vec::new();
441
442 for batch in reader {
443 let batch = batch?;
444
445 let ids = get_u64_string_column(&batch, "id")?;
446 let start_night = get_i64_column(&batch, "start_night")?;
447 let end_night = get_i64_column(&batch, "end_night")?;
448 let observations = get_i64_column(&batch, "observations")?;
449 let findable = get_optional_i64_column(&batch, "findable")?;
450 let found = get_optional_i64_column(&batch, "found")?;
451 let completeness = get_optional_f64_column(&batch, "completeness")?;
452 let pure_known = get_optional_i64_column(&batch, "pure_known")?;
453 let pure_unknown = get_optional_i64_column(&batch, "pure_unknown")?;
454 let contaminated = get_optional_i64_column(&batch, "contaminated")?;
455 let mixed = get_optional_i64_column(&batch, "mixed")?;
456
457 for i in 0..batch.num_rows() {
458 out.push(PartitionSummary {
459 id: ids[i],
460 start_night: start_night[i],
461 end_night: end_night[i],
462 observations: observations[i],
463 findable: findable[i],
464 found: found[i],
465 completeness: completeness[i],
466 pure_known: pure_known[i],
467 pure_unknown: pure_unknown[i],
468 contaminated: contaminated[i],
469 mixed: mixed[i],
470 });
471 }
472 }
473
474 Ok(out)
475}
476
477pub fn read_findable_observations(
487 path: &Path,
488 id_interner: &mut StringInterner,
489) -> Result<FindableObservations> {
490 let file = std::fs::File::open(path)?;
491 let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
492
493 let mut out = FindableObservations::default();
494
495 for batch in reader {
496 let batch = batch?;
497
498 let partition_ids = get_u64_string_column(&batch, "partition_id")?;
499 let object_ids_str = get_string_column(&batch, "object_id")?;
500 let discovery_night = get_optional_i64_column(&batch, "discovery_night")?;
501
502 for i in 0..batch.num_rows() {
503 out.partition_id.push(partition_ids[i]);
504 out.object_id.push(id_interner.intern(&object_ids_str[i]));
505 out.discovery_night.push(discovery_night[i]);
506 out.obs_ids.push(None);
507 }
508 }
509
510 Ok(out)
511}
512
513fn write_props() -> WriterProperties {
518 WriterProperties::builder()
519 .set_compression(Compression::SNAPPY)
520 .build()
521}
522
523pub fn write_all_objects(
525 path: &Path,
526 all_objects: &AllObjects,
527 id_interner: &StringInterner,
528) -> Result<()> {
529 let schema = Arc::new(Schema::new(vec![
530 Field::new("object_id", DataType::LargeUtf8, false),
531 Field::new("partition_id", DataType::LargeUtf8, false),
532 Field::new("mjd_min", DataType::Float64, false),
533 Field::new("mjd_max", DataType::Float64, false),
534 Field::new("arc_length", DataType::Float64, false),
535 Field::new("num_obs", DataType::Int64, false),
536 Field::new("num_observatories", DataType::Int64, false),
537 Field::new("findable", DataType::Boolean, true),
538 Field::new("found_pure", DataType::Int64, false),
539 Field::new("found_contaminated", DataType::Int64, false),
540 Field::new("pure", DataType::Int64, false),
541 Field::new("pure_complete", DataType::Int64, false),
542 Field::new("contaminated", DataType::Int64, false),
543 Field::new("contaminant", DataType::Int64, false),
544 Field::new("mixed", DataType::Int64, false),
545 Field::new("obs_in_pure", DataType::Int64, false),
546 Field::new("obs_in_pure_complete", DataType::Int64, false),
547 Field::new("obs_in_contaminated", DataType::Int64, false),
548 Field::new("obs_as_contaminant", DataType::Int64, false),
549 Field::new("obs_in_mixed", DataType::Int64, false),
550 ]));
551
552 let object_ids: Vec<&str> = all_objects
553 .object_id
554 .iter()
555 .map(|&id| id_interner.resolve(id).unwrap_or(""))
556 .collect();
557 let partition_ids: Vec<String> = all_objects
558 .partition_id
559 .iter()
560 .map(|id| id.to_string())
561 .collect();
562 let partition_id_refs: Vec<&str> = partition_ids.iter().map(|s| s.as_str()).collect();
563
564 let columns: Vec<Arc<dyn Array>> = vec![
565 Arc::new(LargeStringArray::from(object_ids)),
566 Arc::new(LargeStringArray::from(partition_id_refs)),
567 Arc::new(Float64Array::from(all_objects.mjd_min.clone())),
568 Arc::new(Float64Array::from(all_objects.mjd_max.clone())),
569 Arc::new(Float64Array::from(all_objects.arc_length.clone())),
570 Arc::new(Int64Array::from(all_objects.num_obs.clone())),
571 Arc::new(Int64Array::from(all_objects.num_observatories.clone())),
572 Arc::new(BooleanArray::from(all_objects.findable.clone())),
573 Arc::new(Int64Array::from(all_objects.found_pure.clone())),
574 Arc::new(Int64Array::from(all_objects.found_contaminated.clone())),
575 Arc::new(Int64Array::from(all_objects.pure.clone())),
576 Arc::new(Int64Array::from(all_objects.pure_complete.clone())),
577 Arc::new(Int64Array::from(all_objects.contaminated.clone())),
578 Arc::new(Int64Array::from(all_objects.contaminant.clone())),
579 Arc::new(Int64Array::from(all_objects.mixed.clone())),
580 Arc::new(Int64Array::from(all_objects.obs_in_pure.clone())),
581 Arc::new(Int64Array::from(all_objects.obs_in_pure_complete.clone())),
582 Arc::new(Int64Array::from(all_objects.obs_in_contaminated.clone())),
583 Arc::new(Int64Array::from(all_objects.obs_as_contaminant.clone())),
584 Arc::new(Int64Array::from(all_objects.obs_in_mixed.clone())),
585 ];
586
587 let batch = RecordBatch::try_new(schema.clone(), columns)?;
588 let file = std::fs::File::create(path)?;
589 let mut writer = ArrowWriter::try_new(file, schema, Some(write_props()))?;
590 writer.write(&batch)?;
591 writer.close()?;
592 Ok(())
593}
594
595pub fn write_all_linkages(
597 path: &Path,
598 all_linkages: &AllLinkages,
599 id_interner: &StringInterner,
600) -> Result<()> {
601 let schema = Arc::new(Schema::new(vec![
602 Field::new("linkage_id", DataType::LargeUtf8, false),
603 Field::new("partition_id", DataType::LargeUtf8, false),
604 Field::new("linked_object_id", DataType::LargeUtf8, true),
605 Field::new("num_obs", DataType::Int64, false),
606 Field::new("num_obs_outside_partition", DataType::Int64, false),
607 Field::new("num_members", DataType::Int64, false),
608 Field::new("pure", DataType::Boolean, false),
609 Field::new("pure_complete", DataType::Boolean, false),
610 Field::new("contaminated", DataType::Boolean, false),
611 Field::new("contamination", DataType::Float64, false),
612 Field::new("mixed", DataType::Boolean, false),
613 Field::new("found_pure", DataType::Boolean, false),
614 Field::new("found_contaminated", DataType::Boolean, false),
615 ]));
616
617 let linkage_ids: Vec<&str> = all_linkages
618 .linkage_id
619 .iter()
620 .map(|&id| id_interner.resolve(id).unwrap_or(""))
621 .collect();
622 let partition_ids: Vec<String> = all_linkages
623 .partition_id
624 .iter()
625 .map(|id| id.to_string())
626 .collect();
627 let partition_id_refs: Vec<&str> = partition_ids.iter().map(|s| s.as_str()).collect();
628 let linked_obj_ids: Vec<Option<&str>> = all_linkages
629 .linked_object_id
630 .iter()
631 .map(|&id| id_interner.resolve(id))
632 .collect();
633
634 let columns: Vec<Arc<dyn Array>> = vec![
635 Arc::new(LargeStringArray::from(linkage_ids)),
636 Arc::new(LargeStringArray::from(partition_id_refs)),
637 Arc::new(LargeStringArray::from(linked_obj_ids)),
638 Arc::new(Int64Array::from(all_linkages.num_obs.clone())),
639 Arc::new(Int64Array::from(
640 all_linkages.num_obs_outside_partition.clone(),
641 )),
642 Arc::new(Int64Array::from(all_linkages.num_members.clone())),
643 Arc::new(BooleanArray::from(all_linkages.pure.clone())),
644 Arc::new(BooleanArray::from(all_linkages.pure_complete.clone())),
645 Arc::new(BooleanArray::from(all_linkages.contaminated.clone())),
646 Arc::new(Float64Array::from(all_linkages.contamination.clone())),
647 Arc::new(BooleanArray::from(all_linkages.mixed.clone())),
648 Arc::new(BooleanArray::from(all_linkages.found_pure.clone())),
649 Arc::new(BooleanArray::from(all_linkages.found_contaminated.clone())),
650 ];
651
652 let batch = RecordBatch::try_new(schema.clone(), columns)?;
653 let file = std::fs::File::create(path)?;
654 let mut writer = ArrowWriter::try_new(file, schema, Some(write_props()))?;
655 writer.write(&batch)?;
656 writer.close()?;
657 Ok(())
658}
659
660pub fn write_partition_summaries(path: &Path, summaries: &[PartitionSummary]) -> Result<()> {
662 let schema = Arc::new(Schema::new(vec![
663 Field::new("id", DataType::LargeUtf8, false),
664 Field::new("start_night", DataType::Int64, false),
665 Field::new("end_night", DataType::Int64, false),
666 Field::new("observations", DataType::Int64, false),
667 Field::new("findable", DataType::Int64, true),
668 Field::new("found", DataType::Int64, true),
669 Field::new("completeness", DataType::Float64, true),
670 Field::new("pure_known", DataType::Int64, true),
671 Field::new("pure_unknown", DataType::Int64, true),
672 Field::new("contaminated", DataType::Int64, true),
673 Field::new("mixed", DataType::Int64, true),
674 ]));
675
676 let ids: Vec<String> = summaries.iter().map(|s| s.id.to_string()).collect();
677 let id_refs: Vec<&str> = ids.iter().map(|s| s.as_str()).collect();
678
679 let columns: Vec<Arc<dyn Array>> = vec![
680 Arc::new(LargeStringArray::from(id_refs)),
681 Arc::new(Int64Array::from(
682 summaries.iter().map(|s| s.start_night).collect::<Vec<_>>(),
683 )),
684 Arc::new(Int64Array::from(
685 summaries.iter().map(|s| s.end_night).collect::<Vec<_>>(),
686 )),
687 Arc::new(Int64Array::from(
688 summaries.iter().map(|s| s.observations).collect::<Vec<_>>(),
689 )),
690 Arc::new(Int64Array::from(
691 summaries.iter().map(|s| s.findable).collect::<Vec<_>>(),
692 )),
693 Arc::new(Int64Array::from(
694 summaries.iter().map(|s| s.found).collect::<Vec<_>>(),
695 )),
696 Arc::new(Float64Array::from(
697 summaries.iter().map(|s| s.completeness).collect::<Vec<_>>(),
698 )),
699 Arc::new(Int64Array::from(
700 summaries.iter().map(|s| s.pure_known).collect::<Vec<_>>(),
701 )),
702 Arc::new(Int64Array::from(
703 summaries.iter().map(|s| s.pure_unknown).collect::<Vec<_>>(),
704 )),
705 Arc::new(Int64Array::from(
706 summaries.iter().map(|s| s.contaminated).collect::<Vec<_>>(),
707 )),
708 Arc::new(Int64Array::from(
709 summaries.iter().map(|s| s.mixed).collect::<Vec<_>>(),
710 )),
711 ];
712
713 let batch = RecordBatch::try_new(schema.clone(), columns)?;
714 let file = std::fs::File::create(path)?;
715 let mut writer = ArrowWriter::try_new(file, schema, Some(write_props()))?;
716 writer.write(&batch)?;
717 writer.close()?;
718 Ok(())
719}
720
721pub fn write_findable_observations(
723 path: &Path,
724 findable: &FindableObservations,
725 id_interner: &StringInterner,
726) -> Result<()> {
727 let schema = Arc::new(Schema::new(vec![
728 Field::new("partition_id", DataType::LargeUtf8, false),
729 Field::new("object_id", DataType::LargeUtf8, false),
730 Field::new("discovery_night", DataType::Int64, true),
731 ]));
732
733 let partition_ids: Vec<String> = findable
734 .partition_id
735 .iter()
736 .map(|id| id.to_string())
737 .collect();
738 let partition_id_refs: Vec<&str> = partition_ids.iter().map(|s| s.as_str()).collect();
739 let object_ids: Vec<&str> = findable
740 .object_id
741 .iter()
742 .map(|&id| id_interner.resolve(id).unwrap_or(""))
743 .collect();
744
745 let columns: Vec<Arc<dyn Array>> = vec![
746 Arc::new(LargeStringArray::from(partition_id_refs)),
747 Arc::new(LargeStringArray::from(object_ids)),
748 Arc::new(Int64Array::from(findable.discovery_night.clone())),
749 ];
750
751 let batch = RecordBatch::try_new(schema.clone(), columns)?;
752 let file = std::fs::File::create(path)?;
753 let mut writer = ArrowWriter::try_new(file, schema, Some(write_props()))?;
754 writer.write(&batch)?;
755 writer.close()?;
756 Ok(())
757}
758
759pub fn write_ignored_linkages(
763 path: &Path,
764 ignored: &IgnoredLinkages,
765 id_interner: &StringInterner,
766) -> Result<()> {
767 let schema = Arc::new(Schema::new(vec![
768 Field::new("linkage_id", DataType::LargeUtf8, false),
769 Field::new("partition_id", DataType::LargeUtf8, false),
770 Field::new("reason", DataType::LargeUtf8, false),
771 Field::new("num_obs", DataType::Int64, false),
772 Field::new("num_members", DataType::Int64, false),
773 ]));
774
775 let linkage_ids: Vec<&str> = ignored
776 .linkage_id
777 .iter()
778 .map(|&id| id_interner.resolve(id).unwrap_or(""))
779 .collect();
780 let partition_ids: Vec<String> = ignored
781 .partition_id
782 .iter()
783 .map(|id| id.to_string())
784 .collect();
785 let partition_id_refs: Vec<&str> = partition_ids.iter().map(|s| s.as_str()).collect();
786 let reasons: Vec<&str> = ignored.reason.iter().map(|r| r.as_str()).collect();
787
788 let columns: Vec<Arc<dyn Array>> = vec![
789 Arc::new(LargeStringArray::from(linkage_ids)),
790 Arc::new(LargeStringArray::from(partition_id_refs)),
791 Arc::new(LargeStringArray::from(reasons)),
792 Arc::new(Int64Array::from(ignored.num_obs.clone())),
793 Arc::new(Int64Array::from(ignored.num_members.clone())),
794 ];
795
796 let batch = RecordBatch::try_new(schema.clone(), columns)?;
797 let file = std::fs::File::create(path)?;
798 let mut writer = ArrowWriter::try_new(file, schema, Some(write_props()))?;
799 writer.write(&batch)?;
800 writer.close()?;
801 Ok(())
802}
803
/// Named column lists for use as the `columns` projection of
/// `read_observations_projected`.
pub mod columns {
    /// All seven observation columns that `read_observations_projected` can decode.
    pub const CIFI: &[&str] = &[
        "id",
        "night",
        "object_id",
        "time",
        "ra",
        "dec",
        "observatory_code",
    ];

    /// Only `id`, `night` and `object_id`. With this projection the reader fills
    /// time, sky position and observatory code with its defaults.
    pub const DIFI: &[&str] = &["id", "night", "object_id"];
}