nornir 0.4.18

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
//! Materialised view of the funnel: pure in-memory state built by
//! replaying [`super::event::Event`]s.
//!
//! Loading a fresh funnel:
//!   1. `Funnel::default()` — empty state.
//!   2. For each row in the Iceberg `funnel_events` table, call [`Funnel::apply`].
//!   3. Query.
//!
//! Writing:
//!   1. Build the [`Event`] with `chrono::Utc::now()`.
//!   2. `Funnel::apply` it (validates + mutates in-memory state).
//!   3. Append to the log (caller's job — see [`super::store`]).
//!
//! Validation lives in `apply`: we reject events that reference
//! unknown ids, would create duplicate edges, or violate the lifecycle
//! (e.g. moving a `Done` node back to `Pending` without going through
//! `Failed` first). Bad events are rejected with `anyhow::Error`; the
//! log stays clean because the store calls `apply` before `append`.

use std::collections::{BTreeMap, BTreeSet};

use anyhow::{Result, anyhow, bail};
use chrono::{DateTime, Utc};

use super::event::{Event, NodeStatus, PlanStatus, TriageDecision};
use super::ids::{IdeaId, NodeId, PlanId};

#[derive(Debug, Clone)]
pub struct Idea {
    pub id: IdeaId,
    pub source: String,
    pub text: String,
    pub refs: Vec<String>,
    pub submitted_at: DateTime<Utc>,
    pub triage: Option<(TriageDecision, Option<String>, DateTime<Utc>)>,
}

#[derive(Debug, Clone)]
pub struct Plan {
    pub id: PlanId,
    pub idea_id: IdeaId,
    pub summary: String,
    pub planner: String,
    pub created_at: DateTime<Utc>,
    pub status: PlanStatus,
    /// Nodes keyed by id.
    pub nodes: BTreeMap<NodeId, PlanNode>,
    /// Edges as (from, to). Stored as a Set so duplicates can't slip in.
    pub edges: BTreeSet<(NodeId, NodeId)>,
}

#[derive(Debug, Clone)]
pub struct PlanNode {
    pub id: NodeId,
    pub kind: String,
    pub params: serde_json::Map<String, serde_json::Value>,
    pub targets: Vec<String>,
    pub prompt_excerpt: Option<String>,
    pub created_at: DateTime<Utc>,
    pub status: NodeStatus,
    pub last_status_change: DateTime<Utc>,
}

#[derive(Debug, Default, Clone)]
pub struct Funnel {
    pub ideas: BTreeMap<IdeaId, Idea>,
    pub plans: BTreeMap<PlanId, Plan>,
    /// High-water marks so new ids stay monotonic across sessions.
    pub next_idea: u64,
    pub next_plan: u64,
    pub next_node: u64,
    pub next_run: u64,
}

