weavegraph 0.3.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
#[macro_use]
extern crate proptest;

use proptest::prelude::{Strategy, prop};

// Generators shared by property tests for graphs and routing

/// Generate valid custom node names.
///
/// Constraints:
/// - Starts with a letter
/// - Followed by 0..16 of [A-Za-z0-9_]
/// - Excludes reserved endpoint names ("Start", "End")
fn node_name_strategy() -> impl Strategy<Value = String> {
    // Base regex for candidate names
    let base = prop::string::string_regex("[A-Za-z][A-Za-z0-9_]{0,16}").unwrap();
    // Filter out reserved endpoint names
    base.prop_filter("exclude reserved and reserved root name", |s| {
        s != "Start" && s != "End" && s != "Root"
    })
}

// Minimal sanity property using the generator (real graph properties will follow in later steps)
proptest! {
    #[test]
    fn prop_node_name_non_empty(name in node_name_strategy()) {
        prop_assert!(!name.is_empty());
        prop_assert!(name.chars().next().unwrap().is_ascii_alphabetic());
    }
}

mod common;
use common::*;

use proptest::prelude::any;
use rustc_hash::FxHashSet;
use std::sync::Arc;
use weavegraph::graphs::{EdgePredicate, GraphBuilder};
use weavegraph::runtimes::{AppRunner, CheckpointerType, SessionInit, StepOptions, StepResult};
use weavegraph::state::StateSnapshot;
use weavegraph::types::NodeKind;

fn block_on<F: std::future::Future<Output = ()>>(fut: F) {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(fut);
}

proptest! {
    #[test]
    fn prop_valid_only_predicate_targets(
        mut names in prop::collection::vec(node_name_strategy(), 1..8),
        include_end in any::<bool>(),
    ) {
        // Dedup names to avoid duplicate node registrations
        names.sort();
        names.dedup();

        block_on(async move {
            // Build graph: Start -> Root -> End; register all names
            let mut gb = GraphBuilder::new()
                .add_node(NodeKind::Custom("Root".into()), TestNode { name: "root" })
                .add_edge(NodeKind::Start, NodeKind::Custom("Root".into()))
                .add_edge(NodeKind::Custom("Root".into()), NodeKind::End);

            for n in &names {
                gb = gb.add_node(NodeKind::Custom(n.clone()), TestNode { name: "t" });
            }

            // Predicate returns only registered names (+ optional End)
            let mut targets: Vec<String> = names.clone();
            if include_end { targets.push("End".into()); }
            let predicate: EdgePredicate = Arc::new(move |_snap| targets.clone());
            gb = gb.add_conditional_edge(NodeKind::Custom("Root".into()), predicate);

            let app = gb.compile().unwrap();
            let mut runner = AppRunner::builder().app(app).checkpointer(CheckpointerType::InMemory).build().await;
            let initial = state_with_user("seed");
            match runner.create_session("sess_valid".into(), initial).await.unwrap() {
                SessionInit::Fresh => {}
                _ => panic!("expected fresh session"),
            }
            let report = runner.run_step("sess_valid", StepOptions::default()).await.unwrap();
            let rep = match report { StepResult::Completed(rep) => rep, _ => panic!("expected completed") };

            let nf: FxHashSet<_> = rep.next_frontier.into_iter().collect();

            // All predicate targets must appear (translated)
            let allowed: FxHashSet<_> = names.clone().into_iter().collect();
            for n in names.clone() {
                assert!(nf.contains(&NodeKind::Custom(n)));
            }
            if include_end { assert!(nf.contains(&NodeKind::End)); }

            // Frontier must not contain unknown custom nodes
            for k in nf {
                if let NodeKind::Custom(s) = k { assert!(allowed.contains(&s)); }
            }
        });
    }
}

