agentwerk 0.1.7

A minimal Rust crate that gives any application agentic capabilities.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
//! Starts an agent on a background task and returns a handle + future pair, so the caller can send instructions, cancel, or await the result without blocking.

use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use tokio::task::JoinHandle;

use crate::agent::agent::Agent;
use crate::agent::output::AgentOutput;
use crate::agent::queue::{CommandQueue, CommandSource, QueuePriority, QueuedCommand};
use crate::error::{AgenticError, Result};

/// State shared by every [`AgentHandle`] clone through a single `Arc`.
///
/// Both fields are themselves `Arc`s: [`Agent::spawn`] passes clones of the
/// same queue and cancel flag to the agent it starts, so what a handle writes
/// here is what the spawned agent was configured with. The
/// [`AgentOutputFuture`] does not hold this state.
pub(crate) struct HandleState {
    /// Queue that [`AgentHandle::send`] enqueues instructions into.
    pub(crate) queue: Arc<CommandQueue>,
    /// Cancel flag set by [`AgentHandle::cancel`] (and by [`LifeToken`]'s
    /// `Drop`), read back by [`AgentHandle::is_cancelled`].
    pub(crate) cancel: Arc<AtomicBool>,
}

/// Drop-detection token shared by every [`AgentHandle`] clone.
///
/// Each handle owns a clone of one `Arc<LifeToken>`; the
/// [`AgentOutputFuture`] does *not* hold one. When the last handle goes away
/// the token itself is dropped and `Drop::drop` raises the shared cancel
/// flag, so the loop sees a cancel request the next time it checks the flag.
pub(crate) struct LifeToken {
    /// Flag raised when the token is dropped; the same `Arc` the handles
    /// write to in [`AgentHandle::cancel`].
    cancel: Arc<AtomicBool>,
}

impl LifeToken {
    /// Wrap `cancel` in a reference-counted token.
    ///
    /// The token is returned inside an `Arc` so that handles can share it;
    /// `Drop` only runs once the final clone of that `Arc` is released.
    pub(crate) fn new(cancel: Arc<AtomicBool>) -> Arc<Self> {
        Arc::new(Self { cancel })
    }
}

impl Drop for LifeToken {
    /// Raise the cancel flag. Setting it is idempotent, so it does not matter
    /// whether an explicit cancel already happened.
    fn drop(&mut self) {
        self.cancel.store(true, Ordering::Relaxed);
    }
}

/// Lightweight, clonable remote control for an agent whose loop runs on a
/// background tokio task. Created by [`Agent::spawn`](crate::Agent::spawn).
///
/// As long as at least one clone exists the loop idles after producing
/// output instead of exiting. It is told to stop when the last clone is
/// dropped or when [`cancel`](Self::cancel) is called on any clone.
#[derive(Clone)]
pub struct AgentHandle {
    state: Arc<HandleState>,
    _life: Arc<LifeToken>,
}

impl AgentHandle {
    /// Assemble a handle from the shared state and the drop-detection token.
    pub(crate) fn new(state: Arc<HandleState>, life: Arc<LifeToken>) -> Self {
        AgentHandle { state, _life: life }
    }

    /// Queue a new instruction for the running agent. It is picked up at the
    /// next turn boundary, or immediately if the agent is parked idle.
    pub fn send(&self, instruction: impl Into<String>) {
        // Instructions sent through a handle are tagged as user input with
        // `Next` priority and are not addressed to a specific agent.
        let command = QueuedCommand {
            content: instruction.into(),
            priority: QueuePriority::Next,
            source: CommandSource::UserInput,
            agent_name: None,
        };
        self.state.queue.enqueue(command);
    }

    /// Ask the agent to stop. The loop notices at the next turn boundary or
    /// idle-wait poll and exits.
    pub fn cancel(&self) {
        let flag = &self.state.cancel;
        flag.store(true, Ordering::Relaxed);
    }

    /// Returns `true` once a cancel signal has been raised, either explicitly
    /// via [`cancel`](Self::cancel) or implicitly via the last handle being
    /// dropped.
    pub fn is_cancelled(&self) -> bool {
        let flag = &self.state.cancel;
        flag.load(Ordering::Relaxed)
    }

