use std::collections::HashSet;
use std::path::Path;
use chrono::{Datelike, Duration, NaiveDate};
use indexmap::IndexMap;
use super::super::schema::{Blueprint, CalendarLink, JunctionEdge, NodeSpec};
use super::{csv_cell_to_value, resolve_csv_path, resolve_source_spec, resolve_source_spec_mut};
#[allow(clippy::too_many_arguments)]
pub fn run_calendar(
blueprint: &mut Blueprint,
input_root: &Path,
node_type: &str,
start: &str,
end: &str,
next_edge: &str,
in_month_edge: Option<&str>,
in_quarter_edge: Option<&str>,
in_year_edge: Option<&str>,
links: &[CalendarLink],
) -> Result<(), String> {
let start_d = NaiveDate::parse_from_str(start, "%Y-%m-%d")
.map_err(|e| format!("calendar: invalid start '{}': {}", start, e))?;
let end_d = NaiveDate::parse_from_str(end, "%Y-%m-%d")
.map_err(|e| format!("calendar: invalid end '{}': {}", end, e))?;
if start_d > end_d {
return Err(format!(
"calendar: start ({}) must be <= end ({})",
start, end
));
}
let computed = input_root.join("computed");
std::fs::create_dir_all(&computed)
.map_err(|e| format!("calendar: create {}: {}", computed.display(), e))?;
let date_csv_path = computed.join(format!("calendar_{}.csv", sanitize(node_type)));
let mut date_writer = csv::WriterBuilder::new()
.from_path(&date_csv_path)
.map_err(|e| format!("calendar: open {}: {}", date_csv_path.display(), e))?;
date_writer
.write_record(["iso", "year", "month", "day", "quarter", "weekday"])
.map_err(|e| format!("calendar: write Date header: {}", e))?;
let mut months: HashSet<String> = HashSet::new();
let mut quarters: HashSet<String> = HashSet::new();
let mut years: HashSet<i32> = HashSet::new();
let mut d = start_d;
while d <= end_d {
let iso = d.format("%Y-%m-%d").to_string();
let q = (d.month() - 1) / 3 + 1;
let month_iso = d.format("%Y-%m").to_string();
let quarter_iso = format!("{}-Q{}", d.year(), q);
let weekday = d.format("%A").to_string();
date_writer
.write_record([
iso.as_str(),
&d.year().to_string(),
&d.month().to_string(),
&d.day().to_string(),
&q.to_string(),
weekday.as_str(),
])
.map_err(|e| format!("calendar: write Date row: {}", e))?;
months.insert(month_iso);
quarters.insert(quarter_iso);
years.insert(d.year());
d += Duration::days(1);
}
date_writer
.flush()
.map_err(|e| format!("calendar: flush Date: {}", e))?;
drop(date_writer);
let next_csv_path = computed.join(format!(
"calendar_{}_{}.csv",
sanitize(node_type),
sanitize(next_edge)
));
let mut nd_writer = csv::WriterBuilder::new()
.from_path(&next_csv_path)
.map_err(|e| format!("calendar: open {}: {}", next_csv_path.display(), e))?;
nd_writer
.write_record(["iso", "next_iso"])
.map_err(|e| format!("calendar: write NEXT_DAY header: {}", e))?;
let mut d = start_d;
while d < end_d {
let n = d + Duration::days(1);
nd_writer
.write_record([
d.format("%Y-%m-%d").to_string().as_str(),
n.format("%Y-%m-%d").to_string().as_str(),
])
.map_err(|e| format!("calendar: write NEXT_DAY row: {}", e))?;
d = n;
}
nd_writer
.flush()
.map_err(|e| format!("calendar: flush NEXT_DAY: {}", e))?;
drop(nd_writer);
let mut date_spec = NodeSpec {
csv: Some(format!("computed/calendar_{}.csv", sanitize(node_type))),
pk: Some("iso".to_string()),
title: Some("iso".to_string()),
..NodeSpec::default()
};
for (col, ty) in [
("year", "int"),
("month", "int"),
("day", "int"),
("quarter", "int"),
("weekday", "string"),
] {
date_spec.properties.insert(col.to_string(), ty.to_string());
}
date_spec.connections.junction_edges.insert(
next_edge.to_string(),
JunctionEdge {
csv: format!(
"computed/calendar_{}_{}.csv",
sanitize(node_type),
sanitize(next_edge)
),
source_fk: "iso".to_string(),
target: node_type.to_string(),
target_fk: "next_iso".to_string(),
properties: vec![],
property_types: IndexMap::new(),
},
);
if let Some(edge_name) = in_month_edge {
write_hierarchy(
blueprint,
input_root,
"Month",
months.iter().cloned().collect(),
node_type,
edge_name,
"iso",
"month_iso",
|iso| iso.get(..7).unwrap_or("").to_string(),
)?;
}
if let Some(edge_name) = in_quarter_edge {
write_hierarchy(
blueprint,
input_root,
"Quarter",
quarters.iter().cloned().collect(),
node_type,
edge_name,
"iso",
"quarter_iso",
|iso| {
let m: u32 = iso.get(5..7).unwrap_or("01").parse().unwrap_or(1);
let q = (m - 1) / 3 + 1;
format!("{}-Q{}", iso.get(..4).unwrap_or(""), q)
},
)?;
}
if let Some(edge_name) = in_year_edge {
write_hierarchy_year(blueprint, input_root, years, node_type, edge_name)?;
}
blueprint.nodes.insert(node_type.to_string(), date_spec);
for link in links {
write_link(blueprint, input_root, node_type, link)?;
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn write_hierarchy<F>(
blueprint: &mut Blueprint,
input_root: &Path,
hier_type: &str,
keys: Vec<String>,
date_type: &str,
edge_name: &str,
date_pk_col: &str,
hier_fk_col: &str,
key_from_iso: F,
) -> Result<(), String>
where
F: Fn(&str) -> String,
{
let computed = input_root.join("computed");
let node_csv_path = computed.join(format!("calendar_{}.csv", sanitize(hier_type)));
let mut w = csv::WriterBuilder::new()
.from_path(&node_csv_path)
.map_err(|e| format!("calendar: open {}: {}", node_csv_path.display(), e))?;
w.write_record([hier_fk_col])
.map_err(|e| format!("calendar: write hier header: {}", e))?;
let mut sorted = keys.clone();
sorted.sort();
for k in &sorted {
w.write_record([k.as_str()])
.map_err(|e| format!("calendar: write hier row: {}", e))?;
}
w.flush()
.map_err(|e| format!("calendar: flush hier: {}", e))?;
drop(w);
let junc_csv_path = computed.join(format!(
"calendar_{}_{}.csv",
sanitize(date_type),
sanitize(edge_name)
));
let mut jw = csv::WriterBuilder::new()
.from_path(&junc_csv_path)
.map_err(|e| format!("calendar: open {}: {}", junc_csv_path.display(), e))?;
jw.write_record([date_pk_col, hier_fk_col])
.map_err(|e| format!("calendar: write hier junction header: {}", e))?;
let date_csv_path = computed.join(format!("calendar_{}.csv", sanitize(date_type)));
let mut rdr = csv::ReaderBuilder::new()
.from_path(&date_csv_path)
.map_err(|e| format!("calendar: reopen date csv: {}", e))?;
let headers: Vec<String> = rdr
.headers()
.map_err(|e| format!("calendar: reread header: {}", e))?
.iter()
.map(|s| s.to_string())
.collect();
let iso_idx = headers
.iter()
.position(|h| h == "iso")
.ok_or_else(|| "calendar: date csv missing iso column".to_string())?;
for r in rdr.records() {
let r = r.map_err(|e| format!("calendar: reread row: {}", e))?;
let iso = r.get(iso_idx).unwrap_or("");
let key = key_from_iso(iso);
jw.write_record([iso, key.as_str()])
.map_err(|e| format!("calendar: write hier junction row: {}", e))?;
}
jw.flush()
.map_err(|e| format!("calendar: flush hier junction: {}", e))?;
drop(jw);
let node_spec = NodeSpec {
csv: Some(format!("computed/calendar_{}.csv", sanitize(hier_type))),
pk: Some(hier_fk_col.to_string()),
title: Some(hier_fk_col.to_string()),
..NodeSpec::default()
};
blueprint.nodes.insert(hier_type.to_string(), node_spec);
let _ = junc_csv_path; Ok(())
}
#[allow(clippy::too_many_arguments)]
fn write_hierarchy_year(
_blueprint: &mut Blueprint,
_input_root: &Path,
_years: HashSet<i32>,
_date_type: &str,
_edge_name: &str,
) -> Result<(), String> {
Ok(())
}
fn write_link(
blueprint: &mut Blueprint,
input_root: &Path,
date_type: &str,
link: &CalendarLink,
) -> Result<(), String> {
let src_spec = resolve_source_spec(blueprint, &link.from)
.ok_or_else(|| format!("calendar link: unknown source type '{}'", link.from))?;
let src_pk = src_spec
.pk
.clone()
.ok_or_else(|| format!("calendar link: source '{}' has no pk", link.from))?;
let src_csv = src_spec
.csv
.clone()
.ok_or_else(|| format!("calendar link: source '{}' has no csv", link.from))?;
let src_csv_path = resolve_csv_path(input_root, &src_csv);
if !src_csv_path.exists() {
return Ok(());
}
let date_type_param = src_spec.properties.get(&link.date_col).cloned();
let mut reader = csv::ReaderBuilder::new()
.from_path(&src_csv_path)
.map_err(|e| format!("calendar link: open {}: {}", src_csv_path.display(), e))?;
let headers: Vec<String> = reader
.headers()
.map_err(|e| format!("calendar link: header: {}", e))?
.iter()
.map(|s| s.to_string())
.collect();
let src_pk_idx = headers
.iter()
.position(|h| h == &src_pk)
.ok_or_else(|| format!("calendar link: pk '{}' not in source headers", src_pk))?;
let date_idx = headers
.iter()
.position(|h| h == &link.date_col)
.ok_or_else(|| {
format!(
"calendar link: date_col '{}' not in source headers",
link.date_col
)
})?;
let junc_path = input_root.join("computed").join(format!(
"calendar_link_{}_{}.csv",
sanitize(&link.from),
sanitize(&link.edge)
));
let mut w = csv::WriterBuilder::new()
.from_path(&junc_path)
.map_err(|e| format!("calendar link: open {}: {}", junc_path.display(), e))?;
w.write_record([src_pk.as_str(), "iso"])
.map_err(|e| format!("calendar link: write header: {}", e))?;
for r in reader.records() {
let r = r.map_err(|e| format!("calendar link: row: {}", e))?;
let pk_v = r.get(src_pk_idx).unwrap_or("");
let raw_date = r.get(date_idx).unwrap_or("");
let v = csv_cell_to_value(raw_date, date_type_param.as_deref());
let iso = match v {
super::super::expr::Value::String(s) => normalise_to_iso(&s),
_ => continue,
};
if iso.is_empty() {
continue;
}
w.write_record([pk_v, iso.as_str()])
.map_err(|e| format!("calendar link: write row: {}", e))?;
}
w.flush()
.map_err(|e| format!("calendar link: flush: {}", e))?;
drop(w);
let computed_rel = format!(
"computed/calendar_link_{}_{}.csv",
sanitize(&link.from),
sanitize(&link.edge)
);
let src_mut = resolve_source_spec_mut(blueprint, &link.from)
.expect("calendar link source spec disappeared between resolve and mutate");
src_mut.connections.junction_edges.insert(
link.edge.clone(),
JunctionEdge {
csv: computed_rel,
source_fk: src_pk,
target: date_type.to_string(),
target_fk: "iso".to_string(),
properties: vec![],
property_types: IndexMap::new(),
},
);
Ok(())
}
fn normalise_to_iso(s: &str) -> String {
if s.len() >= 10
&& s.chars().nth(4) == Some('-')
&& s.chars().nth(7) == Some('-')
&& s[..10]
.chars()
.take(10)
.all(|c| c == '-' || c.is_ascii_digit())
{
s[..10].to_string()
} else {
String::new()
}
}
fn sanitize(s: &str) -> String {
s.chars()
.map(|c| {
if c.is_ascii_alphanumeric() || c == '_' {
c
} else {
'_'
}
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
#[test]
fn calendar_emits_date_csv_and_next_day_chain() {
let tmp = tempfile::tempdir().unwrap();
let mut bp = Blueprint::default();
run_calendar(
&mut bp,
tmp.path(),
"Date",
"2025-01-01",
"2025-01-05",
"NEXT_DAY",
None,
None,
None,
&[],
)
.unwrap();
let date_csv = fs::read_to_string(tmp.path().join("computed/calendar_Date.csv")).unwrap();
let lines: Vec<&str> = date_csv.lines().collect();
assert_eq!(lines.len(), 6, "{}", date_csv);
assert!(date_csv.contains("2025-01-01"));
assert!(date_csv.contains("2025-01-05"));
let next_csv =
fs::read_to_string(tmp.path().join("computed/calendar_Date_NEXT_DAY.csv")).unwrap();
let next_lines: Vec<&str> = next_csv.lines().collect();
assert_eq!(next_lines.len(), 5);
assert!(next_csv.contains("2025-01-01,2025-01-02"));
assert!(next_csv.contains("2025-01-04,2025-01-05"));
assert!(bp.nodes.contains_key("Date"));
let date_spec = &bp.nodes["Date"];
assert_eq!(date_spec.pk.as_deref(), Some("iso"));
assert!(date_spec
.connections
.junction_edges
.contains_key("NEXT_DAY"));
}
#[test]
fn calendar_links_source_to_date() {
let tmp = tempfile::tempdir().unwrap();
fs::write(
tmp.path().join("tx.csv"),
"id,date\n1,2025-01-02\n2,2025-01-04\n",
)
.unwrap();
let mut spec = NodeSpec {
csv: Some("tx.csv".to_string()),
pk: Some("id".to_string()),
..Default::default()
};
spec.properties
.insert("date".to_string(), "string".to_string());
let mut bp = Blueprint::default();
bp.nodes.insert("Txn".to_string(), spec);
let links = vec![CalendarLink {
from: "Txn".to_string(),
date_col: "date".to_string(),
edge: "ON_DATE".to_string(),
}];
run_calendar(
&mut bp,
tmp.path(),
"Date",
"2025-01-01",
"2025-01-31",
"NEXT_DAY",
None,
None,
None,
&links,
)
.unwrap();
let junc =
fs::read_to_string(tmp.path().join("computed/calendar_link_Txn_ON_DATE.csv")).unwrap();
assert!(junc.contains("1,2025-01-02"));
assert!(junc.contains("2,2025-01-04"));
let edge = &bp.nodes["Txn"].connections.junction_edges["ON_DATE"];
assert_eq!(edge.target, "Date");
assert_eq!(edge.target_fk, "iso");
}
#[test]
fn calendar_rejects_invalid_dates() {
let tmp = tempfile::tempdir().unwrap();
let mut bp = Blueprint::default();
let err = run_calendar(
&mut bp,
tmp.path(),
"Date",
"2025-13-99",
"2025-12-31",
"NEXT_DAY",
None,
None,
None,
&[],
)
.unwrap_err();
assert!(err.contains("invalid start"), "{}", err);
}
#[test]
fn calendar_rejects_inverted_range() {
let tmp = tempfile::tempdir().unwrap();
let mut bp = Blueprint::default();
let err = run_calendar(
&mut bp,
tmp.path(),
"Date",
"2030-01-01",
"2020-12-31",
"NEXT_DAY",
None,
None,
None,
&[],
)
.unwrap_err();
assert!(err.contains("must be <= end"), "{}", err);
}
#[test]
fn calendar_skips_non_iso_date_cells() {
let tmp = tempfile::tempdir().unwrap();
fs::write(
tmp.path().join("tx.csv"),
"id,date\n1,2025-01-02\n2,not-a-date\n3,\n",
)
.unwrap();
let mut spec = NodeSpec {
csv: Some("tx.csv".to_string()),
pk: Some("id".to_string()),
..Default::default()
};
spec.properties
.insert("date".to_string(), "string".to_string());
let mut bp = Blueprint::default();
bp.nodes.insert("Txn".to_string(), spec);
let links = vec![CalendarLink {
from: "Txn".to_string(),
date_col: "date".to_string(),
edge: "ON_DATE".to_string(),
}];
run_calendar(
&mut bp,
tmp.path(),
"Date",
"2025-01-01",
"2025-12-31",
"NEXT_DAY",
None,
None,
None,
&links,
)
.unwrap();
let junc =
fs::read_to_string(tmp.path().join("computed/calendar_link_Txn_ON_DATE.csv")).unwrap();
let lines: Vec<&str> = junc.lines().collect();
assert_eq!(lines.len(), 2, "{}", junc);
assert!(lines[1].contains("1,2025-01-02"));
}
}