proptest! {
    #[test]
    fn prop_mixed_valid_invalid_targets(
        mut valid in prop::collection::vec(node_name_strategy(), 1..6),
        mut invalid in prop::collection::vec(node_name_strategy(), 1..6),
    ) {
        // Ensure disjoint sets
        valid.sort(); valid.dedup();
        invalid.sort(); invalid.dedup();
        invalid.retain(|n| !valid.contains(n));
        prop_assume!(!valid.is_empty());
        prop_assume!(!invalid.is_empty());

        block_on(async move {
            let mut gb = GraphBuilder::new()
                .add_node(NodeKind::Custom("Root".into()), TestNode { name: "root" })
                .add_edge(NodeKind::Start, NodeKind::Custom("Root".into()))
                .add_edge(NodeKind::Custom("Root".into()), NodeKind::End);
            for n in &valid { gb = gb.add_node(NodeKind::Custom(n.clone()), TestNode { name: "t" }); }

            // Mixed targets: valid + invalid + maybe End
            let mut targets = valid.clone();
            targets.extend(invalid.clone());
            targets.push("End".into());
            let predicate: EdgePredicate = Arc::new(move |_snap| targets.clone());
            gb = gb.add_conditional_edge(NodeKind::Custom("Root".into()), predicate);

            let app = gb.compile().unwrap();
            let mut runner = AppRunner::builder().app(app).checkpointer(CheckpointerType::InMemory).build().await;
            match runner.create_session("sess_mix".into(), state_with_user("x")).await.unwrap() {
                SessionInit::Fresh => {}, _ => panic!("fresh")
            }
            let rep = match runner.run_step("sess_mix", StepOptions::default()).await.unwrap() { StepResult::Completed(rep) => rep, _ => unreachable!() };

            let nf: FxHashSet<_> = rep.next_frontier.into_iter().collect();
            // Valid appear
            for n in &valid { assert!(nf.contains(&NodeKind::Custom(n.clone()))); }
            assert!(nf.contains(&NodeKind::End));
            // Invalid never appear
            for n in &invalid { assert!(!nf.contains(&NodeKind::Custom(n.clone()))); }
        });
    }
}

proptest! {
    #[test]
    fn prop_stress_fan_out_dedup(
        mut pool in prop::collection::vec(node_name_strategy(), 2..16),
        fanout in 1usize..64,
    ) {
        pool.sort(); pool.dedup();

        block_on(async move {
            let mut gb = GraphBuilder::new()
                .add_node(NodeKind::Custom("Root".into()), TestNode { name: "root" })
                .add_edge(NodeKind::Start, NodeKind::Custom("Root".into()))
                .add_edge(NodeKind::Custom("Root".into()), NodeKind::End);
            for n in &pool { gb = gb.add_node(NodeKind::Custom(n.clone()), TestNode { name: "t" }); }

            // Build predicate outputs with duplicates
            let mut outs: Vec<String> = Vec::new();
            for i in 0..fanout { outs.push(pool[i % pool.len()].clone()); }
            // Include End occasionally
            if fanout % 2 == 0 { outs.push("End".into()); }
            let predicate: EdgePredicate = Arc::new(move |_snap| outs.clone());
            gb = gb.add_conditional_edge(NodeKind::Custom("Root".into()), predicate);

            let app = gb.compile().unwrap();
            let mut runner = AppRunner::builder().app(app).checkpointer(CheckpointerType::InMemory).build().await;
            match runner.create_session("sess_fan".into(), state_with_user("y")).await.unwrap() {
                SessionInit::Fresh => {}, _ => panic!("fresh")
            }
            let rep = match runner.run_step("sess_fan", StepOptions::default()).await.unwrap() { StepResult::Completed(rep) => rep, _ => unreachable!() };

            // Count occurrences per custom node
            let mut counts = std::collections::HashMap::<String, usize>::new();
            for k in rep.next_frontier {
                if let NodeKind::Custom(s) = k { *counts.entry(s).or_insert(0) += 1; }
            }
            // Each targeted custom node should appear at most once
            for n in pool { assert!(counts.get(&n).cloned().unwrap_or(0) <= 1); }
        });
    }
}

// ============================================================================
// Additional property tests for conditional edge routing
// ============================================================================

/// Generate a valid key for extra data that can be used in predicates.
fn extra_key_strategy() -> impl Strategy<Value = String> {
    prop::string::string_regex("[a-z][a-z0-9_]{0,8}").unwrap()
}