    /// Test-only access to the shared command queue.
    #[cfg(test)]
    pub(crate) fn queue_for_test(&self) -> Arc<CommandQueue> {
        Arc::clone(&self.state.queue)
    }
}

/// Future yielding the agent's final
/// [`AgentOutput`](crate::agent::AgentOutput).
///
/// Only [`AgentHandle`] clones own a [`LifeToken`]; this future does not.
/// Dropping it merely discards the result — whether the loop keeps running
/// depends solely on whether any handles are still alive.
pub struct AgentOutputFuture {
    join: Mutex<Option<JoinHandle<Result<AgentOutput>>>>,
}

impl AgentOutputFuture {
    /// Wrap the join handle of the spawned agent task.
    pub(crate) fn new(join: JoinHandle<Result<AgentOutput>>) -> Self {
        let join = Mutex::new(Some(join));
        Self { join }
    }
}

impl Future for AgentOutputFuture {
    type Output = Result<AgentOutput>;

    /// Poll the underlying task.
    ///
    /// Once the task has completed the join handle is discarded; any later
    /// poll resolves immediately to an [`AgenticError::Other`]. A join error
    /// from the task is likewise reported as [`AgenticError::Other`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut slot = self.join.lock().unwrap();

        // An empty slot means an earlier poll already returned `Ready`.
        let task = match slot.as_mut() {
            Some(task) => task,
            None => {
                let msg = "AgentOutputFuture polled after completion";
                return Poll::Ready(Err(AgenticError::Other(msg.into())));
            }
        };

        let joined = match Pin::new(task).poll(cx) {
            Poll::Ready(joined) => joined,
            Poll::Pending => return Poll::Pending,
        };

        // The task is finished either way: clear the slot so the join handle
        // is never polled again.
        *slot = None;
        Poll::Ready(joined.unwrap_or_else(|e| {
            Err(AgenticError::Other(format!("agent task failed: {e}")))
        }))
    }
}

