tinyagents 0.2.0

A recursive language-model (RLM) harness for Rust.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
//! Durable graph builder and compile contract.
//!
//! This is the authoring entry point for the recursive graph runtime: a
//! [`GraphBuilder`] accumulates nodes, edges, conditional routing, and a reducer,
//! and [`GraphBuilder::compile`] validates that topology and freezes it into an
//! immutable [`crate::graph::CompiledGraph`]. Because a node handler can itself
//! drive another compiled graph or a sub-agent, the same builder API is what
//! both hand-written Rust and model-authored `.rag`/`.ragsh` programs lower into
//! when they assemble a workflow that may recurse into sub-workflows.
//!
//! See [`types`] for the builder data types. `compile` validates the topology
//! and freezes it into an immutable [`crate::graph::CompiledGraph`].

mod types;

pub(crate) use types::{Branch, BuilderNode, NodeMeta};
pub use types::{
    END, ForkId, GraphBuilder, GraphDefaults, NodeContext, NodeFuture, NodeHandler, Route,
    RouterFn, START,
};

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::Arc;

use crate::graph::command::NodeResult;
use crate::graph::compiled::CompiledGraph;
use crate::graph::reducer::{OverwriteStateReducer, StateReducer};
use crate::harness::ids::{GraphId, NodeId};
use crate::{Result, TinyAgentsError};

impl<State, Update> Default for GraphBuilder<State, Update>
where
    State: Clone + Send + Sync + 'static,
    Update: Send + 'static,
{
    fn default() -> Self {
        Self::new()
    }
}

