agentwerk 0.1.8

A minimal Rust crate that gives any application agentic capabilities.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
//! Run many agents with a shared concurrency cap. `Batch::run` waits for a fixed set; `Batch::spawn` hands back a pool you can submit into while it's running.

use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use futures_util::stream::{FuturesUnordered, Stream, StreamExt};
use tokio::sync::mpsc;

use crate::error::{AgenticError, Result};

use super::agent::Agent;
use super::output::AgentOutput;

const DEFAULT_CONCURRENCY: usize = 1;

/// Pool of agents that all share a concurrency cap. Build with
/// [`Batch::new`], chain [`concurrency`](Self::concurrency) and
/// [`agent`](Self::agent) / [`agents`](Self::agents), then finish with
/// [`run`](Self::run) (wait for all) or [`spawn`](Self::spawn) (dynamic pool).
pub struct Batch {
    concurrency: usize,
    agents: Vec<Agent>,
    cancel_signal: Option<Arc<AtomicBool>>,
}

impl Default for Batch {
    fn default() -> Self {
        Self {
            concurrency: DEFAULT_CONCURRENCY,
            agents: Vec::new(),
            cancel_signal: None,
        }
    }
}

impl Batch {
    pub fn new() -> Self {
        Self::default()
    }

    /// Cap on simultaneous in-flight agents. Clamped to at least 1.
    pub fn concurrency(mut self, n: usize) -> Self {
        self.concurrency = n.max(1);
        self
    }

    /// Add one agent to run.
    pub fn agent(mut self, agent: Agent) -> Self {
        self.agents.push(agent);
        self
    }

    /// Add many agents to run.
    pub fn agents<I>(mut self, agents: I) -> Self
    where
        I: IntoIterator<Item = Agent>,
    {
        self.agents.extend(agents);
        self
    }

    /// Share an external cancel signal with the pool. Every submitted agent
    /// uses it, and [`BatchHandle::cancel`] writes to it. Useful when the
    /// caller already owns a signal (e.g. wired to Ctrl-C) and wants in-flight
    /// agents to observe it.
    pub fn cancel_signal(mut self, signal: Arc<AtomicBool>) -> Self {
        self.cancel_signal = Some(signal);
        self
    }

    /// Run every added agent to completion. Returns results in **submission**
    /// order: `results[i]` corresponds to the `i`th agent added via
    /// [`agent`](Self::agent) / [`agents`](Self::agents). A failing agent does
    /// not abort the others.
    pub async fn run(self) -> Vec<Result<AgentOutput>> {
        let total = self.agents.len();
        let (handle, stream) = self.spawn();
        handle.drain();

        let mut slots: Vec<Option<Result<AgentOutput>>> = (0..total).map(|_| None).collect();
        for (index, result) in stream.collect().await {
            if index < slots.len() {
                slots[index] = Some(result);
            }
        }
        slots
            .into_iter()
            .enumerate()
            .map(|(i, slot)| {
                slot.unwrap_or_else(|| {
                    Err(AgenticError::Other(format!(
                        "batch: missing result for submission index {i}"
                    )))
                })
            })
            .collect()
    }