impl Agent {
    /// Launch the agent on a background tokio task and hand back a pair:
    ///
    /// - [`AgentHandle`] — a cheap, clonable handle used to inject new
    ///   instructions, cancel, or inspect state.
    /// - [`AgentOutputFuture`] — resolves to the final
    ///   [`AgentOutput`](crate::agent::AgentOutput) once the loop exits.
    ///
    /// The agent is configured with `keep_alive`, so the loop idles after
    /// each terminal output for as long as any handle is alive. Dropping the
    /// last handle raises the same cancel flag that
    /// [`AgentHandle::cancel`] sets, so the two are equivalent. For a pure
    /// one-shot run without a handle, use [`Agent::run`] instead — a
    /// `let (_, out) = agent.spawn(); out.await?` pattern drops the handle
    /// straight away and will cancel before the first turn completes.
    ///
    /// Requires a running tokio runtime (`tokio::spawn` is invoked
    /// synchronously). Requires `.provider()` and `.instruction_prompt()`.
    pub fn spawn(self) -> (AgentHandle, AgentOutputFuture) {
        let queue = Arc::new(CommandQueue::new());
        let cancel = Arc::new(AtomicBool::new(false));
        let life = LifeToken::new(cancel.clone());

        // The agent gets its own clones of the flag and the queue; the
        // originals are moved into the handle's shared state below.
        let runner = self
            .cancel_signal(cancel.clone())
            .command_queue(queue.clone())
            .keep_alive();
        let task = tokio::spawn(async move { runner.run().await });

        let shared = HandleState { queue, cancel };
        (
            AgentHandle::new(Arc::new(shared), life),
            AgentOutputFuture::new(task),
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex as StdMutex;
    use std::time::Duration;

    use crate::agent::{Agent, AgentStatus, Event, EventKind};
    use crate::provider::types::{CompletionResponse, ContentBlock, Message};
    use crate::testutil::{text_response, MockProvider};
    use crate::CompletionRequest;

    /// Smoke test: `spawn` yields a clonable handle and an awaitable future,
    /// and cancelling through a clone lets the await finish.
    #[tokio::test]
    async fn spawn_returns_handle_and_future() {
        let (handle, output) = one_shot_agent("hello");
        let clone = handle.clone();
        // AgentHandle is Clone; AgentOutputFuture is a Future. Cancel so the
        // keep-alive loop terminates.
        clone.cancel();
        // The result itself is not inspected; only the type is pinned.
        let _: Result<AgentOutput> = output.await;
    }

    /// The loop starts running as soon as `spawn` returns: an `AgentStarted`
    /// event is observed before the output future is ever awaited.
    #[tokio::test]
    async fn spawn_starts_loop_immediately() {
        let events = EventLog::new();
        let (handle, output) = keep_alive_agent(vec![text_response("first")], &events);
        // AgentStarted is the first event emitted by run_loop — observable
        // before we await the future.
        events
            .wait_for(|e| matches!(e.kind, EventKind::AgentStarted { .. }))
            .await;
        // Cleanup: stop the keep-alive loop so the task can finish.
        handle.cancel();
        let _ = output.await;
    }

    /// `send` places a command on the shared queue with `Next` priority,
    /// `UserInput` source and no target agent name.
    #[tokio::test]
    async fn send_enqueues_user_input_command() {
        let (handle, output) = one_shot_agent("done");
        handle.send("hi");
        let queue = handle.queue_for_test();
        // NOTE(review): presumably a command without `agent_name` is
        // dequeued for any requested name — confirm against CommandQueue.
        let cmd = queue
            .dequeue_if(Some("anyone"), |_| true)
            .expect("queued command");
        assert_eq!(cmd.content, "hi");
        assert!(matches!(cmd.priority, QueuePriority::Next));
        assert!(matches!(cmd.source, CommandSource::UserInput));
        assert!(cmd.agent_name.is_none());
        // Cleanup: stop the keep-alive loop so the task can finish.
        handle.cancel();
        let _ = output.await;
    }

    /// An instruction sent while the agent is paused shows up in the user
    /// message of the next request made to the provider.
    #[tokio::test]
    async fn send_reaches_next_provider_request() {
        let events = EventLog::new();
        let (provider, handle, output) = keep_alive_agent_with_provider(
            vec![text_response("first"), text_response("second")],
            &events,
        );

        // Wait until the first turn is done and the loop is idle, then
        // inject the follow-up and wait for a second provider request.
        events
            .wait_for(|e| matches!(e.kind, EventKind::AgentPaused))
            .await;
        handle.send("follow-up");
        wait_until(|| provider.request_count() >= 2).await;

        let second = provider.last_request().expect("second request");
        let last_user = last_user_text(&second).expect("user message in second request");
        assert!(
            last_user.contains("follow-up"),
            "injected instruction must appear in turn 2's user message; got {last_user:?}",
        );

        // Either terminal status is accepted here; the test is about the
        // request contents, not about how the run ended.
        handle.cancel();
        let out = output.await.expect("output");
        assert!(matches!(
            out.status,
            AgentStatus::Completed | AgentStatus::Cancelled
        ));
    }

    /// A command sent through one handle clone is visible on the queue
    /// reached through the other: clones share a single queue.
    #[tokio::test]
    async fn clone_shares_queue() {
        let (handle, output) = one_shot_agent("done");
        let other = handle.clone();
        other.send("relay");
        let cmd = handle
            .queue_for_test()
            .dequeue_if(Some("anyone"), |_| true)
            .expect("queued command");
        assert_eq!(cmd.content, "relay");
        // Cleanup: stop the keep-alive loop so the task can finish.
        handle.cancel();
        let _ = output.await;
    }

    /// Cancelling through one clone is observed by every clone: they share a
    /// single cancel flag.
    #[tokio::test]
    async fn clone_shares_cancel() {
        let (handle, output) = one_shot_agent("done");
        let other = handle.clone();
        assert!(!handle.is_cancelled());
        other.cancel();
        assert!(handle.is_cancelled() && other.is_cancelled());
        let _ = output.await;
    }

    /// Cancelling while the loop is paused (after a finished turn) wakes it
    /// up and the final status is still `Completed`, not `Cancelled`.
    #[tokio::test]
    async fn cancel_during_idle_preserves_completed_status() {
        let events = EventLog::new();
        let (handle, output) = keep_alive_agent(vec![text_response("first")], &events);

        // Only cancel once the loop has reported that it is idle.
        events
            .wait_for(|e| matches!(e.kind, EventKind::AgentPaused))
            .await;
        handle.cancel();
        // The cancel is expected to surface as a resume from the idle wait.
        events
            .wait_for(|e| matches!(e.kind, EventKind::AgentResumed))
            .await;
        let out = output.await.expect("output");
        assert_eq!(out.status, AgentStatus::Completed);
    }

    /// A handle clone can be moved into another tokio task and cancel the
    /// agent from there; the output future then resolves successfully.
    #[tokio::test]
    async fn cancel_from_spawned_task() {
        let events = EventLog::new();
        let (handle, output) = keep_alive_agent(vec![text_response("first")], &events);

        events
            .wait_for(|e| matches!(e.kind, EventKind::AgentPaused))
            .await;
        // `handle` itself stays alive in this scope, so the exit below is
        // caused by the explicit cancel, not by a dropped last handle.
        let canceller = handle.clone();
        tokio::spawn(async move {
            canceller.cancel();
        });
        let _ = output.await.expect("output");
    }

    /// Dropping the only handle ends the idle loop without an explicit
    /// `cancel()` call, and the run still reports `Completed`.
    #[tokio::test]
    async fn dropping_last_handle_triggers_cancel() {
        let events = EventLog::new();
        let (handle, output) = keep_alive_agent(vec![text_response("first")], &events);

        events
            .wait_for(|e| matches!(e.kind, EventKind::AgentPaused))
            .await;
        // No clone was made, so this is the last handle.
        drop(handle);
        let out = output.await.expect("output");
        assert_eq!(out.status, AgentStatus::Completed);
    }

    /// Dropping one handle while a clone is still alive must not raise the
    /// cancel flag.
    #[tokio::test]
    async fn dropping_one_of_two_handles_does_not_cancel() {
        let events = EventLog::new();
        let (handle, output) = keep_alive_agent(vec![text_response("first")], &events);

        let survivor = handle.clone();
        events
            .wait_for(|e| matches!(e.kind, EventKind::AgentPaused))
            .await;
        drop(handle);
        // Cancel is NOT set while another handle is alive.
        assert!(!survivor.is_cancelled());
        // cleanup
        survivor.cancel();
        let _ = output.await;
    }

    /// Dropping the output future leaves the cancel flag untouched; the loop
    /// only finishes after the handle cancels explicitly.
    #[tokio::test]
    async fn dropping_future_alone_does_not_cancel() {
        // The future holds no LifeToken, so dropping it doesn't cancel. The
        // loop keeps running — cleanup belongs to the handle.
        let events = EventLog::new();
        let (handle, output) = keep_alive_agent(vec![text_response("first")], &events);

        events
            .wait_for(|e| matches!(e.kind, EventKind::AgentPaused))
            .await;
        drop(output);
        assert!(!handle.is_cancelled());
        handle.cancel();
        // With the future gone, completion is observed through the event log
        // instead of by awaiting the output.
        events
            .wait_for(|e| matches!(e.kind, EventKind::AgentFinished { .. }))
            .await;
    }

    /// A spawned agent emits `AgentPaused` when it goes idle and
    /// `AgentResumed` after a sent instruction wakes it for a second turn.
    #[tokio::test]
    async fn keep_alive_idle_and_resumed_events_still_fire() {
        let events = EventLog::new();
        let (provider, handle, output) = keep_alive_agent_with_provider(
            vec![text_response("first"), text_response("second")],
            &events,
        );
        events
            .wait_for(|e| matches!(e.kind, EventKind::AgentPaused))
            .await;
        handle.send("wake up");
        // A second provider request shows the instruction started a new turn.
        wait_until(|| provider.request_count() >= 2).await;
        events
            .wait_for(|e| matches!(e.kind, EventKind::AgentResumed))
            .await;
        // Cleanup: stop the keep-alive loop so the task can finish.
        handle.cancel();
        let _ = output.await;
    }

    /// Polling the output future again after it has resolved yields
    /// `AgenticError::Other` instead of panicking.
    #[tokio::test]
    async fn awaiting_future_twice_returns_error() {
        // AgentOutputFuture consumes its inner JoinHandle on completion;
        // polling again surfaces an AgenticError::Other.
        let (handle, mut output) = one_shot_agent("done");
        handle.cancel();
        // Await by mutable reference first so the future can be awaited a
        // second time by value.
        let _first = (&mut output).await;
        let second = output.await;
        assert!(matches!(second, Err(AgenticError::Other(_))));
    }

    /// Spawn an agent named "demo" backed by a mock provider that answers
    /// with `text`.
    fn one_shot_agent(text: &str) -> (AgentHandle, AgentOutputFuture) {
        let provider = Arc::new(MockProvider::text(text));
        let agent = Agent::new()
            .name("demo")
            .model_name("mock")
            .provider(provider)
            .identity_prompt("")
            .instruction_prompt("x");
        agent.spawn()
    }

    /// Same as [`keep_alive_agent_with_provider`] for tests that do not need
    /// to inspect the provider.
    fn keep_alive_agent(
        responses: Vec<CompletionResponse>,
        events: &EventLog,
    ) -> (AgentHandle, AgentOutputFuture) {
        let (_, handle, output) = keep_alive_agent_with_provider(responses, events);
        (handle, output)
    }

    /// Spawn an agent named "root" whose mock provider replays `responses`
    /// and whose events are recorded in `events`. The provider is returned
    /// as well so tests can inspect the requests it received.
    fn keep_alive_agent_with_provider(
        responses: Vec<CompletionResponse>,
        events: &EventLog,
    ) -> (Arc<MockProvider>, AgentHandle, AgentOutputFuture) {
        let provider = Arc::new(MockProvider::new(responses));
        let agent = Agent::new()
            .name("root")
            .model_name("mock")
            .provider(provider.clone())
            .identity_prompt("")
            .instruction_prompt("initial")
            .event_handler(events.handler());
        let (handle, output) = agent.spawn();
        (provider, handle, output)
    }

    struct EventLog {
        events: Arc<StdMutex<Vec<Event>>>,
    }

    impl EventLog {
        fn new() -> Self {
            Self {
                events: Arc::new(StdMutex::new(Vec::new())),
            }
        }

        fn handler(&self) -> Arc<dyn Fn(Event) + Send + Sync> {
            let events = self.events.clone();
            Arc::new(move |e| events.lock().unwrap().push(e))
        }

        async fn wait_for<F: Fn(&Event) -> bool>(&self, pred: F) {
            for _ in 0..200 {
                if self.events.lock().unwrap().iter().any(&pred) {
                    return;
                }
                tokio::time::sleep(Duration::from_millis(25)).await;
            }
            let seen: Vec<_> = self
                .events
                .lock()
                .unwrap()
                .iter()
                .map(|e| format!("{}:{:?}", e.agent_name, e.kind))
                .collect();
            panic!("timed out after 5s waiting for event; saw: {seen:#?}");
        }
    }

    /// Poll `pred` every 25 ms, up to 200 times, returning as soon as it
    /// yields `true`; panic if it never does.
    async fn wait_until<F: FnMut() -> bool>(mut pred: F) {
        let mut attempts_left = 200;
        while attempts_left > 0 {
            if pred() {
                return;
            }
            tokio::time::sleep(Duration::from_millis(25)).await;
            attempts_left -= 1;
        }
        panic!("timed out after 5s waiting for condition");
    }

    /// Return the text of the most recent user message in `req`, with its
    /// text blocks joined by newlines (non-text blocks are skipped), or
    /// `None` when the request contains no user message.
    fn last_user_text(req: &CompletionRequest) -> Option<String> {
        for message in req.messages.iter().rev() {
            if let Message::User { content } = message {
                let mut parts: Vec<&str> = Vec::new();
                for block in content.iter() {
                    if let ContentBlock::Text { text } = block {
                        parts.push(text.as_str());
                    }
                }
                return Some(parts.join("\n"));
            }
        }
        None
    }
}