impl<State, Update> GraphBuilder<State, Update>
where
    State: Clone + Send + Sync + 'static,
    Update: Send + 'static,
{
    /// Creates an empty builder with a generated graph id and a default
    /// recursion limit of 50. A reducer must be set before [`Self::compile`].
    pub fn new() -> Self {
        Self {
            graph_id: GraphId::new(format!("graph-{}", crate::graph::compiled::next_seq())),
            name: None,
            nodes: HashMap::new(),
            edges: HashMap::new(),
            branches: HashMap::new(),
            command_nodes: HashSet::new(),
            waiting: HashMap::new(),
            reducer: None,
            recursion_limit: 50,
            parallel: false,
            max_concurrency: None,
            node_timeout: None,
            node_meta: HashMap::new(),
        }
    }

    /// Applies a bundle of [`GraphDefaults`] in one call. Only the `Some` fields
    /// override the builder's current configuration, so this composes with
    /// explicit `with_*` calls regardless of ordering.
    pub fn set_defaults(mut self, defaults: GraphDefaults) -> Self {
        if let Some(limit) = defaults.recursion_limit {
            self.recursion_limit = limit;
        }
        if let Some(parallel) = defaults.parallel {
            self.parallel = parallel;
        }
        if let Some(max) = defaults.max_concurrency {
            self.max_concurrency = Some(max);
        }
        if let Some(timeout) = defaults.node_timeout {
            self.node_timeout = Some(timeout);
        }
        self
    }

    /// Bounds the number of branches run concurrently within a single superstep
    /// (only meaningful with [`Self::with_parallel`] enabled). The executor runs
    /// the active set in chunks of at most `n` futures, so at most `n` node
    /// handlers are in flight at once. `0` is treated as unbounded.
    pub fn with_max_concurrency(mut self, n: usize) -> Self {
        self.max_concurrency = (n > 0).then_some(n);
        self
    }

    /// Sets a default wall-clock timeout applied to every node handler. A node
    /// whose future does not resolve within `timeout` fails the run with
    /// [`crate::TinyAgentsError::Timeout`].
    pub fn with_node_timeout(mut self, timeout: std::time::Duration) -> Self {
        self.node_timeout = Some(timeout);
        self
    }

    /// Enables or disables concurrent execution of the active node set within a
    /// superstep. Defaults to `false` (sequential), which preserves the exact
    /// milestone-1 ordering and semantics.
    ///
    /// When enabled, a superstep with more than one active node runs every
    /// branch concurrently via `futures::future::join_all`. Each branch gets its
    /// own cloned `State` snapshot (`State: Clone`) and a distinct
    /// [`ForkId`] on its [`NodeContext`]. Branch results are still folded into
    /// the reducer in deterministic active-set order at the step boundary, so a
    /// downstream node always observes the same merged state regardless of which
    /// branch finished first. See [`crate::graph::CompiledGraph`] for the full
    /// concurrency and interrupt semantics.
    pub fn with_parallel(mut self, parallel: bool) -> Self {
        self.parallel = parallel;
        self
    }

    /// Overrides the graph id.
    pub fn with_graph_id(mut self, id: impl Into<GraphId>) -> Self {
        self.graph_id = id.into();
        self
    }

    /// Sets an optional human-readable graph name surfaced by the topology
    /// export. The `graph_id` remains the stable identifier; the name is purely
    /// descriptive (e.g. for diagrams a model authored).
    pub fn with_name(mut self, name: impl Into<String>) -> Self {
        self.name = Some(name.into());
        self
    }

    /// Overrides the recursion limit (max number of supersteps).
    pub fn with_recursion_limit(mut self, limit: usize) -> Self {
        self.recursion_limit = limit;
        self
    }

    /// Sets the state reducer used to merge partial updates at step boundaries.
    pub fn set_reducer<R>(mut self, reducer: R) -> Self
    where
        R: StateReducer<State, Update> + 'static,
    {
        self.reducer = Some(Arc::new(reducer));
        self
    }

    /// Adds an async node returning a [`NodeResult`].
    pub fn add_node<F, Fut>(mut self, id: impl Into<NodeId>, handler: F) -> Self
    where
        F: Fn(State, NodeContext) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<NodeResult<Update>>> + Send + 'static,
    {
        let id = id.into();
        self.nodes.insert(
            id.clone(),
            BuilderNode {
                id,
                handler: Arc::new(move |state, ctx| Box::pin(handler(state, ctx))),
            },
        );
        self
    }

    /// Adds a direct edge `from -> to`. Use [`START`]/[`END`] for the virtual
    /// entry/terminal nodes.
    pub fn add_edge(mut self, from: impl Into<NodeId>, to: impl Into<NodeId>) -> Self {
        self.edges.insert(from.into(), to.into());
        self
    }

    /// Adds a chain of direct edges over the given nodes: `add_sequence([a, b,
    /// c])` is equivalent to `add_edge(a, b).add_edge(b, c)`. The nodes must
    /// already have been added with [`Self::add_node`]; this only wires edges. A
    /// sequence of fewer than two nodes adds no edges.
    pub fn add_sequence<I, N>(mut self, nodes: I) -> Self
    where
        I: IntoIterator<Item = N>,
        N: Into<NodeId>,
    {
        let nodes: Vec<NodeId> = nodes.into_iter().map(Into::into).collect();
        for pair in nodes.windows(2) {
            self.edges.insert(pair[0].clone(), pair[1].clone());
        }
        self
    }

    /// Adds a barrier/waiting edge `from -> to`: like [`Self::add_edge`] but `to`
    /// only activates once *all* of its registered predecessors (every `from`
    /// declared via `add_waiting_edge`) have completed — possibly across
    /// different supersteps. This is the join/fan-in primitive for diamond
    /// topologies where several branches must finish before a synthesis node
    /// runs. Calling it repeatedly with the same `to` accumulates the required
    /// predecessor set.
    pub fn add_waiting_edge(mut self, from: impl Into<NodeId>, to: impl Into<NodeId>) -> Self {
        let from = from.into();
        let to = to.into();
        self.edges.insert(from.clone(), to.clone());
        self.waiting.entry(to).or_default().insert(from);
        self
    }

    /// Sets the entry node, i.e. `add_edge(START, node)`.
    pub fn set_entry(self, node: impl Into<NodeId>) -> Self {
        self.add_edge(START, node)
    }

    /// Marks `node` as terminal, i.e. `add_edge(node, END)`.
    pub fn set_finish(self, node: impl Into<NodeId>) -> Self {
        self.add_edge(node, END)
    }

    /// Adds conditional edges: a router closure mapped against a label table.
    ///
    /// Both the router's return value and the route-table labels are
    /// `impl ToString`, so a user-defined route enum that implements `Display`
    /// (or the [`Route`] newtype) can be used directly without manual
    /// `.to_string()` calls. Plain `&str`/`String` labels still work unchanged —
    /// the label is resolved against the table by its string form at the step
    /// boundary.
    pub fn add_conditional_edges<F, R, I, K, V>(
        mut self,
        from: impl Into<NodeId>,
        router: F,
        routes: I,
    ) -> Self
    where
        F: Fn(&State) -> R + Send + Sync + 'static,
        R: ToString,
        I: IntoIterator<Item = (K, V)>,
        K: ToString,
        V: Into<NodeId>,
    {
        let routes = routes
            .into_iter()
            .map(|(k, v)| (k.to_string(), v.into()))
            .collect();
        self.branches.insert(
            from.into(),
            Branch {
                router: Arc::new(move |state| router(state).to_string()),
                routes,
            },
        );
        self
    }

    /// Declares that `node` routes exclusively via [`crate::graph::Command`]
    /// `goto` (not static or conditional edges). Compile rejects nodes that mix
    /// command routing with static/conditional edges.
    pub fn mark_command_routing(mut self, node: impl Into<NodeId>) -> Self {
        self.command_nodes.insert(node.into());
        self
    }

    /// Records the declared `goto` destination hints for a command-routing node.
    ///
    /// These are advisory only — the runtime always resolves the real successor
    /// from the [`crate::graph::Command`] a node emits — but they let the
    /// [topology export](crate::graph::export) draw and validate the set of
    /// nodes a command node may jump to. Implies [`Self::mark_command_routing`].
    pub fn with_command_destinations<I, N>(
        mut self,
        node: impl Into<NodeId>,
        destinations: I,
    ) -> Self
    where
        I: IntoIterator<Item = N>,
        N: Into<NodeId>,
    {
        let node = node.into();
        self.command_nodes.insert(node.clone());
        let dests = destinations.into_iter().map(Into::into).collect();
        self.node_meta.entry(node).or_default().command_destinations = dests;
        self
    }

    /// Sets a human-readable kind for `node` (e.g. `model`, `tool`, `subgraph`)
    /// surfaced as [`crate::graph::NodeInfo::kind`] in the export.
    pub fn with_node_kind(mut self, node: impl Into<NodeId>, kind: impl Into<String>) -> Self {
        self.node_meta.entry(node.into()).or_default().kind = Some(kind.into());
        self
    }

    /// Records a sorted, free-form `key=value` annotation on `node`, carried
    /// verbatim into [`crate::graph::NodeInfo::metadata`].
    pub fn with_node_metadata(
        mut self,
        node: impl Into<NodeId>,
        key: impl Into<String>,
        value: impl Into<String>,
    ) -> Self {
        self.node_meta
            .entry(node.into())
            .or_default()
            .metadata
            .insert(key.into(), value.into());
        self
    }

    /// Marks `node` as a subgraph-embedding node for the export (an introspection
    /// marker only; it does not change how the node executes).
    pub fn mark_subgraph(mut self, node: impl Into<NodeId>) -> Self {
        self.node_meta.entry(node.into()).or_default().subgraph = true;
        self
    }

    /// Marks `node` as an interrupt point for the export.
    pub fn mark_interrupt(mut self, node: impl Into<NodeId>) -> Self {
        self.node_meta.entry(node.into()).or_default().interrupt = true;
        self
    }

    /// Marks `node` as a deferred join for the export.
    pub fn mark_deferred(mut self, node: impl Into<NodeId>) -> Self {
        self.node_meta.entry(node.into()).or_default().deferred = true;
        self
    }

    /// Validates topology and freezes the graph into a [`CompiledGraph`].
    pub fn compile(self) -> Result<CompiledGraph<State, Update>> {
        if self.reducer.is_none() {
            return Err(TinyAgentsError::Validation(
                "no state reducer set; call set_reducer (or GraphBuilder::overwrite)".to_string(),
            ));
        }

        // entry must exist
        let entry = self
            .edges
            .get(&NodeId::from(START))
            .cloned()
            .ok_or(TinyAgentsError::MissingStart)?;
        if entry.as_str() == END {
            return Err(TinyAgentsError::Validation(
                "START cannot route directly to END".to_string(),
            ));
        }
        self.require_node(&entry)?;

        // static edges
        for (from, to) in &self.edges {
            if from.as_str() != START {
                self.require_node(from)?;
            }
            if to.as_str() != END {
                self.require_node(to)?;
            }
            if to.as_str() == START {
                return Err(TinyAgentsError::Validation(
                    "START cannot be an edge target".to_string(),
                ));
            }
            if from.as_str() == END {
                return Err(TinyAgentsError::Validation(
                    "END cannot be an edge source".to_string(),
                ));
            }
        }

        // conditional edges
        for (from, branch) in &self.branches {
            self.require_node(from)?;
            if self.edges.contains_key(from) {
                return Err(TinyAgentsError::Validation(format!(
                    "node `{from}` has both a static edge and conditional edges"
                )));
            }
            for target in branch.routes.values() {
                if target.as_str() != END {
                    self.require_node(target)?;
                }
            }
        }

        // barrier/waiting edges: every source and target must exist
        for (to, froms) in &self.waiting {
            self.require_node(to)?;
            for from in froms {
                self.require_node(from)?;
            }
        }

        // command-routing nodes must not also have static/conditional edges
        for node in &self.command_nodes {
            self.require_node(node)?;
            if self.edges.contains_key(node) || self.branches.contains_key(node) {
                return Err(TinyAgentsError::Validation(format!(
                    "node `{node}` declares command routing but also has static/conditional edges"
                )));
            }
        }

        let Self {
            graph_id,
            name,
            nodes,
            edges,
            branches,
            command_nodes,
            waiting,
            reducer,
            recursion_limit,
            parallel,
            max_concurrency,
            node_timeout,
            node_meta,
        } = self;

        Ok(CompiledGraph::from_parts(
            graph_id,
            name,
            nodes,
            edges,
            branches,
            command_nodes,
            waiting,
            entry,
            reducer.expect("reducer presence checked above"),
            recursion_limit,
            parallel,
            max_concurrency,
            node_timeout,
            node_meta,
        ))
    }

    fn require_node(&self, id: &NodeId) -> Result<()> {
        if self.nodes.contains_key(id) {
            Ok(())
        } else {
            Err(TinyAgentsError::MissingNode(id.to_string()))
        }
    }
}

impl<State> GraphBuilder<State, State>
where
    State: Clone + Send + Sync + 'static,
{
    /// Creates a builder that uses whole-state overwrite updates — the
    /// milestone-1 default where each node returns the full next state.
    pub fn overwrite() -> Self {
        Self::new().set_reducer(OverwriteStateReducer)
    }
}

#[cfg(test)]
mod test;