proptest! {
    /// Property: Conditional edges based on extra data correctly route
    /// to the appropriate target nodes based on state content.
    #[test]
    fn prop_conditional_routing_based_on_extra_data(
        key in extra_key_strategy(),
        threshold in 0i64..100,
        value in 0i64..100,
    ) {
        block_on(async move {
            // Create predicate that routes based on extra data comparison
            let key_clone = key.clone();
            let predicate: EdgePredicate = Arc::new(move |snap: StateSnapshot| {
                if let Some(val) = snap.extra.get(&key_clone)
                    && let Some(num) = val.as_i64()
                        && num >= threshold {
                            return vec!["HighPath".to_string()];
                        }
                vec!["LowPath".to_string()]
            });

            let app = GraphBuilder::new()
                .add_node(NodeKind::Custom("Root".into()), TestNode { name: "root" })
                .add_node(NodeKind::Custom("HighPath".into()), TestNode { name: "high" })
                .add_node(NodeKind::Custom("LowPath".into()), TestNode { name: "low" })
                .add_edge(NodeKind::Start, NodeKind::Custom("Root".into()))
                .add_edge(NodeKind::Custom("Root".into()), NodeKind::End)
                .add_edge(NodeKind::Custom("HighPath".into()), NodeKind::End)
                .add_edge(NodeKind::Custom("LowPath".into()), NodeKind::End)
                .add_conditional_edge(NodeKind::Custom("Root".into()), predicate)
                .compile()
                .unwrap();

            let mut runner = AppRunner::builder().app(app).checkpointer(CheckpointerType::InMemory).build().await;
            let mut state = state_with_user("test");
            state.extra.get_mut().insert(key.clone(), serde_json::json!(value));

            runner.create_session("sess_cond".into(), state).await.unwrap();
            let rep = match runner.run_step("sess_cond", StepOptions::default()).await.unwrap() {
                StepResult::Completed(rep) => rep,
                _ => panic!("expected completed"),
            };

            let nf: FxHashSet<_> = rep.next_frontier.into_iter().collect();

            // Verify routing matches expectation (use assert! instead of prop_assert! in async)
            if value >= threshold {
                assert!(nf.contains(&NodeKind::Custom("HighPath".into())),
                    "expected HighPath when value {} >= threshold {}", value, threshold);
                assert!(!nf.contains(&NodeKind::Custom("LowPath".into())),
                    "unexpected LowPath when value {} >= threshold {}", value, threshold);
            } else {
                assert!(nf.contains(&NodeKind::Custom("LowPath".into())),
                    "expected LowPath when value {} < threshold {}", value, threshold);
                assert!(!nf.contains(&NodeKind::Custom("HighPath".into())),
                    "unexpected HighPath when value {} < threshold {}", value, threshold);
            }
        });
    }
}

proptest! {
    /// Property: Multiple conditional edges from the same source node
    /// independently evaluate and route to their respective targets.
    #[test]
    fn prop_multiple_conditional_edges_same_source(
        mut targets_a in prop::collection::vec(node_name_strategy(), 1..4),
        mut targets_b in prop::collection::vec(node_name_strategy(), 1..4),
    ) {
        // Ensure disjoint and dedup
        targets_a.sort(); targets_a.dedup();
        targets_b.sort(); targets_b.dedup();
        targets_b.retain(|n| !targets_a.contains(n));
        prop_assume!(!targets_a.is_empty());
        prop_assume!(!targets_b.is_empty());

        block_on(async move {
            let mut gb = GraphBuilder::new()
                .add_node(NodeKind::Custom("Root".into()), TestNode { name: "root" })
                .add_edge(NodeKind::Start, NodeKind::Custom("Root".into()))
                .add_edge(NodeKind::Custom("Root".into()), NodeKind::End);

            // Register all target nodes
            for n in targets_a.iter().chain(targets_b.iter()) {
                gb = gb.add_node(NodeKind::Custom(n.clone()), TestNode { name: "t" });
            }

            // First conditional edge returns targets_a
            let ta = targets_a.clone();
            let pred_a: EdgePredicate = Arc::new(move |_| ta.clone());
            gb = gb.add_conditional_edge(NodeKind::Custom("Root".into()), pred_a);

            // Second conditional edge returns targets_b
            let tb = targets_b.clone();
            let pred_b: EdgePredicate = Arc::new(move |_| tb.clone());
            gb = gb.add_conditional_edge(NodeKind::Custom("Root".into()), pred_b);

            let app = gb.compile().unwrap();
            let mut runner = AppRunner::builder().app(app).checkpointer(CheckpointerType::InMemory).build().await;
            runner.create_session("sess_multi".into(), state_with_user("x")).await.unwrap();

            let rep = match runner.run_step("sess_multi", StepOptions::default()).await.unwrap() {
                StepResult::Completed(rep) => rep,
                _ => panic!("expected completed"),
            };

            let nf: FxHashSet<_> = rep.next_frontier.into_iter().collect();

            // Both sets of targets should appear in the frontier (use assert! in async)
            for n in &targets_a {
                assert!(nf.contains(&NodeKind::Custom(n.clone())),
                    "expected target_a node {} in frontier", n);
            }
            for n in &targets_b {
                assert!(nf.contains(&NodeKind::Custom(n.clone())),
                    "expected target_b node {} in frontier", n);
            }
        });
    }
}

