Skip to main content

disney_loop/
lib.rs

1#![doc = "Disney Loop: ρ(t) → ∂(¬σ⁻¹) → ∃(ν) → ρ(t+1)"]
2#![doc = ""]
3#![doc = "Forward-only compound discovery pipeline."]
4#![doc = "Assess state → reject regression → search for novelty → arrive at new state."]
5#![forbid(unsafe_code)]
6#![deny(clippy::unwrap_used)]
7#![deny(clippy::expect_used)]
8#![deny(clippy::panic)]
9#![warn(missing_docs)]
10use nexcore_dataframe::{Agg, Column, DataFrame, DataFrameError};
11use std::path::Path;
12
13/// Errors specific to the Disney Loop pipeline.
14#[derive(Debug, nexcore_error::Error)]
15#[non_exhaustive]
16pub enum DisneyError {
17    #[error("dataframe error: {0}")]
18    DataFrame(#[from] DataFrameError),
19
20    #[error("io error: {0}")]
21    Io(#[from] std::io::Error),
22
23    #[error("json error: {0}")]
24    Json(#[from] serde_json::Error),
25
26    #[error("empty pipeline: no records after ingestion")]
27    EmptyPipeline,
28}
29
30pub type Result<T> = std::result::Result<T, DisneyError>;
31
32pub mod humanize;
33
34/// Stage 2: ∂(¬σ⁻¹) — Anti-Regression Gate
35///
36/// Filters out any records where `direction == "backward"`.
37/// Only forward-moving state survives this gate.
38pub fn transform_anti_regression_gate(df: DataFrame) -> Result<DataFrame> {
39    tracing::info!(
40        stage = "anti-regression-gate",
41        expression = "direction != 'backward'",
42        "Applying filter: reject regression"
43    );
44    let filtered = df.filter_by("direction", |v| v.as_str() != Some("backward"))?;
45
46    if filtered.height() == 0 {
47        return Err(DisneyError::EmptyPipeline);
48    }
49
50    Ok(filtered)
51}
52
53/// Stage 3: ∃(ν) — Curiosity Search
54///
55/// Aggregates novelty by domain: sums `novelty_score` and counts discoveries.
56pub fn transform_curiosity_search(df: DataFrame) -> Result<DataFrame> {
57    tracing::info!(stage = "curiosity-search", "Aggregating novelty by domain");
58    let aggregated = df
59        .group_by(&["domain"])?
60        .agg(&[Agg::Sum("novelty_score".into()), Agg::Count])?;
61    Ok(aggregated)
62}
63
64/// Stage 4: ρ(t+1) — New State Sink
65///
66/// Writes the transformed state to a JSON file. The old state is gone;
67/// the new state is all that remains. Forward only.
68#[allow(clippy::as_conversions, reason = "DataFrame height fits in u64")]
69pub fn sink_new_state(df: DataFrame, output_path: &Path) -> Result<u64> {
70    tracing::info!(
71        stage = "new-state",
72        path = %output_path.display(),
73        "Writing new state to JSON"
74    );
75
76    // Ensure parent directory exists
77    if let Some(parent) = output_path.parent() {
78        if !parent.as_os_str().is_empty() && !parent.exists() {
79            std::fs::create_dir_all(parent)?;
80        }
81    }
82
83    let rows = df.height() as u64;
84    df.to_json_file(output_path)?;
85
86    tracing::info!(records = rows, "State written successfully");
87    Ok(rows)
88}
89
#[cfg(test)]
mod tests {
    use super::*;

    /// Five-row fixture: three "forward" rows spread over two domains, plus
    /// two "backward" rows that the gate must drop.
    fn sample_frame() -> Result<DataFrame> {
        let domain = Column::from_strs(
            "domain",
            &[
                "signals",
                "signals",
                "primitives",
                "primitives",
                "regression",
            ],
        );
        let direction = Column::from_strs(
            "direction",
            &["forward", "forward", "forward", "backward", "backward"],
        );
        let novelty = Column::from_i64s("novelty_score", vec![10, 20, 15, 5, 0]);
        let discovery = Column::from_strs("discovery", &["prr", "ror", "sigma", "none", "none"]);

        let frame = DataFrame::new(vec![domain, direction, novelty, discovery])?;
        Ok(frame)
    }

    #[test]
    fn anti_regression_gate_filters_backward() -> Result<()> {
        let survivors = transform_anti_regression_gate(sample_frame()?)?;
        // Two of the five fixture rows are "backward", leaving three.
        assert_eq!(survivors.height(), 3);
        Ok(())
    }

    #[test]
    fn curiosity_search_aggregates_by_domain() -> Result<()> {
        use nexcore_dataframe::Scalar;

        // Same order as the real pipeline: gate first, then aggregate.
        let gated = transform_anti_regression_gate(sample_frame()?)?;
        let df = transform_curiosity_search(gated)?;
        // Only "signals" (2 rows) and "primitives" (1 row) survive the gate.
        assert_eq!(df.height(), 2);

        let sums = df.column("novelty_score_sum")?;
        let counts = df.column("count")?;
        let domains = df.column("domain")?;

        // Each entry: (domain, expected novelty sum, expected row count).
        for (name, sum, count) in [("signals", 30, 2), ("primitives", 15, 1)] {
            let row = (0..df.height())
                .find(|&i| domains.get(i).map(|s| s.to_string()).as_deref() == Some(name));
            assert!(row.is_some(), "{name} domain must appear in result");
            if let Some(i) = row {
                assert_eq!(
                    sums.get(i),
                    Some(Scalar::Int64(sum)),
                    "{name} sum must be {sum}"
                );
                assert_eq!(
                    counts.get(i),
                    Some(Scalar::UInt64(count)),
                    "{name} count must be {count}"
                );
            }
        }
        Ok(())
    }

    #[test]
    fn sink_writes_json_file() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("state_next.json");
        let columns = vec![
            Column::from_strs("domain", &["signals"]),
            Column::from_i64s("total_novelty", vec![30]),
            Column::from_i64s("discoveries", vec![2]),
        ];
        let input = DataFrame::new(columns)?;

        sink_new_state(input, &path)?;

        assert!(path.exists());
        let contents = std::fs::read_to_string(&path)?;
        assert!(contents.contains("signals"));
        Ok(())
    }

    #[test]
    fn anti_regression_gate_rejects_all_backward() -> Result<()> {
        // Every record is "backward", so the gate must report EmptyPipeline.
        let all_backward = DataFrame::new(vec![
            Column::from_strs("domain", &["signals", "primitives"]),
            Column::from_strs("direction", &["backward", "backward"]),
            Column::from_i64s("novelty_score", vec![5, 0]),
            Column::from_strs("discovery", &["none", "none"]),
        ])?;

        let outcome = transform_anti_regression_gate(all_backward);
        assert!(outcome.is_err());
        if let Some(err) = outcome.err() {
            assert!(
                err.to_string().contains("empty pipeline"),
                "Expected EmptyPipeline error, got: {err}"
            );
        }
        Ok(())
    }

    #[test]
    fn full_pipeline_forward_only() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("output/state.json");

        let aggregated = sample_frame()
            .and_then(transform_anti_regression_gate)
            .and_then(transform_curiosity_search)?;
        let rows = sink_new_state(aggregated, &path)?;

        assert_eq!(rows, 2); // one row per forward domain
        Ok(())
    }
}
219}