    /// Start a dispatcher on a background tokio task and return a pair:
    ///
    /// - [`BatchHandle`] — cheap, clonable handle for submitting more agents
    ///   or cancelling.
    /// - [`BatchOutputStream`] — yields
    ///   `(submission_index, Result<AgentOutput>)` in completion order. The
    ///   `submission_index` matches the position the agent was added:
    ///   preloaded [`agents`](Self::agents) take indices `0..n`, then dynamic
    ///   [`submit`](BatchHandle::submit) calls continue the sequence. Ends
    ///   once all handles are dropped or [`drain`](BatchHandle::drain)ed (let
    ///   in-flight finish), or [`cancel`](BatchHandle::cancel) is called
    ///   (interrupt in-flight) and the backlog completes.
    ///
    /// Requires a running tokio runtime.
    pub fn spawn(self) -> (BatchHandle, BatchOutputStream) {
        let concurrency = self.concurrency;
        let (submit_tx, submit_rx) = mpsc::unbounded_channel::<(usize, Agent)>();
        let (output_tx, output_rx) = mpsc::unbounded_channel::<(usize, Result<AgentOutput>)>();
        let cancel = self
            .cancel_signal
            .unwrap_or_else(|| Arc::new(AtomicBool::new(false)));
        let counter = Arc::new(AtomicUsize::new(0));

        for agent in self.agents {
            let index = counter.fetch_add(1, Ordering::Relaxed);
            let _ = submit_tx.send((index, agent));
        }

        let dispatcher_cancel = cancel.clone();
        tokio::spawn(async move {
            dispatch(submit_rx, output_tx, concurrency, dispatcher_cancel).await;
        });

        let handle = BatchHandle {
            sender: submit_tx,
            cancel,
            counter,
        };
        let output = BatchOutputStream { rx: output_rx };
        (handle, output)
    }
}

async fn dispatch(
    mut submit_rx: mpsc::UnboundedReceiver<(usize, Agent)>,
    output_tx: mpsc::UnboundedSender<(usize, Result<AgentOutput>)>,
    concurrency: usize,
    cancel: Arc<AtomicBool>,
) {
    let mut in_flight: FuturesUnordered<tokio::task::JoinHandle<(usize, Result<AgentOutput>)>> =
        FuturesUnordered::new();
    let mut closed = false;

    loop {
        if cancel.load(Ordering::Relaxed) && !closed {
            submit_rx.close();
            closed = true;
        }

        tokio::select! {
            biased;
            Some(join) = in_flight.next(), if !in_flight.is_empty() => {
                // A task-level JoinError means the spawned future panicked or was
                // aborted — the submission index is then unrecoverable, so the slot
                // will be backfilled with a synthetic error by `Batch::run`.
                if let Ok(pair) = join {
                    let _ = output_tx.send(pair);
                }
            }
            maybe = submit_rx.recv(), if !closed && in_flight.len() < concurrency => {
                let Some((index, agent)) = maybe else {
                    closed = true;
                    continue;
                };
                let agent = agent.cancel_signal(cancel.clone());
                in_flight.push(tokio::spawn(async move {
                    (index, agent.run().await)
                }));
            }
            else => return,
        }
    }
}

/// Cheap, clonable handle to a running [`Batch`] pool. Obtained from
/// [`Batch::spawn`].
///
/// While any clone of the handle is alive, the pool accepts new submissions.
/// Dropping the last clone (or calling [`drain`](Self::drain) on it) closes
/// the pool gracefully: queued and in-flight agents finish, then the output
/// stream ends. Use [`cancel`](Self::cancel) to interrupt instead.
#[derive(Clone)]
pub struct BatchHandle {
    sender: mpsc::UnboundedSender<(usize, Agent)>,
    cancel: Arc<AtomicBool>,
    counter: Arc<AtomicUsize>,
}

impl BatchHandle {
    /// Enqueue another agent for the pool. Returns the submission index that
    /// will accompany this agent's result on the [`BatchOutputStream`].
    /// Indices are assigned monotonically and continue the sequence begun by
    /// the preloaded [`Batch::agents`] / [`Batch::agent`] calls. If the pool
    /// has already been cancelled or the dispatcher has exited the agent is
    /// silently dropped; the returned index is still reserved but no result
    /// will arrive for it.
    pub fn submit(&self, agent: Agent) -> usize {
        let index = self.counter.fetch_add(1, Ordering::Relaxed);
        let _ = self.sender.send((index, agent));
        index
    }

    /// Signal all in-flight agents to stop (via their `cancel_signal`) and
    /// stop the dispatcher from pulling new submissions. In-flight agents
    /// observe the flag at their next turn boundary; the stream ends once
    /// they complete.
    ///
    /// The pool owns one cancel signal and sets it on every submitted agent,
    /// overriding any per-agent signal the caller attached. To share an
    /// external signal with the pool, pass it to
    /// [`Batch::cancel_signal`](Batch::cancel_signal).
    pub fn cancel(&self) {
        self.cancel.store(true, Ordering::Relaxed);
    }

