1use std::collections::HashMap;
6use std::fmt::Write;
7
8use crate::field::{self, Field, Ordering, RangeSpec, Transform, ZipfSpec};
9use crate::rng::Rng;
10
/// A named output column together with the generator that produces its value.
#[derive(Clone)]
pub struct Column {
    /// Column name; other columns (refs, expressions, aggregators) look it up by this name.
    pub name: String,
    /// How the column's value is produced for each record.
    pub gen: ColumnGen,
}
20
/// Running aggregate functions recognised in column specs (`sum`, `count`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggrFunc {
    /// Running total of the source column's raw numeric value.
    Sum,
    /// Running number of records seen (per group when grouped).
    Count,
}
26
/// Binary arithmetic operator of an expression column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExprOp {
    /// `left + right`
    Add,
    /// `left - right`
    Sub,
    /// `left * right`
    Mul,
}
33
/// One side of an arithmetic expression column.
#[derive(Clone)]
pub enum ExprOperand {
    /// Another column, referenced by name; its raw numeric value is used.
    Col(String),
    /// An inline field generated just for this expression, with its own
    /// modifier and optional range.
    Field { field: &'static Field, modifier: String, range: Option<RangeSpec> },
}
39
40impl std::fmt::Debug for ExprOperand {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 match self {
43 Self::Col(name) => write!(f, "Col({name})"),
44 Self::Field { field, modifier, range } => {
45 write!(f, "Field({}", field.name)?;
46 if let Some(r) = range {
47 write!(f, ":{r:?}")?;
48 }
49 if !modifier.is_empty() {
50 write!(f, ":{modifier}")?;
51 }
52 write!(f, ")")
53 }
54 }
55 }
56}
57
/// Generation settings of a parent table, carried (boxed) by the `Fk` and
/// `FkDeref` column generators.
///
/// NOTE(review): the field meanings below are inferred from their names and
/// from the similarly named options read in `generate_from_columns`; this file
/// never reads them, so confirm against the code that consumes `ParentCtx`.
#[derive(Clone, Default)]
pub struct ParentCtx {
    /// Presumably the seed the parent table was generated with.
    pub table_seed: u64,
    /// Locales available to the parent table.
    pub locales: Vec<&'static crate::locale::Locale>,
    /// Script setting of the parent table.
    pub script: crate::script::Script,
    /// Context mode (`crate::script::Ctx`) of the parent table.
    pub ctx: crate::script::Ctx,
    /// Presumably the timezone offset in minutes, as the name says.
    pub tz_offset_minutes: i32,
    /// Presumably the lower bound of the parent's time window.
    pub since: i64,
    /// Presumably the upper bound of the parent's time window.
    pub until: i64,
    /// Presumably the number of rows in the parent table.
    pub parent_count: u64,
}
73
/// How a foreign-key column picks among parent rows.
///
/// NOTE(review): sampling itself is implemented outside this file; the
/// descriptions below follow the variant names.
#[derive(Clone, Debug)]
pub enum FkDistribution {
    /// Every parent row is equally likely.
    Uniform,
    /// Skewed selection; the `f64` is presumably the Zipf exponent.
    Zipf(f64),
}
79
/// The recipe that produces a column's value for each record.
#[derive(Clone)]
pub enum ColumnGen {
    /// A value generated by a registered field.
    Field {
        /// Registry entry that generates the value.
        field: &'static Field,
        /// Field modifier text (for `enum` fields this holds the value list).
        modifier: String,
        /// Case transform applied to the generated text.
        transform: Transform,
        /// Optional range restriction, resolved via `field::resolve_range`.
        range: Option<RangeSpec>,
        /// Ordering passed to the generator context.
        ordering: Ordering,
        /// When set, the value is left empty for roughly this percentage of records.
        omit_pct: Option<u8>,
        /// Optional Zipf settings passed to the generator context.
        zipf: Option<ZipfSpec>,
    },
    /// A fixed string emitted verbatim.
    Literal(String),
    /// A running aggregate over another column.
    Aggr {
        /// Aggregate function to apply.
        func: AggrFunc,
        /// Name of the column being aggregated.
        source_col: String,
        /// Optional column whose text value partitions the aggregate.
        group_by: Option<String>,
    },
    /// A copy of another column, optionally re-formatted with a modifier.
    Ref {
        /// Name of the column being copied.
        source_col: String,
        /// Formatting modifier (empty means copy the text as-is).
        modifier: String,
    },
    /// Arithmetic over two operands.
    Expr {
        left: ExprOperand,
        op: ExprOp,
        right: ExprOperand,
        /// Result type; a placeholder until `resolve_expr_types` has run when
        /// an operand is a column reference.
        result_type: ExprResultType,
    },
    /// A foreign key into a parent table.
    ///
    /// NOTE(review): `generate_from_columns` in this file leaves `Fk` columns
    /// empty; they are presumably filled by a different pipeline — confirm.
    Fk {
        parent_table: String,
        parent_col_name: String,
        parent_field: &'static Field,
        parent_modifier: String,
        parent_range: Option<RangeSpec>,
        parent_ordering: Ordering,
        parent_count: u64,
        distribution: FkDistribution,
        parent_domain_hash: u64,
        parent_ctx: Box<ParentCtx>,
    },
    /// A value derived through a foreign-key column (`anchor_col`), which this
    /// column depends on.
    ///
    /// NOTE(review): also left empty by `generate_from_columns`; presumably it
    /// yields the parent row's `deref_col_name` value — confirm with the
    /// pipeline that evaluates it.
    FkDeref {
        anchor_col: String,
        deref_col_name: String,
        deref_field: &'static Field,
        deref_modifier: String,
        deref_range: Option<RangeSpec>,
        deref_ordering: Ordering,
        deref_domain_hash: u64,
        parent_ctx: Box<ParentCtx>,
    },
}
141
142impl std::fmt::Debug for ColumnGen {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 match self {
145 Self::Field { field, modifier, transform, range, .. } => {
146 write!(f, "Field({}", field.name)?;
147 if let Some(r) = range {
148 write!(f, ":{r:?}")?;
149 }
150 if !modifier.is_empty() {
151 write!(f, ":{modifier}")?;
152 }
153 if *transform != Transform::None {
154 write!(f, ":{transform:?}")?;
155 }
156 write!(f, ")")
157 }
158 Self::Literal(s) => write!(f, "Literal({s:?})"),
159 Self::Aggr { func, source_col, group_by } => {
160 write!(f, "Aggr({func:?}({source_col}")?;
161 if let Some(g) = group_by {
162 write!(f, ", {g}")?;
163 }
164 write!(f, "))")
165 }
166 Self::Ref { source_col, modifier } => {
167 write!(f, "Ref({source_col}")?;
168 if !modifier.is_empty() {
169 write!(f, ":{modifier}")?;
170 }
171 write!(f, ")")
172 }
173 Self::Expr { left, op, right, result_type } => {
174 write!(f, "Expr({left:?} {op:?} {right:?} -> {result_type:?})")
175 }
176 Self::Fk { parent_table, parent_field, parent_modifier, .. } => {
177 write!(f, "Fk({parent_table}.{}", parent_field.name)?;
178 if !parent_modifier.is_empty() {
179 write!(f, ":{parent_modifier}")?;
180 }
181 write!(f, ")")
182 }
183 Self::FkDeref { anchor_col, deref_field, deref_modifier, .. } => {
184 write!(f, "FkDeref({anchor_col}->{}", deref_field.name)?;
185 if !deref_modifier.is_empty() {
186 write!(f, ":{deref_modifier}")?;
187 }
188 write!(f, ")")
189 }
190 }
191 }
192}
193
/// An ordered collection of columns with name-based lookup helpers.
pub struct ColumnSet {
    /// Columns in declaration order.
    columns: Vec<Column>,
}
197
198impl ColumnSet {
199 pub fn new(columns: Vec<Column>) -> Self {
200 Self { columns }
201 }
202
203 pub fn len(&self) -> usize {
204 self.columns.len()
205 }
206
207 pub fn is_empty(&self) -> bool {
208 self.columns.is_empty()
209 }
210
211 pub fn iter(&self) -> std::slice::Iter<'_, Column> {
212 self.columns.iter()
213 }
214
215 pub fn names(&self) -> Vec<&str> {
216 self.columns.iter().map(|v| v.name.as_str()).collect()
217 }
218
219 pub fn index_of(&self, name: &str) -> Option<usize> {
220 self.columns.iter().position(|v| v.name == name)
221 }
222}
223
224impl<'a> IntoIterator for &'a ColumnSet {
225 type Item = &'a Column;
226 type IntoIter = std::slice::Iter<'a, Column>;
227
228 fn into_iter(self) -> Self::IntoIter {
229 self.columns.iter()
230 }
231}
232
/// Coarse value category of a field, used to type-check arithmetic and to
/// choose an output format.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FieldType {
    /// Whole numbers.
    Int,
    /// Floating-point numbers.
    Float,
    /// Monetary amounts (formatted with two decimals).
    Money,
    /// Calendar dates (raw value is handled as epoch seconds when formatting).
    Date,
    /// Points in time (raw value is handled as epoch seconds when formatting).
    Timestamp,
    /// Anything non-numeric; arithmetic is rejected.
    Text,
}
242
/// Result category of an arithmetic expression; mirrors [`FieldType`] without
/// `Text`, which can never result from arithmetic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExprResultType {
    /// Formatted as a whole number.
    Int,
    /// Formatted with two decimals.
    Float,
    /// Formatted with two decimals.
    Money,
    /// Formatted as `YYYY-MM-DD`.
    Date,
    /// Formatted as `YYYY-MM-DDTHH:MM:SSZ`.
    Timestamp,
}
251
252pub fn field_type(name: &str) -> FieldType {
257 match name {
258 "integer" | "age" | "serial" | "digit" | "bit" | "trit" | "dice" | "port" | "latency" => {
259 FieldType::Int
260 }
261 "float" | "latitude" | "longitude" => FieldType::Float,
262 "amount" => FieldType::Money,
263 "date" | "birthdate" => FieldType::Date,
264 "timestamp" => FieldType::Timestamp,
265 _ => FieldType::Text,
266 }
267}
268
269pub fn check_expr_types(
270 left: FieldType,
271 op: ExprOp,
272 right: FieldType,
273) -> Result<ExprResultType, &'static str> {
274 use ExprOp::{Add, Mul, Sub};
275 use FieldType::{Date, Float, Int, Money, Text, Timestamp};
276 match (left, op, right) {
277 (Int, Add | Sub | Mul, Int) => Ok(ExprResultType::Int),
278 (Float, Add | Sub | Mul, Float | Int) | (Int, Add | Sub | Mul, Float) => {
279 Ok(ExprResultType::Float)
280 }
281 (Money, Add | Sub, Money) => Ok(ExprResultType::Money),
282 (Money, Mul, Money) => Err("cannot multiply money by money"),
283 (Money, Add | Sub | Mul, Int | Float) | (Int | Float, Mul, Money) => {
284 Ok(ExprResultType::Money)
285 }
286 (Int | Float, Add | Sub, Money) => {
287 Err("cannot add/subtract int and money; put money on the left")
288 }
289 (Date, Add | Sub, Int) => Ok(ExprResultType::Date),
290 (Timestamp, Add | Sub, Int) => Ok(ExprResultType::Timestamp),
291 (Date | Timestamp, Mul, _) => Err("cannot multiply dates or timestamps"),
292 (Date | Timestamp, _, Float) => {
293 Err("date/timestamp arithmetic requires integer (whole days or seconds)")
294 }
295 (Date | Timestamp, _, Money) => Err("cannot combine date/timestamp with money"),
296 (Date | Timestamp, _, Date | Timestamp) => Err("cannot combine two date/timestamp values"),
297 (Text, _, _) | (_, _, Text) => Err("field does not support arithmetic"),
298 _ => Err("incompatible types for arithmetic"),
299 }
300}
301
/// Column names users may not define, paired with the explanation shown in the
/// error message when they try.
const RESERVED_COLUMNS: &[(&str, &str)] =
    &[("serial", "built-in: 0-based record counter, available as {{serial}} in templates")];