proptest! {
    /// Property: Empty predicate result should not crash and should
    /// result in no additional frontier nodes from that edge.
    #[test]
    fn prop_empty_predicate_result_safe(
        mut registered in prop::collection::vec(node_name_strategy(), 1..5),
    ) {
        registered.sort(); registered.dedup();
        prop_assume!(!registered.is_empty());

        block_on(async move {
            let mut gb = GraphBuilder::new()
                .add_node(NodeKind::Custom("Root".into()), TestNode { name: "root" })
                .add_edge(NodeKind::Start, NodeKind::Custom("Root".into()))
                .add_edge(NodeKind::Custom("Root".into()), NodeKind::End);

            for n in &registered {
                gb = gb.add_node(NodeKind::Custom(n.clone()), TestNode { name: "t" });
            }

            // Predicate that returns empty vec
            let pred: EdgePredicate = Arc::new(|_| Vec::new());
            gb = gb.add_conditional_edge(NodeKind::Custom("Root".into()), pred);

            let app = gb.compile().unwrap();
            let mut runner = AppRunner::builder().app(app).checkpointer(CheckpointerType::InMemory).build().await;
            runner.create_session("sess_empty".into(), state_with_user("x")).await.unwrap();

            let rep = match runner.run_step("sess_empty", StepOptions::default()).await.unwrap() {
                StepResult::Completed(rep) => rep,
                _ => panic!("expected completed"),
            };

            let nf: FxHashSet<_> = rep.next_frontier.into_iter().collect();

            // Should only contain End from the unconditional edge, no custom nodes
            assert!(nf.contains(&NodeKind::End), "End should be in frontier");
            for n in &registered {
                assert!(!nf.contains(&NodeKind::Custom(n.clone())),
                    "registered node {} should not be in frontier with empty predicate", n);
            }
        });
    }
}

proptest! {
    /// Property: Predicate returning "End" explicitly adds End to frontier.
    #[test]
    fn prop_predicate_can_route_to_end(
        include_end in any::<bool>(),
        mut targets in prop::collection::vec(node_name_strategy(), 0..3),
    ) {
        targets.sort(); targets.dedup();

        block_on(async move {
            let mut gb = GraphBuilder::new()
                .add_node(NodeKind::Custom("Root".into()), TestNode { name: "root" })
                .add_edge(NodeKind::Start, NodeKind::Custom("Root".into()));
            // Note: No unconditional edge to End from Root

            for n in &targets {
                gb = gb.add_node(NodeKind::Custom(n.clone()), TestNode { name: "t" });
                gb = gb.add_edge(NodeKind::Custom(n.clone()), NodeKind::End);
            }

            let mut pred_targets = targets.clone();
            if include_end {
                pred_targets.push("End".to_string());
            }
            let pred: EdgePredicate = Arc::new(move |_| pred_targets.clone());
            gb = gb.add_conditional_edge(NodeKind::Custom("Root".into()), pred);

            let app = gb.compile().unwrap();
            let mut runner = AppRunner::builder().app(app).checkpointer(CheckpointerType::InMemory).build().await;
            runner.create_session("sess_end".into(), state_with_user("x")).await.unwrap();

            let rep = match runner.run_step("sess_end", StepOptions::default()).await.unwrap() {
                StepResult::Completed(rep) => rep,
                _ => panic!("expected completed"),
            };

            let nf: FxHashSet<_> = rep.next_frontier.into_iter().collect();

            // End should be in frontier iff include_end is true (use assert_eq! in async)
            assert_eq!(nf.contains(&NodeKind::End), include_end,
                "End presence in frontier should match include_end={}", include_end);

            // All valid custom targets should be in frontier
            for n in &targets {
                assert!(nf.contains(&NodeKind::Custom(n.clone())),
                    "target {} should be in frontier", n);
            }
        });
    }
}