    /// Returns `true` if [`cancel`](Self::cancel) has been called.
    pub fn is_cancelled(&self) -> bool {
        self.cancel.load(Ordering::Relaxed)
    }

    /// Release this handle. When the last clone is gone, the dispatcher
    /// flushes in-flight agents to completion and ends the output stream.
    /// Non-blocking: results still arrive on the [`BatchOutputStream`]. Sugar
    /// for `drop(handle)`, but visible at the call site — pairs with
    /// [`cancel`](Self::cancel) (interrupt) to name the two exit modes.
    pub fn drain(self) {}
}

/// Stream of per-agent results from a [`Batch::spawn`] pool. Yields
/// `(submission_index, Result<AgentOutput>)` in completion order. The
/// `submission_index` matches the position the agent was added — preloaded
/// [`Batch::agents`] first, then dynamic [`BatchHandle::submit`] calls — so
/// the caller can correlate a streamed result back to its input without
/// inspecting [`AgentOutput::name`]. Ends once the pool is closed (all
/// handles dropped, [`drain`](BatchHandle::drain)ed, or
/// [`cancel`](BatchHandle::cancel)led) and the backlog completes.
pub struct BatchOutputStream {
    rx: mpsc::UnboundedReceiver<(usize, Result<AgentOutput>)>,
}

impl Stream for BatchOutputStream {
    type Item = (usize, Result<AgentOutput>);

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.rx.poll_recv(cx)
    }
}

impl BatchOutputStream {
    /// Collect every remaining result in completion order.
    pub async fn collect(self) -> Vec<(usize, Result<AgentOutput>)> {
        StreamExt::collect(self).await
    }

    /// Await the next result, or `None` once the pool has drained.
    pub async fn next(&mut self) -> Option<(usize, Result<AgentOutput>)> {
        StreamExt::next(self).await
    }
}