304
305pub fn resolve_column(
306 col_name: &str,
307 value: &str,
308 all_columns: &[String],
309) -> Result<ColumnGen, String> {
310 for (reserved, description) in RESERVED_COLUMNS {
311 if col_name == *reserved {
312 return Err(format!(
313 "column '{col_name}' is reserved ({description}); remove it from columns"
314 ));
315 }
316 }
317
318 let trimmed = value.trim();
319 if trimmed.is_empty() {
320 return Err(format!("column '{col_name}': empty value"));
321 }
322
323 if let Some(expr) = parse_expr(col_name, trimmed, all_columns)? {
324 return Ok(expr);
325 }
326
327 if let Some(aggr) = parse_aggr_spec(trimmed) {
328 return Ok(aggr);
329 }
330
331 if trimmed != col_name && all_columns.iter().any(|d| d == trimmed) {
332 return Ok(ColumnGen::Ref { source_col: trimmed.to_string(), modifier: String::new() });
333 }
334 if let Some(colon) = trimmed.find(':') {
335 let base = &trimmed[..colon];
336 let modifier = &trimmed[colon + 1..];
337 if base != col_name && !modifier.is_empty() && all_columns.iter().any(|d| d == base) {
338 return Ok(ColumnGen::Ref {
339 source_col: base.to_string(),
340 modifier: modifier.to_string(),
341 });
342 }
343 }
344
345 resolve_field_spec(col_name, trimmed)
346}
347
348fn parse_expr(
349 col_name: &str,
350 value: &str,
351 all_columns: &[String],
352) -> Result<Option<ColumnGen>, String> {
353 for (ch, op) in &[('+', ExprOp::Add), ('*', ExprOp::Mul)] {
354 if let Some(pos) = value.find(*ch) {
355 let left_str = value[..pos].trim();
356 let right_str = value[pos + 1..].trim();
357 if left_str.is_empty() || right_str.is_empty() {
358 continue;
359 }
360 return build_expr(col_name, left_str, *op, right_str, all_columns).map(Some);
361 }
362 }
363
364 if value.contains('-') {
365 let is_field = field::parse_field_spec(value)
366 .ok()
367 .and_then(|(name, ..)| field::lookup(name))
368 .is_some();
369 if !is_field {
370 for (pos, _) in value.match_indices('-') {
371 let left_str = value[..pos].trim();
372 let right_str = value[pos + 1..].trim();
373 if left_str.is_empty() || right_str.is_empty() {
374 continue;
375 }
376 if is_valid_operand(left_str, all_columns)
377 && is_valid_operand(right_str, all_columns)
378 {
379 return build_expr(col_name, left_str, ExprOp::Sub, right_str, all_columns)
380 .map(Some);
381 }
382 }
383 }
384 }
385
386 Ok(None)
387}
388
389fn build_expr(
390 col_name: &str,
391 left_str: &str,
392 op: ExprOp,
393 right_str: &str,
394 all_columns: &[String],
395) -> Result<ColumnGen, String> {
396 let (left, left_type) = resolve_operand(col_name, left_str, all_columns)?;
397 let (right, right_type) = resolve_operand(col_name, right_str, all_columns)?;
398
399 let lt = if let ExprOperand::Col(_) = &left { FieldType::Int } else { left_type };
400 let rt = if let ExprOperand::Col(_) = &right { FieldType::Int } else { right_type };
401
402 let result_type =
403 if matches!(&left, ExprOperand::Col(_)) || matches!(&right, ExprOperand::Col(_)) {
404 ExprResultType::Int
405 } else {
406 check_expr_types(lt, op, rt).map_err(|e| format!("column '{col_name}': {e}"))?
407 };
408
409 Ok(ColumnGen::Expr { left, op, right, result_type })
410}
411
412pub fn resolve_col_field_type(col_name: &str, columns: &[Column]) -> FieldType {
413 for col in columns {
414 if col.name == col_name {
415 return match &col.gen {
416 ColumnGen::Field { field, .. } => field_type(field.name),
417 ColumnGen::Expr { result_type, .. } => match result_type {
418 ExprResultType::Int => FieldType::Int,
419 ExprResultType::Float => FieldType::Float,
420 ExprResultType::Money => FieldType::Money,
421 ExprResultType::Date => FieldType::Date,
422 ExprResultType::Timestamp => FieldType::Timestamp,
423 },
424 ColumnGen::Aggr { func, .. } => match func {
425 AggrFunc::Count => FieldType::Int,
426 AggrFunc::Sum => FieldType::Money,
427 },
428 ColumnGen::Ref { source_col, .. } => resolve_col_field_type(source_col, columns),
429 ColumnGen::Literal(_) => FieldType::Text,
430 ColumnGen::Fk { parent_field, .. } => field_type(parent_field.name),
431 ColumnGen::FkDeref { deref_field, .. } => field_type(deref_field.name),
432 };
433 }
434 }
435 FieldType::Int
436}
437
438fn is_valid_operand(spec: &str, all_columns: &[String]) -> bool {
439 if all_columns.iter().any(|c| c == spec) {
440 return true;
441 }
442 if let Ok((name, ..)) = field::parse_field_spec(spec) {
443 return field::lookup(name).is_some();
444 }
445 false
446}
447
448fn resolve_operand(
449 col_name: &str,
450 spec: &str,
451 all_columns: &[String],
452) -> Result<(ExprOperand, FieldType), String> {
453 if all_columns.iter().any(|c| c == spec) {
454 return Ok((ExprOperand::Col(spec.to_string()), FieldType::Int));
455 }
456
457 let (name, modifier, _transform, range, _ordering, _omit_pct, _zipf) =
458 field::parse_field_spec(spec).map_err(|e| format!("column '{col_name}': {e}"))?;
459 let f = field::lookup(name)
460 .ok_or_else(|| format!("column '{col_name}': unknown field or column '{name}'"))?;
461 let ft = field_type(name);
462 if ft == FieldType::Text {
463 return Err(format!("column '{col_name}': field '{name}' does not support arithmetic"));
464 }
465
466 Ok((ExprOperand::Field { field: f, modifier: modifier.to_string(), range }, ft))
467}
468
469pub fn dependencies(gen: &ColumnGen) -> Vec<&str> {
470 match gen {
471 ColumnGen::Ref { source_col, .. } => vec![source_col.as_str()],
472 ColumnGen::Aggr { source_col, group_by, .. } => {
473 let mut deps = vec![source_col.as_str()];
474 if let Some(g) = group_by {
475 deps.push(g.as_str());
476 }
477 deps
478 }
479 ColumnGen::Expr { left, right, .. } => {
480 let mut deps = Vec::new();
481 if let ExprOperand::Col(name) = left {
482 deps.push(name.as_str());
483 }
484 if let ExprOperand::Col(name) = right {
485 deps.push(name.as_str());
486 }
487 deps
488 }
489 ColumnGen::Field { .. } | ColumnGen::Literal(_) | ColumnGen::Fk { .. } => vec![],
490 ColumnGen::FkDeref { anchor_col, .. } => vec![anchor_col.as_str()],
491 }
492}
493
/// Aggregator keywords accepted in column specs, mapped to their function.
const AGGR_FUNCS: &[(&str, AggrFunc)] = &[("sum", AggrFunc::Sum), ("count", AggrFunc::Count)];
495
496pub fn parse_aggr_spec(spec: &str) -> Option<ColumnGen> {
497 let segments: Vec<&str> = spec.split(':').collect();
498 for (seg_idx, seg) in segments.iter().enumerate() {
499 let (func_name, group_part) = match seg.split_once('=') {
500 Some((f, g)) => (f, Some(g)),
501 None => (*seg, None),
502 };
503 let func = match AGGR_FUNCS.iter().find(|(n, _)| *n == func_name) {
504 Some((_, f)) => *f,
505 None => continue,
506 };
507 if seg_idx == 0 {
508 return None;
509 }
510 let source = segments[..seg_idx].join(":");
511 if source.is_empty() {
512 return None;
513 }
514 let group_by = match (func, group_part) {
515 (_, Some(g)) if !g.is_empty() => Some(g.to_string()),
516 (AggrFunc::Count, _) => Some(source.clone()),
517 (AggrFunc::Sum, _) => None,
518 };
519 return Some(ColumnGen::Aggr { func, source_col: source, group_by });
520 }
521 None
522}
523
524fn resolve_field_spec(col_name: &str, spec: &str) -> Result<ColumnGen, String> {
525 if let Some(values) = spec.strip_prefix("enum:") {
526 crate::gen::validate_enum(values).map_err(|e| format!("column '{col_name}': {e}"))?;
527 let f = field::lookup("enum").ok_or_else(|| {
528 format!("column '{col_name}': internal error: 'enum' field not in registry")
529 })?;
530 return Ok(ColumnGen::Field {
531 field: f,
532 modifier: values.to_string(),
533 transform: Transform::None,
534 range: None,
535 ordering: Ordering::None,
536 omit_pct: None,
537 zipf: None,
538 });
539 }
540
541 field::validate_spec(spec).map_err(|e| format!("column '{col_name}': {e}"))?;
542
543 let (name, modifier, transform, range, ordering, omit_pct, zipf) =
544 field::parse_field_spec(spec).map_err(|e| format!("column '{col_name}': {e}"))?;
545
546 let f = field::lookup(name)
547 .ok_or_else(|| format!("column '{col_name}': unknown field or column '{name}'"))?;
548 Ok(ColumnGen::Field {
549 field: f,
550 modifier: modifier.to_string(),
551 transform,
552 range,
553 ordering,
554 omit_pct,
555 zipf,
556 })
557}
558
559pub fn topo_sort_columns(columns: &[Column]) -> Result<Vec<usize>, String> {
564 let n = columns.len();
565 let names: Vec<&str> = columns.iter().map(|c| c.name.as_str()).collect();
566
567 let mut in_degree = vec![0u32; n];
568 let mut dependents: Vec<Vec<usize>> = vec![Vec::new(); n];
569
570 for (i, col) in columns.iter().enumerate() {
571 let deps = dependencies(&col.gen);
572 for dep_name in deps {
573 if let Some(dep_idx) = names.iter().position(|n| *n == dep_name) {
574 dependents[dep_idx].push(i);
575 in_degree[i] += 1;
576 }
577 }
578 }
579
580 let mut queue: Vec<usize> = (0..n).filter(|i| in_degree[*i] == 0).collect();
581 let mut order = Vec::with_capacity(n);
582
583 while let Some(idx) = queue.pop() {
584 order.push(idx);
585 for &dep in &dependents[idx] {
586 in_degree[dep] -= 1;
587 if in_degree[dep] == 0 {
588 queue.push(dep);
589 }
590 }
591 }
592
593 if order.len() != n {
594 let cycle: Vec<&str> = (0..n).filter(|i| in_degree[*i] > 0).map(|i| names[i]).collect();
595 return Err(format!("circular dependency between columns: {}", cycle.join(", ")));
596 }
597
598 Ok(order)
599}
600
601pub fn resolve_expr_types(columns: &mut [Column]) -> Result<(), String> {
602 let snap_cols: Vec<Column> = columns.to_vec();
603
604 for col in columns.iter_mut() {
605 if let ColumnGen::Expr { left, op, right, result_type } = &mut col.gen {
606 let lt = operand_type(left, &snap_cols);
607 let rt = operand_type(right, &snap_cols);
608 *result_type =
609 check_expr_types(lt, *op, rt).map_err(|e| format!("column '{}': {e}", col.name))?;
610 }
611 }
612 Ok(())
613}
614
615fn operand_type(operand: &ExprOperand, columns: &[Column]) -> FieldType {
616 match operand {
617 ExprOperand::Col(name) => resolve_col_field_type(name, columns),
618 ExprOperand::Field { field, .. } => field_type(field.name),
619 }
620}
621
/// Running state of one aggregator column.
struct AggrEntry {
    /// Index of the aggregator column itself (where the result is written).
    col_idx: usize,
    /// Index of the column being aggregated.
    source_idx: usize,
    /// Index of the grouping column, if the aggregate is partitioned.
    group_idx: Option<usize>,
    /// Aggregate function applied.
    func: AggrFunc,
    /// Running value used when there is no grouping column.
    global: f64,
    /// Running value per group, keyed by `crate::hash_seed` of the group column's text.
    grouped: HashMap<u64, f64>,
}
634
/// Running state of all aggregator columns; updated once per generated record.
pub struct AggrState {
    /// One entry per aggregator column, in column order.
    entries: Vec<AggrEntry>,
}
638
639impl AggrState {
640 pub fn new<S: AsRef<str>>(columns: &[Column], col_names: &[S]) -> Result<Self, String> {
641 let mut entries = Vec::new();
642 for (i, col) in columns.iter().enumerate() {
643 if let ColumnGen::Aggr { func, source_col, group_by } = &col.gen {
644 let source_idx =
645 col_names.iter().position(|n| n.as_ref() == source_col).ok_or_else(|| {
646 format!("aggregator source '{source_col}' not found in columns")
647 })?;
648 let group_idx = group_by
649 .as_ref()
650 .map(|g| {
651 col_names
652 .iter()
653 .position(|n| n.as_ref() == g.as_str())
654 .ok_or_else(|| format!("aggregator group '{g}' not found in columns"))
655 })
656 .transpose()?;
657 entries.push(AggrEntry {
658 col_idx: i,
659 source_idx,
660 group_idx,
661 func: *func,
662 global: 0.0,
663 grouped: HashMap::new(),
664 });
665 }
666 }
667 Ok(Self { entries })
668 }
669
670 pub fn update(
671 &mut self,
672 values: &mut [String],
673 raw_values: &[Option<f64>],
674 ) -> Result<(), String> {
675 for entry in &mut self.entries {
676 let src_raw = raw_values[entry.source_idx].ok_or_else(|| {
677 format!("aggregator source column {} is not a numeric field", entry.source_idx)
678 })?;
679 let delta = match entry.func {
680 AggrFunc::Sum => src_raw,
681 AggrFunc::Count => 1.0,
682 };
683 let current = if let Some(gidx) = entry.group_idx {
684 let key = crate::hash_seed(&values[gidx]);
685 let slot = entry.grouped.entry(key).or_insert(0.0);
686 *slot += delta;
687 *slot
688 } else {
689 entry.global += delta;
690 entry.global
691 };
692 values[entry.col_idx].clear();
693 match entry.func {
694 AggrFunc::Count => {
695 let _ = write!(values[entry.col_idx], "{}", current as u64);
696 }
697 AggrFunc::Sum => {
698 let _ = write!(values[entry.col_idx], "{current:.2}");
699 }
700 }
701 }
702 Ok(())
703 }
704}
705
706pub fn column_domain_hash(
713 master_seed: u64,
714 col_name: &str,
715 field: &crate::field::Field,
716 modifier: &str,
717) -> u64 {
718 let base = crate::pipeline::field_domain_hash(master_seed, field, modifier);
719 if is_alias(col_name, field.name, modifier) {
720 crate::rng::domain_hash(base, col_name)
721 } else {
722 base
723 }
724}
725
726pub fn compute_domain_hashes(columns: &[Column], master_seed: u64) -> Vec<u64> {
727 columns
728 .iter()
729 .map(|v| match &v.gen {
730 ColumnGen::Field { field, modifier, .. } => {
731 column_domain_hash(master_seed, &v.name, field, modifier)
732 }
733 ColumnGen::Expr { .. } => {
734 crate::rng::domain_hash(master_seed, &format!("_expr_{}", v.name))
735 }
736 ColumnGen::Literal(_)
737 | ColumnGen::Aggr { .. }
738 | ColumnGen::Ref { .. }
739 | ColumnGen::FkDeref { .. } => 0,
740 ColumnGen::Fk { .. } => {
741 crate::rng::domain_hash(master_seed, &format!("_fk_sample_{}", v.name))
742 }
743 })
744 .collect()
745}
746
/// `true` when `col_name` is a user-chosen alias rather than a name derived
/// from the field itself.
///
/// Derived names are the field name, the field name with `-` replaced by `_`,
/// and — when a modifier is present — `<field>_<modifier>`,
/// `<normalized>_<modifier>` and `<field>:<modifier>`.
fn is_alias(col_name: &str, field_name: &str, modifier: &str) -> bool {
    let normalized = field_name.replace('-', "_");
    let bases = [field_name, normalized.as_str()];

    if bases.contains(&col_name) {
        return false;
    }
    if modifier.is_empty() {
        return true;
    }

    // Peel the modifier off the end, then see whether what remains is a base
    // name followed by the expected separator.
    let Some(head) = col_name.strip_suffix(modifier) else {
        return true;
    };
    let underscore_form = head.strip_suffix('_').map_or(false, |base| bases.contains(&base));
    let colon_form = head.strip_suffix(':') == Some(field_name);

    !(underscore_form || colon_form)
}
764
/// Per-record inputs needed to evaluate expression operands.
pub struct ExprEnv<'a> {
    /// Raw numeric value of each column computed so far (`None` if absent or non-numeric).
    pub raw_values: &'a [Option<f64>],
    /// Column names, index-aligned with `raw_values`.
    pub col_names: &'a [String],
    /// Domain hash of each column, index-aligned with `col_names`.
    pub domain_hashes: &'a [u64],
    /// Serial number of the record being generated.
    pub serial: u64,
}
771
772pub fn eval_operand<'a>(
773 operand: &'a ExprOperand,
774 env: &ExprEnv<'_>,
775 ctx: &mut crate::ctx::GenContext<'a>,
776 expr_idx: usize,
777 is_left: bool,
778) -> f64 {
779 match operand {
780 ExprOperand::Col(name) => env
781 .col_names
782 .iter()
783 .position(|n| n == name)
784 .and_then(|idx| env.raw_values[idx])
785 .unwrap_or(0.0),
786 ExprOperand::Field { field, modifier, range } => {
787 let sub = if is_left { "L" } else { "R" };
788 let sub_hash = crate::rng::domain_hash(env.domain_hashes[expr_idx], sub);
789 ctx.rng = Rng::derive_fast(sub_hash, env.serial);
790 ctx.modifier = modifier;
791 ctx.range = range
792 .as_ref()
793 .and_then(|r| field::resolve_range(&Some(*r), field.name, ctx.since, ctx.until));
794 let mut buf = String::new();
795 let raw = field.generate(ctx, &mut buf);
796 raw.unwrap_or(0.0)
797 }
798 }
799}
800
801pub fn format_raw_typed(value: f64, result_type: ExprResultType, buf: &mut String) {
802 match result_type {
803 ExprResultType::Date => {
804 let epoch = value as i64;
805 let (y, m, d, _, _, _) = crate::gen::timestamp::epoch_to_parts(epoch);
806 let _ = write!(buf, "{y:04}-{m:02}-{d:02}");
807 }
808 ExprResultType::Timestamp => {
809 let epoch = value as i64;
810 let (y, m, d, h, min, s) = crate::gen::timestamp::epoch_to_parts(epoch);
811 let _ = write!(buf, "{y:04}-{m:02}-{d:02}T{h:02}:{min:02}:{s:02}Z");
812 }
813 ExprResultType::Money | ExprResultType::Float => {
814 let _ = write!(buf, "{value:.2}");
815 }
816 ExprResultType::Int => {
817 let mut tmp = itoa::Buffer::new();
818 buf.push_str(tmp.format(value as i64));
819 }
820 }
821}
822
823pub fn format_ref(raw: f64, modifier: &str, columns: &[Column], src_idx: usize, buf: &mut String) {
824 let ft = match &columns[src_idx].gen {
825 ColumnGen::Field { field, .. } => field_type(field.name),
826 ColumnGen::Expr { result_type, .. } => match result_type {
827 ExprResultType::Date => FieldType::Date,
828 ExprResultType::Timestamp => FieldType::Timestamp,
829 ExprResultType::Money => FieldType::Money,
830 ExprResultType::Float => FieldType::Float,
831 ExprResultType::Int => FieldType::Int,
832 },
833 _ => FieldType::Int,
834 };
835
836 match ft {
837 FieldType::Date => {
838 let epoch = raw as i64;
839 let (y, m, d, _, _, _) = crate::gen::timestamp::epoch_to_parts(epoch);
840 match modifier {
841 "us" => {
842 crate::gen::date::push_pad2(buf, m);
843 buf.push('/');
844 crate::gen::date::push_pad2(buf, d);
845 buf.push('/');
846 buf.push_str(itoa::Buffer::new().format(y));
847 }
848 "eu" => {
849 crate::gen::date::push_pad2(buf, d);
850 buf.push('.');
851 crate::gen::date::push_pad2(buf, m);
852 buf.push('.');
853 buf.push_str(itoa::Buffer::new().format(y));
854 }
855 _ => format_raw_typed(raw, ExprResultType::Date, buf),
856 }
857 }
858 FieldType::Timestamp => {
859 let epoch = raw as i64;
860 match modifier {
861 "unix" => buf.push_str(itoa::Buffer::new().format(epoch)),
862 "ms" => buf.push_str(itoa::Buffer::new().format(epoch * 1000)),
863 _ => format_raw_typed(raw, ExprResultType::Timestamp, buf),
864 }
865 }
866 FieldType::Money => {
867 let v = raw;
868 match modifier {
869 "usd" => {
870 buf.push('$');
871 push_money_formatted(v, buf, ',', '.');
872 }
873 "eur" => {
874 buf.push('\u{20ac}');
875 push_money_formatted(v, buf, '.', ',');
876 }
877 "gbp" => {
878 buf.push('\u{a3}');
879 push_money_formatted(v, buf, ',', '.');
880 }
881 _ => {
882 let _ = write!(buf, "{v:.2}");
883 }
884 }
885 }
886 _ => format_raw_typed(raw, ExprResultType::Int, buf),
887 }
888}
889
/// Appends `v` to `buf` as a money amount: optional `-`, the whole part with
/// `thousands` between groups of three digits, `decimal`, and exactly two
/// cent digits.
///
/// Cents are rounded to the nearest value; a fraction that rounds up to a full
/// unit carries into the whole part (`1.999` → `2.00`, where it used to be
/// rendered as `1.100`).
fn push_money_formatted(v: f64, buf: &mut String, thousands: char, decimal: char) {
    let abs = v.abs();
    let mut whole = abs as i64;
    let mut cents = (abs.fract() * 100.0).round() as i64;
    if cents >= 100 {
        whole = whole.saturating_add(1);
        cents -= 100;
    }

    if v < 0.0 {
        buf.push('-');
    }

    // Render the whole part into a stack buffer (std-only, allocation-free);
    // 20 bytes fit every non-negative i64.
    let mut digits = [0u8; 20];
    let mut start = digits.len();
    let mut rest = whole;
    loop {
        start -= 1;
        digits[start] = b'0' + (rest % 10) as u8;
        rest /= 10;
        if rest == 0 {
            break;
        }
    }

    let int_part = &digits[start..];
    for (i, &digit) in int_part.iter().enumerate() {
        // A separator goes before every digit that starts a group of three.
        if i > 0 && (int_part.len() - i) % 3 == 0 {
            buf.push(thousands);
        }
        buf.push(char::from(digit));
    }

    buf.push(decimal);
    let _ = write!(buf, "{cents:02}");
}
912
913pub fn spec_display_name(spec: &str) -> String {
919 if let Ok((name, modifier, transform, ..)) = field::parse_field_spec(spec) {
920 let base = name.to_string();
921 let suffix = if modifier.is_empty() {
922 match transform {
923 Transform::Upper => "upper",
924 Transform::Lower => "lower",
925 Transform::Capitalize => "capitalize",
926 Transform::None => "",
927 }
928 } else {
929 modifier
930 };
931 if suffix.is_empty() {
932 base
933 } else {
934 format!("{base}_{suffix}")
935 }
936 } else {
937 spec.to_string()
938 }
939}
940
941pub fn resolve_field_specs(fields: &[String]) -> Result<(Vec<Column>, Vec<usize>), String> {
944 let all_names: Vec<String> = fields
945 .iter()
946 .map(|spec| {
947 if let Some(eq) = spec.find('=') {
948 let colon = spec.find(':').unwrap_or(spec.len());
949 if eq < colon {
950 return spec[..eq].to_string();
951 }
952 }
953 spec_display_name(spec)
954 })
955 .collect();
956
957 let mut columns = Vec::with_capacity(fields.len());
958 for spec in fields {
959 let (alias, value) = if let Some(eq) = spec.find('=') {
960 let colon = spec.find(':').unwrap_or(spec.len());
961 if eq < colon {
962 (spec[..eq].to_string(), spec[eq + 1..].to_string())
963 } else {
964 (spec_display_name(spec), spec.clone())
965 }
966 } else {
967 (spec_display_name(spec), spec.clone())
968 };
969
970 let gen = resolve_column(&alias, &value, &all_names)?;
971 columns.push(Column { name: alias, gen });
972 }
973
974 resolve_expr_types(&mut columns)?;
975 let eval_order = topo_sort_columns(&columns)?;
976 Ok((columns, eval_order))
977}
978
979pub fn generate_records_from_specs(
982 fields: &[String],
983 opts: &crate::pipeline::RecordOpts<'_>,
984 n: u64,
985 start_serial: u64,
986) -> Result<(Vec<String>, Vec<Vec<String>>), String> {
987 let (columns, eval_order) = resolve_field_specs(fields)?;
988 let col_names: Vec<String> = columns.iter().map(|c| c.name.clone()).collect();
989 let records = generate_from_columns(&columns, &eval_order, opts, n, start_serial);
990 Ok((col_names, records))
991}
992
/// Generates `n` records for already-resolved `columns`.
///
/// Columns are evaluated per record in `eval_order` (dependencies first).
/// Record `i` uses serial `start_serial + i`; every random draw is derived
/// from the master seed, the column's domain hash and that serial. After the
/// columns are evaluated, aggregators are updated and, if `opts.corrupt_rate`
/// is set, the values are passed through `crate::corrupt::corrupt_values`.
///
/// `Fk` and `FkDeref` columns are left empty by this function.
///
/// Returns one `Vec<String>` per record with one formatted value per column,
/// in column (not evaluation) order.
pub fn generate_from_columns(
    columns: &[Column],
    eval_order: &[usize],
    opts: &crate::pipeline::RecordOpts<'_>,
    n: u64,
    start_serial: u64,
) -> Vec<Vec<String>> {
    let master_seed = opts.master_seed;
    let locales = opts.locales;
    let ctx_mode = opts.ctx;
    let corrupt_rate = opts.corrupt_rate;
    let tz_offset_minutes = opts.tz_offset_minutes;
    let since = opts.since;
    let until = opts.until;
    let col_count = columns.len();
    let col_names: Vec<String> = columns.iter().map(|c| c.name.clone()).collect();
    // NOTE(review): unlike `compute_domain_hashes`, field columns here use the
    // plain field hash (no alias mixing) and `Fk` columns get 0, so two aliased
    // columns of the same field and modifier share a hash. Confirm this
    // difference is intentional before unifying the two.
    let domain_hashes: Vec<u64> = columns
        .iter()
        .map(|c| match &c.gen {
            ColumnGen::Field { field, modifier, .. } => {
                crate::pipeline::field_domain_hash(master_seed, field, modifier)
            }
            ColumnGen::Expr { .. } => {
                crate::rng::domain_hash(master_seed, &format!("_expr_{}", c.name))
            }
            ColumnGen::Literal(_)
            | ColumnGen::Aggr { .. }
            | ColumnGen::Ref { .. }
            | ColumnGen::Fk { .. }
            | ColumnGen::FkDeref { .. } => 0,
        })
        .collect();
    // Ranges depend only on the spec and the time window, so resolve them once.
    let resolved_ranges: Vec<Option<(i64, i64)>> = columns
        .iter()
        .map(|c| match &c.gen {
            ColumnGen::Field { range, field, .. } => {
                field::resolve_range(range, field.name, since, until)
            }
            _ => None,
        })
        .collect();

    let needs_ctx = ctx_mode != crate::script::Ctx::None;
    // NOTE(review): a failure to build the aggregator state is swallowed and
    // aggregator columns then stay empty; the return type leaves no way to
    // report it.
    let mut aggr =
        AggrState::new(columns, &col_names).unwrap_or_else(|_| AggrState { entries: Vec::new() });
    let mut records = Vec::with_capacity(n as usize);

    for i in 0..n {
        let serial = start_serial + i;

        // Strict mode always pins the record to one locale; loose mode does so
        // with probability 0.7; otherwise all locales stay available.
        let locked_locale: Option<&crate::locale::Locale> = match ctx_mode {
            crate::script::Ctx::Strict => {
                let mut lr = Rng::derive(master_seed, serial, crate::DOMAIN_LOCALE);
                Some(*lr.choice(locales))
            }
            crate::script::Ctx::Loose => {
                let mut lr = Rng::derive(master_seed, serial, crate::DOMAIN_LOCALE);
                if lr.maybe(0.7) {
                    Some(*lr.choice(locales))
                } else {
                    None
                }
            }
            crate::script::Ctx::None => None,
        };
        // Declared before use so the single-locale slice outlives `ctx`.
        let locked_arr: [&crate::locale::Locale; 1];
        let effective_locales: &[&crate::locale::Locale] = if let Some(loc) = locked_locale {
            locked_arr = [loc];
            &locked_arr
        } else {
            locales
        };

        // A per-record identity is only built when a context mode is active.
        let identity = if needs_ctx {
            let mut ir = Rng::derive(master_seed, serial, crate::DOMAIN_IDENTITY);
            Some(crate::ctx::Identity::new(&mut ir, effective_locales, None, since, until))
        } else {
            None
        };

        // One context per record; the per-column settings (rng, modifier,
        // range, ordering, zipf) are overwritten before each field is generated.
        let mut ctx = crate::ctx::GenContext {
            rng: Rng::new(0),
            locales: effective_locales,
            modifier: "",
            identity: identity.as_ref(),
            tz_offset_minutes,
            since,
            until,
            range: None,
            ordering: Ordering::None,
            zipf: None,
            numeric: None,
        };

        let mut values: Vec<String> = (0..col_count).map(|_| String::with_capacity(32)).collect();
        let mut raw_values: Vec<Option<f64>> = vec![None; col_count];

        for &idx in eval_order {
            match &columns[idx].gen {
                ColumnGen::Field {
                    field, modifier, transform, ordering, omit_pct, zipf, ..
                } => {
                    // Omission is decided by its own RNG stream; an omitted
                    // column keeps its empty text and `None` raw value.
                    if let Some(pct) = omit_pct {
                        let mut or = Rng::derive(domain_hashes[idx], serial, "omit");
                        if or.range(0, 100) < i64::from(*pct) {
                            continue;
                        }
                    }
                    ctx.rng = Rng::derive_fast(domain_hashes[idx], serial);
                    ctx.modifier = modifier;
                    ctx.range = resolved_ranges[idx];
                    ctx.ordering = *ordering;
                    ctx.zipf = *zipf;
                    raw_values[idx] = field.generate(&mut ctx, &mut values[idx]);
                    if *transform != Transform::None {
                        let s = std::mem::take(&mut values[idx]);
                        values[idx] = transform.apply(&s);
                    }
                }
                ColumnGen::Literal(s) => {
                    values[idx].push_str(s);
                }
                // Aggregators are filled in after the loop; foreign keys are
                // not produced by this function.
                ColumnGen::Aggr { .. } | ColumnGen::Fk { .. } | ColumnGen::FkDeref { .. } => {}
                ColumnGen::Ref { source_col, modifier } => {
                    if let Some(src_idx) = col_names.iter().position(|n| n == source_col) {
                        raw_values[idx] = raw_values[src_idx];
                        // Re-format only when a modifier is given and the source
                        // has a raw value; otherwise copy the source text.
                        if modifier.is_empty() {
                            let src = values[src_idx].clone();
                            values[idx].push_str(&src);
                        } else if let Some(raw) = raw_values[src_idx] {
                            format_ref(raw, modifier, columns, src_idx, &mut values[idx]);
                        } else {
                            let src = values[src_idx].clone();
                            values[idx].push_str(&src);
                        }
                    }
                }
                ColumnGen::Expr { left, op, right, result_type } => {
                    let env = ExprEnv {
                        raw_values: &raw_values,
                        col_names: &col_names,
                        domain_hashes: &domain_hashes,
                        serial,
                    };
                    let lv = eval_operand(left, &env, &mut ctx, idx, true);
                    let rv = eval_operand(right, &env, &mut ctx, idx, false);
                    // For date results the right operand counts days, so it is
                    // scaled to seconds before adding/subtracting.
                    let adjusted_rv = match result_type {
                        ExprResultType::Date => rv * 86400.0,
                        _ => rv,
                    };
                    let result = match op {
                        ExprOp::Add => lv + adjusted_rv,
                        ExprOp::Sub => lv - adjusted_rv,
                        ExprOp::Mul => lv * rv,
                    };
                    raw_values[idx] = Some(result);
                    format_raw_typed(result, *result_type, &mut values[idx]);
                }
            }
        }

        // NOTE(review): aggregator errors are ignored; `update` stops at the
        // first failing entry, leaving it and any later aggregator columns
        // empty for this record.
        let _ = aggr.update(&mut values, &raw_values);

        // Corruption runs last, on the fully formatted record.
        if let Some(rate) = corrupt_rate {
            let mut cr = Rng::derive(master_seed, serial, crate::DOMAIN_CORRUPT);
            crate::corrupt::corrupt_values(&mut cr, &mut values, rate);
        }

        records.push(values);
    }

    records
}