use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::time::Instant;
use anyhow::{Context, Result, bail};
use clap::Args as ClapArgs;
use difi::cifi::analyze_observations;
use difi::io::{
columns, read_observations_projected, write_all_objects, write_findable_observations,
write_partition_summaries,
};
use difi::partitions::PartitionSummary;
use difi::types::{
AllObjects, FindableObservations, ObservationTable, Observations, StringInterner,
};
use crate::common::{
self, HostInfo, InputFingerprint, Manifest, MetricArgs, PartitionArgs, ProgressEvent,
RunContext, ScenarioEntry, ScenarioManifest, ensure_dir, fingerprint_input, now_unix_s,
read_scenarios, version_string, write_manifest,
};
#[derive(ClapArgs, Debug)]
pub struct Args {
#[arg(short = 'i', long)]
pub observations: Option<PathBuf>,
#[arg(short = 'o', long)]
pub output_dir: PathBuf,
#[arg(long)]
pub scenarios: Option<PathBuf>,
#[command(flatten)]
pub metric: MetricArgs,
#[command(flatten)]
pub partition: PartitionArgs,
}
pub fn run(args: Args, ctx: &RunContext) -> Result<()> {
ensure_dir(&args.output_dir)?;
if let Some(scenarios_path) = &args.scenarios {
run_batch(&args, scenarios_path, ctx)
} else {
run_single(&args, ctx)
}
}
fn run_single(args: &Args, ctx: &RunContext) -> Result<()> {
let obs_path = args.observations.as_ref().ok_or_else(|| {
anyhow::anyhow!("--observations is required (or pass --scenarios <FILE>)")
})?;
ctx.emit(ProgressEvent::Start {
subcommand: "analyze-observations",
step: "cifi",
input: &obs_path.display().to_string(),
});
let obs_fp = fingerprint_input(obs_path)?;
let load_t0 = Instant::now();
let (obs, id_interner, _obs_code_interner) =
read_observations_projected(obs_path, Some(columns::CIFI))
.with_context(|| format!("read observations from {}", obs_path.display()))?;
ctx.emit(ProgressEvent::LoadedObservations {
count: obs.len(),
elapsed_s: load_t0.elapsed().as_secs_f64(),
});
if obs.len() == 0 {
bail!("no observations in {}", obs_path.display());
}
let scenario_outputs = run_cifi_scenario(
"default",
&obs,
&id_interner,
&args.metric,
&args.partition,
&args.output_dir,
ctx,
)?;
let manifest = Manifest {
difi_version: version_string(),
command: ctx.command.clone(),
observations_input: obs_fp,
linkages_input: None,
reused_cifi: None,
warnings: Default::default(),
scenarios: vec![scenario_outputs],
host: HostInfo::capture(),
started_at_unix_s: ctx.started_at_unix_s(),
finished_at_unix_s: now_unix_s(),
};
write_manifest(&args.output_dir.join("run_manifest.json"), &manifest)?;
ctx.emit(ProgressEvent::Done {
total_elapsed_s: ctx.elapsed_s(),
output_dir: &args.output_dir,
});
Ok(())
}
fn run_batch(args: &Args, scenarios_path: &Path, ctx: &RunContext) -> Result<()> {
let scenarios = read_scenarios(scenarios_path)?;
ctx.emit(ProgressEvent::Start {
subcommand: "analyze-observations",
step: "cifi",
input: &scenarios_path.display().to_string(),
});
let mut scenario_manifests = Vec::new();
let mut fingerprint_cache: BTreeMap<PathBuf, InputFingerprint> = BTreeMap::new();
let mut obs_cache: BTreeMap<PathBuf, (Observations, StringInterner)> = BTreeMap::new();
let mut top_level_obs_fp: Option<InputFingerprint> = None;
for entry in &scenarios.scenarios {
let obs_path = resolve_obs_path(entry, &scenarios, args)?;
let fp = if let Some(fp) = fingerprint_cache.get(&obs_path) {
fp.clone()
} else {
let fp = fingerprint_input(&obs_path)?;
fingerprint_cache.insert(obs_path.clone(), fp.clone());
fp
};
if top_level_obs_fp.is_none() {
top_level_obs_fp = Some(fp.clone());
}
if !obs_cache.contains_key(&obs_path) {
let load_t0 = Instant::now();
let (obs, id_interner, _obs_code_interner) =
read_observations_projected(&obs_path, Some(columns::CIFI))
.with_context(|| format!("read observations from {}", obs_path.display()))?;
ctx.emit(ProgressEvent::LoadedObservations {
count: obs.len(),
elapsed_s: load_t0.elapsed().as_secs_f64(),
});
if obs.len() == 0 {
bail!("no observations in {}", obs_path.display());
}
obs_cache.insert(obs_path.clone(), (obs, id_interner));
}
let (obs, id_interner) = obs_cache.get(&obs_path).unwrap();
let scen_dir = args.output_dir.join(&entry.name);
ensure_dir(&scen_dir)?;
let metric = entry.to_metric_args();
let partition = entry.to_partition_args();
let sm = run_cifi_scenario(
&entry.name,
obs,
id_interner,
&metric,
&partition,
&scen_dir,
ctx,
)?;
scenario_manifests.push(sm);
}
let obs_fp = top_level_obs_fp.ok_or_else(|| anyhow::anyhow!("no scenarios executed"))?;
let manifest = Manifest {
difi_version: version_string(),
command: ctx.command.clone(),
observations_input: obs_fp,
linkages_input: None,
reused_cifi: None,
warnings: Default::default(),
scenarios: scenario_manifests,
host: HostInfo::capture(),
started_at_unix_s: ctx.started_at_unix_s(),
finished_at_unix_s: now_unix_s(),
};
write_manifest(&args.output_dir.join("run_manifest.json"), &manifest)?;
ctx.emit(ProgressEvent::Done {
total_elapsed_s: ctx.elapsed_s(),
output_dir: &args.output_dir,
});
Ok(())
}
fn resolve_obs_path(
entry: &ScenarioEntry,
scenarios: &common::ScenariosFile,
args: &Args,
) -> Result<PathBuf> {
entry
.observations
.clone()
.or_else(|| args.observations.clone())
.or_else(|| scenarios.defaults.observations.clone())
.ok_or_else(|| {
anyhow::anyhow!(
"scenario {} has no observations path (set per-scenario, --observations, or [defaults])",
entry.name
)
})
}
fn run_cifi_scenario(
name: &str,
obs: &Observations,
id_interner: &StringInterner,
metric_args: &MetricArgs,
partition_args: &PartitionArgs,
out_dir: &Path,
ctx: &RunContext,
) -> Result<ScenarioManifest> {
let metric = metric_args.build();
let partitions = partition_args.build(&obs.night)?;
ctx.emit(ProgressEvent::ScenarioStart {
name,
metric: metric_args.metric.as_str(),
partitions: partitions.len(),
});
let t0 = Instant::now();
let (all_objects, findable, mut summaries) =
analyze_observations(obs, Some(&partitions), metric.as_ref())
.context("analyze_observations failed")?;
let elapsed = t0.elapsed().as_secs_f64();
summaries.sort_by_key(|s| s.start_night);
let findable_count: i64 = summaries.iter().filter_map(|s| s.findable).sum();
let (unique_findable_count, _unique_found) = common::compute_unique_counts(&all_objects);
let outputs = write_cifi_outputs(out_dir, &all_objects, &findable, &summaries, id_interner)?;
ctx.emit(ProgressEvent::ScenarioDone {
name,
findable: findable_count,
found: None,
elapsed_s: elapsed,
});
Ok(ScenarioManifest {
name: name.to_string(),
metric: metric_args.to_manifest(),
partitions: partition_args.to_manifest(),
cifi_elapsed_s: elapsed,
difi_elapsed_s: None,
findable_count,
found_count: None,
unique_findable_count,
unique_found_count: None,
unique_completeness: None,
outputs,
})
}
pub(crate) fn write_cifi_outputs(
out_dir: &Path,
all_objects: &AllObjects,
findable: &FindableObservations,
summaries: &[PartitionSummary],
id_interner: &StringInterner,
) -> Result<BTreeMap<String, String>> {
let all_objects_path = out_dir.join("all_objects.parquet");
let findable_path = out_dir.join("findable_observations.parquet");
let summaries_path = out_dir.join("partition_summaries.parquet");
write_all_objects(&all_objects_path, all_objects, id_interner)
.with_context(|| format!("write {}", all_objects_path.display()))?;
write_findable_observations(&findable_path, findable, id_interner)
.with_context(|| format!("write {}", findable_path.display()))?;
write_partition_summaries(&summaries_path, summaries)
.with_context(|| format!("write {}", summaries_path.display()))?;
let mut outputs = BTreeMap::new();
outputs.insert(
"all_objects".to_string(),
all_objects_path.display().to_string(),
);
outputs.insert(
"findable_observations".to_string(),
findable_path.display().to_string(),
);
outputs.insert(
"partition_summaries".to_string(),
summaries_path.display().to_string(),
);
Ok(outputs)
}