impl Unpin for BatchOutputStream {}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::testutil::{text_response, tool_response, MockProvider};
    use crate::tools::{Tool, ToolResult};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    fn agent_with_response(name: &str, text: &str) -> Agent {
        Agent::new()
            .name(name)
            .model_name("mock")
            .identity_prompt("")
            .instruction_prompt("go")
            .provider(Arc::new(MockProvider::text(text)))
    }

    fn agent_with_delay(name: &str, delay_ms: u64, text: &str) -> Agent {
        let slow_tool = Tool::new("slow", "simulates work")
            .schema(serde_json::json!({"type": "object", "properties": {}}))
            .handler(move |_, _| {
                Box::pin(async move {
                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
                    Ok(ToolResult::success("done"))
                })
            });

        let provider = Arc::new(MockProvider::new(vec![
            tool_response("slow", "c1", serde_json::json!({})),
            text_response(text),
        ]));

        Agent::new()
            .name(name)
            .model_name("mock")
            .identity_prompt("")
            .instruction_prompt("go")
            .tool(slow_tool)
            .provider(provider)
    }

    #[tokio::test]
    async fn empty_run_yields_empty_vec() {
        let results = Batch::new().concurrency(4).run().await;
        assert!(results.is_empty());
    }

    #[tokio::test]
    async fn run_returns_results_in_submission_order() {
        let results = Batch::new()
            .concurrency(4)
            .agents(["a", "b", "c"].iter().map(|n| agent_with_response(n, "ok")))
            .run()
            .await;
        assert_eq!(results.len(), 3);
        let names: Vec<String> = results
            .iter()
            .map(|r| r.as_ref().unwrap().name.clone())
            .collect();
        assert_eq!(names, vec!["a", "b", "c"]);
    }

    #[tokio::test]
    async fn run_submission_order_ignores_completion_order() {
        // First submitted agent finishes last; result must still land at index 0.
        let slow = agent_with_delay("slow", 80, "slow");
        let fast = agent_with_response("fast", "fast");

        let results = Batch::new()
            .concurrency(4)
            .agent(slow)
            .agent(fast)
            .run()
            .await;
        assert_eq!(results[0].as_ref().unwrap().name, "slow");
        assert_eq!(results[1].as_ref().unwrap().name, "fast");
    }

    #[tokio::test]
    async fn run_surfaces_failures_without_blocking_others() {
        let failing = Agent::new()
            .name("fail")
            .model_name("mock")
            .identity_prompt("")
            .instruction_prompt("go")
            .provider(Arc::new(MockProvider::new(vec![])));

        let results = Batch::new()
            .concurrency(2)
            .agent(agent_with_response("ok1", "first"))
            .agent(failing)
            .agent(agent_with_response("ok2", "second"))
            .run()
            .await;
        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
        assert!(results[2].is_ok());
    }

    #[tokio::test]
    async fn stream_yields_submission_indices() {
        let (pool, mut stream) = Batch::new()
            .concurrency(4)
            .agents(["a", "b", "c"].iter().map(|n| agent_with_response(n, "ok")))
            .spawn();
        drop(pool);

        let mut seen: Vec<(usize, String)> = Vec::new();
        while let Some((index, result)) = stream.next().await {
            seen.push((index, result.unwrap().name));
        }
        seen.sort_by_key(|(i, _)| *i);
        assert_eq!(
            seen,
            vec![(0, "a".into()), (1, "b".into()), (2, "c".into()),],
        );
    }

    #[tokio::test]
    async fn submit_returns_monotonic_indices_continuing_preloaded() {
        let (pool, mut stream) = Batch::new()
            .concurrency(4)
            .agent(agent_with_response("preloaded", "ok"))
            .spawn();

        let idx_b = pool.submit(agent_with_response("b", "ok"));
        let idx_c = pool.submit(agent_with_response("c", "ok"));
        assert_eq!(idx_b, 1);
        assert_eq!(idx_c, 2);

        drop(pool);
        let mut seen = Vec::new();
        while let Some((i, _)) = stream.next().await {
            seen.push(i);
        }
        seen.sort();
        assert_eq!(seen, vec![0, 1, 2]);
    }

    #[tokio::test]
    async fn concurrency_cap_bounds_parallelism() {
        let running = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));

        let make = |i: usize| {
            let r = running.clone();
            let m = max_concurrent.clone();
            let slow_tool = Tool::new("slow", "work")
                .schema(serde_json::json!({"type": "object", "properties": {}}))
                .handler(move |_, _| {
                    let r = r.clone();
                    let m = m.clone();
                    Box::pin(async move {
                        let cur = r.fetch_add(1, Ordering::SeqCst) + 1;
                        m.fetch_max(cur, Ordering::SeqCst);
                        tokio::time::sleep(Duration::from_millis(30)).await;
                        r.fetch_sub(1, Ordering::SeqCst);
                        Ok(ToolResult::success("done"))
                    })
                });
            Agent::new()
                .name(&format!("w{i}"))
                .model_name("mock")
                .identity_prompt("")
                .instruction_prompt("go")
                .tool(slow_tool)
                .provider(Arc::new(MockProvider::new(vec![
                    tool_response("slow", "c1", serde_json::json!({})),
                    text_response("finished"),
                ])))
        };

        let results = Batch::new()
            .concurrency(3)
            .agents((0..10).map(make))
            .run()
            .await;
        assert_eq!(results.len(), 10);
        assert!(results.iter().all(|r| r.is_ok()));
        let peak = max_concurrent.load(Ordering::SeqCst);
        assert!(peak <= 3, "peak concurrency {peak} exceeded cap of 3");
        assert!(
            peak >= 2,
            "peak concurrency {peak} never reached meaningful overlap"
        );
    }

    #[tokio::test]
    async fn concurrency_scales_throughput() {
        let start = tokio::time::Instant::now();
        let seq = Batch::new()
            .concurrency(1)
            .agents((0..10).map(|i| agent_with_delay("w", 30, &format!("r{i}"))))
            .run()
            .await;
        let seq_elapsed = start.elapsed();

        let start = tokio::time::Instant::now();
        let par = Batch::new()
            .concurrency(10)
            .agents((0..10).map(|i| agent_with_delay("w", 30, &format!("r{i}"))))
            .run()
            .await;
        let par_elapsed = start.elapsed();

        assert_eq!(seq.len(), 10);
        assert_eq!(par.len(), 10);
        assert!(
            seq_elapsed > par_elapsed * 3,
            "sequential ({seq_elapsed:?}) should dwarf parallel ({par_elapsed:?})",
        );
    }

    #[tokio::test]
    async fn high_throughput_smoke() {
        let results = Batch::new()
            .concurrency(50)
            .agents((0..500).map(|i| agent_with_response("w", &format!("r{i}"))))
            .run()
            .await;
        assert_eq!(results.len(), 500);
        assert!(results.iter().all(|r| r.is_ok()));
    }

    #[tokio::test]
    async fn spawn_accepts_dynamic_submissions() {
        let (pool, mut stream) = Batch::new().concurrency(2).spawn();
        pool.submit(agent_with_response("a", "first"));
        pool.submit(agent_with_response("b", "second"));

        let r1 = stream.next().await.expect("first result");
        let r2 = stream.next().await.expect("second result");

        pool.submit(agent_with_response("c", "third"));
        drop(pool);

        let r3 = stream.next().await.expect("third result");
        assert!(stream.next().await.is_none(), "stream must end after drop");

        let mut names: Vec<String> = [r1, r2, r3]
            .into_iter()
            .map(|(_, r)| r.unwrap().name)
            .collect();
        names.sort();
        assert_eq!(names, vec!["a", "b", "c"]);
    }

    #[tokio::test]
    async fn spawn_keeps_stream_open_while_any_handle_lives() {
        let (pool, mut stream) = Batch::new().concurrency(4).spawn();
        let clone = pool.clone();
        pool.submit(agent_with_response("a", "done"));
        drop(pool);
        assert!(stream.next().await.unwrap().1.is_ok());
        clone.submit(agent_with_response("b", "done"));
        assert!(stream.next().await.unwrap().1.is_ok());
        drop(clone);
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn spawn_drops_handle_drains_backlog_and_ends_stream() {
        let (pool, mut stream) = Batch::new().concurrency(2).spawn();
        pool.submit(agent_with_response("a", "done"));
        pool.submit(agent_with_response("b", "done"));
        drop(pool);

        let mut seen = 0;
        while let Some((_, r)) = stream.next().await {
            r.unwrap();
            seen += 1;
        }
        assert_eq!(seen, 2);
    }

    #[tokio::test]
    async fn drain_lets_in_flight_agents_finish_unlike_cancel() {
        let (pool, mut stream) = Batch::new().concurrency(2).spawn();
        pool.submit(agent_with_delay("a", 30, "done"));
        pool.submit(agent_with_delay("b", 30, "done"));
        pool.drain();

        let mut seen = 0;
        while let Some((_, r)) = stream.next().await {
            let out = r.unwrap();
            assert_eq!(out.status, crate::agent::AgentStatus::Completed);
            seen += 1;
        }
        assert_eq!(seen, 2);
    }

    #[tokio::test]
    async fn spawn_cancel_stops_in_flight_agents() {
        let (pool, mut stream) = Batch::new().concurrency(2).spawn();
        pool.submit(agent_with_delay("slow", 200, "never"));

        tokio::time::sleep(Duration::from_millis(20)).await;
        pool.cancel();

        let (_, result) = stream.next().await.expect("result after cancel");
        let out = result.unwrap();
        assert_eq!(out.status, crate::agent::AgentStatus::Cancelled);
        assert!(pool.is_cancelled());
        drop(pool);
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn preloaded_agents_run_without_explicit_submit() {
        let (pool, stream) = Batch::new()
            .concurrency(2)
            .agents(["a", "b"].iter().map(|n| agent_with_response(n, "ok")))
            .spawn();
        drop(pool);
        let results = stream.collect().await;
        assert_eq!(results.len(), 2);
    }
}