impl Funnel {
    /// Apply a single event to the in-memory state. Returns Err if the
    /// event would violate an invariant (unknown id, duplicate edge,
    /// illegal status transition). The caller MUST NOT append a
    /// rejected event to the log.
    pub fn apply(&mut self, event: &Event) -> Result<()> {
        match event {
            Event::IdeaSubmitted { id, source, text, refs, ts } => {
                if self.ideas.contains_key(id) {
                    bail!("duplicate IdeaSubmitted for `{id}`");
                }
                self.ideas.insert(
                    id.clone(),
                    Idea {
                        id: id.clone(),
                        source: source.clone(),
                        text: text.clone(),
                        refs: refs.clone(),
                        submitted_at: *ts,
                        triage: None,
                    },
                );
                self.next_idea = self.next_idea.max(parse_seq(id.as_str()) + 1);
            }
            Event::IdeaTriaged { idea_id, decision, why, ts } => {
                let idea = self.ideas.get_mut(idea_id)
                    .ok_or_else(|| anyhow!("IdeaTriaged for unknown idea `{idea_id}`"))?;
                idea.triage = Some((*decision, why.clone(), *ts));
            }
            Event::PlanCreated { id, idea_id, summary, planner, ts } => {
                if !self.ideas.contains_key(idea_id) {
                    bail!("PlanCreated for unknown idea `{idea_id}`");
                }
                if self.plans.contains_key(id) {
                    bail!("duplicate PlanCreated for `{id}`");
                }
                self.plans.insert(
                    id.clone(),
                    Plan {
                        id: id.clone(),
                        idea_id: idea_id.clone(),
                        summary: summary.clone(),
                        planner: planner.clone(),
                        created_at: *ts,
                        status: PlanStatus::Draft,
                        nodes: BTreeMap::new(),
                        edges: BTreeSet::new(),
                    },
                );
                self.next_plan = self.next_plan.max(parse_seq(id.as_str()) + 1);
            }
            Event::NodeAdded { plan_id, node_id, kind, params, targets, prompt_excerpt, ts } => {
                let plan = self.plans.get_mut(plan_id)
                    .ok_or_else(|| anyhow!("NodeAdded for unknown plan `{plan_id}`"))?;
                if plan.nodes.contains_key(node_id) {
                    bail!("duplicate NodeAdded `{node_id}` in plan `{plan_id}`");
                }
                plan.nodes.insert(
                    node_id.clone(),
                    PlanNode {
                        id: node_id.clone(),
                        kind: kind.clone(),
                        params: params.clone(),
                        targets: targets.clone(),
                        prompt_excerpt: prompt_excerpt.clone(),
                        created_at: *ts,
                        status: NodeStatus::Pending,
                        last_status_change: *ts,
                    },
                );
                self.next_node = self.next_node.max(parse_seq(node_id.as_str()) + 1);
            }
            Event::EdgeAdded { plan_id, from_node, to_node, ts: _ } => {
                let plan = self.plans.get_mut(plan_id)
                    .ok_or_else(|| anyhow!("EdgeAdded for unknown plan `{plan_id}`"))?;
                if !plan.nodes.contains_key(from_node) {
                    bail!("EdgeAdded references unknown from_node `{from_node}`");
                }
                if !plan.nodes.contains_key(to_node) {
                    bail!("EdgeAdded references unknown to_node `{to_node}`");
                }
                if from_node == to_node {
                    bail!("EdgeAdded would create self-loop on `{from_node}`");
                }
                let inserted = plan.edges.insert((from_node.clone(), to_node.clone()));
                if !inserted {
                    bail!("duplicate edge {from_node} -> {to_node} in plan `{plan_id}`");
                }
            }
            Event::NodeStatusChanged { plan_id, node_id, status, why: _, ts } => {
                let plan = self.plans.get_mut(plan_id)
                    .ok_or_else(|| anyhow!("NodeStatusChanged for unknown plan `{plan_id}`"))?;
                let node = plan.nodes.get_mut(node_id)
                    .ok_or_else(|| anyhow!("NodeStatusChanged for unknown node `{node_id}`"))?;
                node.status = *status;
                node.last_status_change = *ts;
            }
            Event::RunRecorded { plan_id, node_id, run_id, .. } => {
                let plan = self.plans.get(plan_id)
                    .ok_or_else(|| anyhow!("RunRecorded for unknown plan `{plan_id}`"))?;
                if !plan.nodes.contains_key(node_id) {
                    bail!("RunRecorded for unknown node `{node_id}`");
                }
                self.next_run = self.next_run.max(parse_seq(run_id.as_str()) + 1);
            }
            Event::PlanStatusChanged { plan_id, status, why: _, ts: _ } => {
                let plan = self.plans.get_mut(plan_id)
                    .ok_or_else(|| anyhow!("PlanStatusChanged for unknown plan `{plan_id}`"))?;
                plan.status = *status;
            }
        }
        Ok(())
    }

}

// (legacy seq helpers removed — bumping is inlined in each apply arm)


/// Parse the trailing numeric suffix of an id like `"p-007"` → `7`.
/// Non-numeric or unsuffixed ids fall back to 0 (so seq-allocation
/// remains correct even if external tools generate uuid-style ids).
fn parse_seq(s: &str) -> u64 {
    s.rsplit_once('-')
        .and_then(|(_, tail)| tail.parse::<u64>().ok())
        .unwrap_or(0)
}

// The next_*_from helpers above return *what next_* should become*;
// they don't mutate. We do the actual assignments in `apply` for
// clarity / borrow-checker friendliness:
impl Funnel {
    /// Re-evaluate every node's `Pending` → `Ready` transition based
    /// on the current edge set. Called by [`super::topo::topo_ready`]
    /// just before returning a list, so callers see the latest
    /// readiness without needing to emit explicit
    /// `NodeStatusChanged` events for the trivial transition.
    pub fn promote_ready(&mut self) {
        for plan in self.plans.values_mut() {
            // Snapshot status map so we can borrow nodes mutably below.
            let status: BTreeMap<NodeId, NodeStatus> = plan
                .nodes
                .iter()
                .map(|(id, n)| (id.clone(), n.status))
                .collect();
            for node in plan.nodes.values_mut() {
                if node.status != NodeStatus::Pending {
                    continue;
                }
                let deps_ok = plan
                    .edges
                    .iter()
                    .filter(|(_, to)| to == &node.id)
                    .all(|(from, _)| matches!(status.get(from), Some(NodeStatus::Done)));
                if deps_ok {
                    node.status = NodeStatus::Ready;
                }
            }
        }
    }
}