Skip to main content

ezu_graph/
evaluator.rs

1//! Walk the DAG and evaluate one tile.
2
3use std::time::Instant;
4
5use xxhash_rust::xxh3::Xxh3;
6
7use crate::cache::{Cache, CacheKey, Hash128};
8use crate::eval::{AssetLoader, CanvasInfo, EvalCtx, EvalError, ParamValues, TileId};
9use crate::graph::{Graph, NodeIx};
10use crate::port::{CoordSpace, PortKind};
11use crate::value::PortValue;
12
13/// Entry point: evaluate a `Graph` for one tile.
14pub struct Evaluator<'a> {
15    pub graph: &'a Graph,
16    pub cache: &'a Cache,
17    pub assets: &'a dyn AssetLoader,
18}
19
20#[derive(Debug, thiserror::Error)]
21pub enum RenderError {
22    #[error(transparent)]
23    Eval(#[from] EvalError),
24}
25
26impl<'a> Evaluator<'a> {
27    pub fn new(graph: &'a Graph, cache: &'a Cache, assets: &'a dyn AssetLoader) -> Self {
28        Self {
29            graph,
30            cache,
31            assets,
32        }
33    }
34
35    /// Evaluate the graph and return the value at the output node.
36    /// Source nodes pull host data through `self.assets`; tile-scoped
37    /// bindings (MVT/GeoJSON layers, …) live under `tile.<name>` keys.
38    pub fn render(
39        &self,
40        tile: TileId,
41        canvas: CanvasInfo,
42        params: &ParamValues,
43        rng_seed: u64,
44    ) -> Result<PortValue, RenderError> {
45        let ctx = EvalCtx {
46            tile,
47            canvas,
48            assets: self.assets,
49            params,
50            rng_seed,
51        };
52        let n = self.graph.len();
53        let mut hashes: Vec<Hash128> = vec![0; n];
54        let mut values: Vec<Option<PortValue>> = vec![None; n];
55
56        for &ix in self.graph.topo_order() {
57            let (value, hash) = self.eval_one(ix, &ctx, &hashes, &values)?;
58            hashes[ix] = hash;
59            values[ix] = Some(value);
60        }
61        Ok(values[self.graph.output()].clone().expect("output unset"))
62    }
63
64    /// Like [`render`] but evaluates nodes in parallel per topological
65    /// "level" using Rayon. All nodes at the same level have no edges
66    /// between them, so they fan out across the global thread pool with
67    /// no synchronization cost beyond a per-level join.
68    ///
69    /// Falls back to sequential evaluation transparently when the
70    /// `parallel` feature is disabled, so callers don't need to branch.
71    pub fn render_parallel(
72        &self,
73        tile: TileId,
74        canvas: CanvasInfo,
75        params: &ParamValues,
76        rng_seed: u64,
77    ) -> Result<PortValue, RenderError> {
78        #[cfg(not(feature = "parallel"))]
79        {
80            self.render(tile, canvas, params, rng_seed)
81        }
82        #[cfg(feature = "parallel")]
83        {
84            use rayon::prelude::*;
85
86            let ctx = EvalCtx {
87                tile,
88                canvas,
89                assets: self.assets,
90                params,
91                rng_seed,
92            };
93            let n = self.graph.len();
94            let mut hashes: Vec<Hash128> = vec![0; n];
95            let mut values: Vec<Option<PortValue>> = vec![None; n];
96
97            for bucket in self.graph.level_buckets() {
98                let new: Vec<(NodeIx, PortValue, Hash128)> = bucket
99                    .par_iter()
100                    .map(|&ix| -> Result<_, RenderError> {
101                        let (v, h) = self.eval_one(ix, &ctx, &hashes, &values)?;
102                        Ok((ix, v, h))
103                    })
104                    .collect::<Result<_, _>>()?;
105                for (ix, v, h) in new {
106                    values[ix] = Some(v);
107                    hashes[ix] = h;
108                }
109            }
110            Ok(values[self.graph.output()].clone().expect("output unset"))
111        }
112    }
113
114    /// Evaluate one node given the current intermediate state. Pulled
115    /// out so the serial and parallel paths share the cache lookup and
116    /// hashing logic.
117    fn eval_one(
118        &self,
119        ix: NodeIx,
120        ctx: &EvalCtx<'_>,
121        hashes: &[Hash128],
122        values: &[Option<PortValue>],
123    ) -> Result<(PortValue, Hash128), RenderError> {
124        let node = self.graph.node(ix);
125
126        // Hash this node's own params, plus any asset bindings it samples.
127        let mut h = Xxh3::new();
128        node.param_hash(&mut h);
129        for name in node.asset_inputs() {
130            h.update(name.as_bytes());
131            h.update(&ctx.assets.hash(&name).to_le_bytes());
132        }
133        let params_hash: Hash128 = h.digest128();
134
135        // Collect input hashes (in port order) and input values.
136        let input_specs = node.inputs();
137        let mut input_hashes: Vec<Hash128> = Vec::with_capacity(input_specs.len());
138        let mut input_vals: Vec<Option<PortValue>> = Vec::with_capacity(input_specs.len());
139        for port_ix in 0..input_specs.len() {
140            match self.graph.incoming(ix, port_ix) {
141                Some(src) => {
142                    input_hashes.push(hashes[src]);
143                    input_vals.push(values[src].clone());
144                }
145                None => {
146                    input_hashes.push(0);
147                    input_vals.push(None);
148                }
149            }
150        }
151
152        // World-anchored nodes drop the tile id from their key so
153        // adjacent tiles can share intermediates.
154        let tile_for_key = match node.coord_space() {
155            CoordSpace::World => None,
156            _ => Some(ctx.tile),
157        };
158        let key = CacheKey::build(ctx.canvas, tile_for_key, params_hash, &input_hashes);
159
160        if let Some(v) = self.cache.get(key) {
161            tracing::debug!(
162                target: "ezu_graph::eval",
163                node = self.graph.node_id(ix),
164                op = node.op_name(),
165                cache = "hit",
166                output = %describe_value(&v),
167                tile = %format!("{}/{}/{}", ctx.tile.z, ctx.tile.x, ctx.tile.y),
168                "cache hit",
169            );
170            return Ok((v, key.0));
171        }
172        let t0 = Instant::now();
173        let value = node.eval(ctx, &input_vals)?;
174        let elapsed_us = t0.elapsed().as_micros();
175        tracing::debug!(
176            target: "ezu_graph::eval",
177            node = self.graph.node_id(ix),
178            op = node.op_name(),
179            cache = "miss",
180            output = %describe_value(&value),
181            tile = %format!("{}/{}/{}", ctx.tile.z, ctx.tile.x, ctx.tile.y),
182            elapsed_us,
183            "evaluated",
184        );
185        self.cache.insert(key, value.clone());
186        Ok((value, key.0))
187    }
188}
189
190/// One-line human-readable summary of a `PortValue` for debug logs.
191/// Keeps the format dense so node lines stay readable in a tail.
192fn describe_value(v: &PortValue) -> String {
193    match v {
194        PortValue::Raster(r) => format!("raster {}x{}", r.width, r.height),
195        PortValue::Sprite(s) => format!("sprite {}x{}", s.width, s.height),
196        PortValue::ScalarField(f) => format!(
197            "scalar-field {}x{} (mpp~{:.2})",
198            f.width,
199            f.height,
200            f.metres_per_pixel_x(),
201        ),
202        PortValue::Features(_) => "features".to_string(),
203        PortValue::Brush(_) => "brush".to_string(),
204        PortValue::Scalar(_) => format!("scalar:{}", PortKind::Scalar),
205    }
206}