forge-guardrails 0.1.2

Foundation types for an LLM-agent workflow framework
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
# User Guide

Practical usage patterns for forge — from single-turn tool calling to multi-turn conversations.

For model and backend selection, see [MODEL_GUIDE.md](MODEL_GUIDE.md). For backend installation, see [BACKEND_SETUP.md](BACKEND_SETUP.md).

---

## Integration Modes

Forge's guardrail stack (retry nudges, step enforcement, error recovery, context compaction, VRAM budgeting) can be consumed in three ways. All three share the same underlying guardrail logic.

### At a glance

Each mode trades control for convenience. WorkflowRunner handles everything; the proxy applies guardrails transparently but drops workflow-level features; the middleware gives you building blocks and nothing else.

| Feature | WorkflowRunner | Proxy | Middleware |
|---------|:-:|:-:|:-:|
| Validation + rescue parsing | Yes | Yes | Yes |
| Retry nudges | Yes | Yes | Yes |
| Respond tool | Caller adds | Auto-injected | Caller adds |
| Step enforcement | Yes | No | Yes (caller wires) |
| Prerequisites | Yes | No | Yes (caller wires) |
| Max iterations | Yes | Bounded by max_retries | Caller's responsibility |
| Context compaction | Yes | Yes | Caller wires ContextManager |
| Context threshold warnings | Yes | No | Caller wires ContextManager |
| Cancellation | Between iterations | Between retries | Caller's responsibility |
| Streaming (token-by-token) | Yes | Post-hoc SSE | Caller's responsibility |
| Tool execution | Yes | No (client executes) | No (caller executes) |
| Callbacks (on_message, on_compact) | Yes | No | No |

The proxy is intentionally bare-bones — it applies response-quality guardrails (validation, rescue, retry, respond tool) without requiring workflow knowledge. Features like step enforcement and prerequisites require workflow structure that doesn't exist in the OpenAI chat completions API. See [Proxy design boundaries](#proxy-design-boundaries) for details.

### Mode 1: Standalone Runner (batteries included)

Forge owns the full agentic loop — LLM communication, guardrail policy, tool execution, and orchestration. You provide tools and a task, forge handles everything.

```python
from forge import WorkflowRunner

runner = WorkflowRunner(client=client, context_manager=ctx)
result = await runner.run(workflow, "What's the weather in Paris?")
```

**Best for:** Projects where forge is the primary framework. Scripts, pipelines, and applications built around forge from the start. See [Single-Turn Workflow](#single-turn-workflow) and [Multi-Turn Conversations](#multi-turn-conversations) below.

### Mode 2: Proxy Server (drop-in, zero code changes)

Forge sits between any client and your model server, intercepting requests and applying guardrails transparently. It speaks both the OpenAI chat-completions API and the Anthropic Messages API (`/v1/messages`), so OpenAI-compatible tools and Claude Code both work. The client doesn't know forge is there.

```bash
# External mode — you manage the backend
python -m forge.proxy --backend-url http://localhost:8080 --port 8081

# Managed mode — forge starts llama-server and the proxy together
python -m forge.proxy --backend llamaserver --gguf path/to/model.gguf --port 8081
```

Then point any client at forge instead of the model server:

```python
from openai import OpenAI
client = OpenAI(base_url="http://localhost:8081/v1")
```

**Best for:** Adding guardrails to existing tools without modifying them. Works with any tool that speaks the OpenAI-compatible API, plus Claude Code via the Anthropic Messages API — no per-client wrappers needed.

**Reliability note:** The proxy automatically injects a synthetic `respond` tool when tools are present in the request. The model calls `respond(message="...")` instead of producing bare text, keeping it in tool-calling mode where forge's full guardrail stack applies. The `respond` call is stripped from the outbound response — the client sees a normal text response and never knows the tool exists. This is essential for small local models (~8B), which cannot be trusted to choose correctly between text and tool calls — eval testing showed that trusting the model's text intent dropped workflow completion from 100% to as low as 4%. Guiding the model to a tool is a must. See [ADR-013](decisions/013-text-response-intent.md) for the full analysis.

#### Using forge with Claude Code

Claude Code speaks the Anthropic Messages API, which the proxy serves on `POST /v1/messages` — so you can point Claude Code at a forge-guarded local model. Start the proxy against any backend, then set two environment variables for the Claude Code process:

```bash
# Start the proxy (managed mode against a local GGUF; native FC by default)
python -m forge.proxy --backend llamaserver --gguf path/to/model.gguf --port 8081

# Point Claude Code at it — scope these to the claude process only
ANTHROPIC_BASE_URL=http://localhost:8081 \
ANTHROPIC_AUTH_TOKEN=forge \
claude
```

`ANTHROPIC_AUTH_TOKEN` can be any non-empty string — forge ignores it. The model name Claude Code sends is also ignored; forge serves whatever backend the proxy was started with.

**Function-calling mode.** `--mode native` (default) uses the backend's chat-template tool-calling and is the smoother default for Claude Code's heavy multi-turn tool use. `--mode prompt` injects the tool surface into the prompt for backends without a tool-calling template; whether a model stays coherent across multi-turn tool results in prompt mode varies by model, so prefer native when the backend supports it.

**Downstream protocol.**

- **Local model (default, `--backend-protocol openai`)** — forge translates Claude Code's Anthropic requests to OpenAI for llama.cpp / Ollama and converts the reply back to Anthropic SSE. Anthropic-only fields with no OpenAI analog (`cache_control`, `thinking`, `document` blocks) are dropped at that boundary; see [ADR-015](decisions/015-cache-control-preservation-path1.md).
- **Anthropic-shape downstream (`--backend-protocol anthropic`, external mode)** — forge forwards to an Anthropic Messages endpoint (e.g. LiteLLM or the Anthropic API), passing unknown fields through verbatim and preserving `cache_control` on clean turns. This path uses the Anthropic SDK: `pip install forge-guardrails[anthropic]`.

#### Proxy design boundaries

The proxy is intentionally bare-bones: it applies response-quality guardrails without requiring workflow knowledge. The following features are available in WorkflowRunner but not in the proxy, by design:

- **Step enforcement and prerequisites.** These require workflow structure (required steps, terminal tool, tool dependencies) that doesn't exist in the OpenAI chat completions API. The proxy receives tool definitions per request but has no concept of workflow progression. If you need step enforcement, use WorkflowRunner or the middleware directly.

- **Max iterations.** The proxy calls `run_inference` once per request. Each call is bounded at `max_retries + 1` LLM attempts (default 4). There is no outer loop — a runaway model cannot loop indefinitely. This is sufficient for the proxy's single-request model.

- **Real streaming.** The proxy accepts `stream=true` and returns SSE events, but the full inference completes before SSE conversion. Token-by-token streaming during inference would require validating partial responses, which is incompatible with guardrails that need complete responses (rescue parsing, retry nudges). The guardrail-first design is the proxy's value proposition.

- **Context threshold warnings.** The proxy is stateless — the client sends the full conversation history in every request and decides what to include. Context pressure is the client's concern. Compaction still fires when the budget is exceeded.

- **Cancellation on disconnect.** Client disconnects are detected but do not cancel in-flight inference. This is the same granularity as WorkflowRunner, which checks `cancel_event` between loop iterations but does not interrupt a running LLM call. The worst case is `max_retries + 1` wasted calls (default 4) for a disconnected client.

### Mode 3: Middleware (composable guardrails)

Import forge's guardrail components directly into your own orchestration loop. You own the loop, forge provides the reliability logic.

**Simple API** (two calls -- covers most use cases):

```python
from forge.guardrails import Guardrails

guardrails = Guardrails(
    tool_names=["search", "lookup", "answer"],
    required_steps=["search", "lookup"],
    terminal_tool="answer",
)

# After each LLM response:
result = guardrails.check(response)

if result.action in ("retry", "step_blocked"):
    messages.append({"role": result.nudge.role, "content": result.nudge.content})
    continue

if result.action == "fatal":
    raise RuntimeError(result.reason)

# result.action == "execute" -- run the tools, then tell forge what succeeded:
execute(result.tool_calls)
done = guardrails.record([tc.tool for tc in result.tool_calls])
```

The `Guardrails` facade covers validation, retry nudges, and step enforcement. It does **not** enforce tool prerequisites — for those, use the Granular API (`StepEnforcer.check_prerequisites`) shown next.

**Granular API** (individual components for custom control):

```python
from forge.guardrails import ResponseValidator, StepEnforcer, ErrorTracker

validator = ResponseValidator(tool_names=["search", "lookup", "answer"])
enforcer = StepEnforcer(required_steps=["search", "lookup"], terminal_tools=frozenset(["answer"]))
errors = ErrorTracker(max_retries=3, max_tool_errors=2)

# Inside your loop:
result = validator.validate(response)
if result.needs_retry:
    errors.record_retry()
    messages.append({"role": result.nudge.role, "content": result.nudge.content})
    continue

step_check = enforcer.check(result.tool_calls)
if step_check.needs_nudge:
    messages.append({"role": step_check.nudge.role, "content": step_check.nudge.content})
    continue

for tc in result.tool_calls:
    ok = execute(tc)
    enforcer.record(tc.tool)
    errors.record_result(success=ok)
```

**What you own:** The middleware provides validation, rescue parsing, retry nudges, and step enforcement. Your loop is responsible for: iteration caps, cancellation, context management (including compaction and threshold callbacks), and streaming. These are handled automatically by WorkflowRunner but are intentionally left to the caller in middleware mode — the middleware is an advisory layer, not an execution engine.

**Best for:** Framework developers embedding forge's guardrails inside a custom agent, a proprietary pipeline, or another open-source framework. For a complete runnable example showing both APIs, see [`examples/foreign_loop.py`](../examples/foreign_loop.py). For design rationale, see [ADR-011](decisions/011-guardrail-middleware.md).

### How they relate

```
forge.guardrails/            <-- extracted guardrail logic
    ^                ^
forge.server         forge.core.runner
(proxy mode)         (standalone mode)
```

The middleware layer is the foundation. Both the proxy server and the standalone runner compose the same guardrail components internally. The proxy wraps them behind an OpenAI-compatible API. The runner wraps them in a complete agentic loop. The middleware exposes them as building blocks.

| | Standalone | Proxy | Middleware |
|---|---|---|---|
| Who owns the loop? | Forge | Forge (transparent) | You |
| Code changes needed? | Build on forge | Change one URL | Import + integrate |
| Works with existing tools? | No | Yes | Depends on integration |
| Best for | New projects | Existing toolchains | Framework developers |

---

## Concepts

A forge workflow has four main pieces:

- **Tools** — Python functions the LLM can call, each described by a `ToolSpec` with typed parameters.
- **Workflow** — A named bundle of tools, with optional `required_steps` (tools the LLM *must* call) and a `terminal_tool` (the tool or tools that end the workflow — accepts `str` or `list[str]`).
- **Client** — An LLM backend adapter (`OllamaClient`, `LlamafileClient`, `AnthropicClient`).
- **Runner** — `WorkflowRunner` drives the agentic loop: send messages, parse tool calls, execute tools, enforce guardrails, manage compaction.

---

## Single-Turn Workflow

A two-step weather workflow: look up weather, then report it.

```python
from pydantic import BaseModel, Field
from forge.core.workflow import Workflow, ToolDef, ToolSpec
from forge.core.runner import WorkflowRunner
from forge.clients.llamafile import LlamafileClient
from forge.server import setup_backend, BudgetMode

# Define tools
def get_weather(city: str) -> str:
    return f"72°F and sunny in {city}"

def report_weather(city: str, weather: str) -> str:
    return f"Weather report: {weather}"

class GetWeatherParams(BaseModel):
    city: str = Field(description="City name")

class ReportWeatherParams(BaseModel):
    city: str = Field(description="City name")
    weather: str = Field(description="Weather description")

workflow = Workflow(
    name="weather",
    description="Look up weather and report it.",
    tools={
        "get_weather": ToolDef(
            spec=ToolSpec(
                name="get_weather",
                description="Get current weather for a city",
                parameters=GetWeatherParams,
            ),
            callable=get_weather,
        ),
        "report_weather": ToolDef(
            spec=ToolSpec(
                name="report_weather",
                description="Report the weather",
                parameters=ReportWeatherParams,
            ),
            callable=report_weather,
        ),
    },
    required_steps=["get_weather"],
    terminal_tool="report_weather",
)

# setup_backend() auto-manages llama-server: starts the process, health-checks,
# resolves a VRAM-aware context budget, and returns a ContextManager ready to use.
server, ctx = await setup_backend(
    backend="llamaserver",
    gguf_path="path/to/Ministral-3-8B-Instruct-2512-Q8_0.gguf",
    budget_mode=BudgetMode.FORGE_FULL,
)
# Or manage the server yourself and create the ContextManager directly:
# ctx = ContextManager(strategy=TieredCompact(keep_recent=2), budget_tokens=8192)

client = LlamafileClient(
    gguf_path="path/to/Ministral-3-8B-Instruct-2512-Q8_0.gguf",
    mode="native",
    recommended_sampling=True,
)
runner = WorkflowRunner(client=client, context_manager=ctx, stream=True)
await runner.run(workflow, "What's the weather in Paris?")
await server.stop()
```

### What happens under the hood

1. `setup_backend()` starts the server, detects available VRAM, and calculates a context budget.
2. `WorkflowRunner.run()` builds a system prompt describing the available tools.
3. The LLM calls `get_weather(city="Paris")` — forge executes it and feeds the result back.
4. Step enforcement verifies `get_weather` was called (it's in `required_steps`).
5. The LLM calls `report_weather(...)` — forge executes it, sees it's the `terminal_tool`, and ends the loop.
6. If any step fails: retry nudges, rescue loops, and error recovery kick in automatically. Step-enforcement and prerequisite violations surface as tool-error responses on the tool channel (the same wire shape models are trained on for "tool call failed, try again"); bare-text retry nudges still arrive as user messages.

---

## Multi-Turn Conversations

`WorkflowRunner` accepts an optional `on_message` callback that fires each time a `Message` is appended to the conversation during `run()`. This is the primary observability hook — use it for logging, eval metric collection, or building conversation history for multi-turn flows.

- **Single-turn (default):** `on_message` fires for every message the runner creates — system prompt, user input, assistant responses, tool results, nudges.
- **Multi-turn (`initial_messages`):** `run()` accepts an optional `initial_messages` parameter that seeds the conversation with prior history. `on_message` fires **only for new messages created during this turn**, not for the replayed history.

`WorkflowRunner` does not manage server lifecycle or track conversation history across `run()` calls — both are the consumer's responsibility.

```python
from forge.server import setup_backend, BudgetMode
from forge.core.runner import WorkflowRunner
from forge.core.messages import Message, MessageMeta, MessageRole, MessageType

# 1. Start server once — stays up for the lifetime of the consumer
client = OllamaClient(model="ministral-3:8b-instruct-2512-q4_K_M", recommended_sampling=True)
server, ctx = await setup_backend(
    backend="ollama", model="ministral-3:8b-instruct-2512-q4_K_M",
    budget_mode=BudgetMode.FORGE_FULL, client=client,
)

# 2. Consumer owns the conversation history
conversation: list[Message] = []

# Turn 0 — normal run, on_message collects everything (system prompt, user input, etc.)
runner = WorkflowRunner(client=client, context_manager=ctx,
                        on_message=lambda msg: conversation.append(msg))
await runner.run(workflow, "first question")

# Turn 1+ — seed with full history, append new user message
turn_messages: list[Message] = []
runner = WorkflowRunner(client=client, context_manager=ctx,
                        on_message=lambda msg: turn_messages.append(msg))
seed = list(conversation)
seed.append(Message(MessageRole.USER, "follow-up question",
                    MessageMeta(MessageType.USER_INPUT)))
await runner.run(workflow, "follow-up question", initial_messages=seed)
conversation.extend(turn_messages)

# 3. Shut down when the consumer is done (not per-turn)
await server.stop()
```

The system prompt lives in `conversation` from turn 0 — it is not rebuilt or duplicated on subsequent turns. `StepEnforcer` and `tool_call_counter` reset each `run()` call since they are per-turn state.

### Long-Running Sessions: Filtering Transient Messages

`on_message` emits everything the runner creates during a turn, including transient retry artifacts — failed bare text responses, retry nudges, step nudges, and prerequisite nudges. This is by design: consumers get full visibility for logging and debugging.

For long-running sessions where conversation history persists across turns, these transient messages accumulate. The model sees its own past failures and corrective nudges on every subsequent turn, polluting effective context and degrading coherence — especially on smaller models (8-14B).

**Who's affected:** Any consumer that appends all `on_message` outputs to a persistent message list and reuses it via `initial_messages` on subsequent turns.

**Not affected:** Single-shot workflows, eval scenarios, or consumers that rebuild the message list from scratch each turn.

**Fix:** Filter transient message types before persisting. The metadata already tags these:

```python
from forge.core.messages import MessageType

TRANSIENT_TYPES = {
    MessageType.RETRY_NUDGE,
    MessageType.STEP_NUDGE,
    MessageType.PREREQUISITE_NUDGE,
    MessageType.TEXT_RESPONSE,
}

def on_message(self, msg: Message) -> None:
    if msg.metadata.type not in TRANSIENT_TYPES:
        self.messages.append(msg)
```

`TEXT_RESPONSE` is included because in tool-calling workflows, bare text is always a failed attempt that triggered a retry — the successful response comes as a `TOOL_CALL`. Consumers using the respond tool for conversational replies should keep `TEXT_RESPONSE` in their persist list.

**Why not fix this in forge?** The runner's job is to emit everything — within a turn, retry nudges are useful (the model needs to see the nudge to self-correct). The distinction between "within a turn" and "across turns" is a consumer concern. Compaction handles context overflow but doesn't proactively clean up transient messages — it fires based on token budget pressure, not session hygiene.

---

## Choosing a Backend

See [BACKEND_SETUP.md](BACKEND_SETUP.md) for the supported-backend table, boot commands, and client snippets. [MODEL_GUIDE.md](MODEL_GUIDE.md) covers which model to pick.

### Sampling Parameters

Each model family has its own recommended temperature / top_p / top_k — and those recommendations differ substantially across families. Running everything at a single default is a measurable handicap for most models. Forge ships a per-model recommendations map that consumers opt into explicitly via a constructor flag:

```python
from forge.clients import LlamafileClient

client = LlamafileClient(
    gguf_path="path/to/Qwen3.5-27B-Q4_K_M.gguf",
    mode="native",
    recommended_sampling=True,
)
```

For local-server backends, the GGUF (or llamafile) path is the canonical model identity — its filename stem (e.g. `Qwen3.5-27B-Q4_K_M`) is what forge uses for sampling-defaults lookup, the wire-format `model` field, and JSONL eval rows. For vLLM the equivalent is `model_path` (a model directory or HF repo id), whose trailing segment serves the same role. Use Ollama-style strings only with `OllamaClient`.

The flag is opt-in. Default behavior (`recommended_sampling=False`) leaves sampling to backend defaults; if forge has opinions about the model, it logs a one-shot INFO message pointing the caller at the flag. With `recommended_sampling=True`, an unknown model raises `UnsupportedModelError`.

#### Proxy mode

The proxy does not consult the recommendations map. It plumbs whatever sampling params the inbound request body carries (OpenAI-compatible fields: `temperature`, `top_p`, `top_k`, `min_p`, `repeat_penalty`, `presence_penalty`, `seed`) through to the backend on a per-call basis. The proxy's pre-built client is treated as a "blank slate" — body fields are the only sampling source.

```bash
curl http://localhost:8081/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "qwen3.5:27b-q4_K_M",
    "messages": [{"role": "user", "content": "hi"}],
    "temperature": 1.0,
    "top_p": 0.95,
    "presence_penalty": 1.5
  }'
```

To get card-recommended sampling in proxy mode, the calling client looks up `forge.clients.get_sampling_defaults(model)` and includes the values in the request body — the proxy is intentionally pure pass-through.

See [MODEL_GUIDE.md#sampling-parameters](MODEL_GUIDE.md#sampling-parameters) for the supported-models table, source citations, and override patterns.

---

## Context Management

Forge automatically manages the context window. When the conversation approaches the budget limit, tiered compaction fires:

- **Phase 1** — Summarize older tool results, keep recent messages intact.
- **Phase 2** — Compress mid-conversation exchanges, preserve system prompt and recent context.
- **Phase 3** — Aggressive compression, retain only system prompt and last few exchanges.

You can configure this via the `ContextManager`:

```python
from forge.context import ContextManager, TieredCompact, NoCompact

# Default: tiered compaction with 2 recent messages preserved
ctx = ContextManager(strategy=TieredCompact(keep_recent=2), budget_tokens=8192)

# No compaction (for short workflows that won't hit the limit)
ctx = ContextManager(strategy=NoCompact(), budget_tokens=8192)
```

Or let `setup_backend()` handle it — it detects your VRAM and calculates the budget automatically.

---

## Guardrails

Forge's guardrail stack runs automatically. Each layer can be independently disabled via [ablation presets](../tests/eval/ablation.py) for testing:

| Guardrail | What it does |
|-----------|-------------|
| **Step enforcement** | Verifies required tools were called before the terminal tool fires |
| **Prerequisites** | Enforces conditional tool dependencies (e.g. must read before edit) |
| **Retry nudges** | Prompts the LLM to try again when a tool call fails validation |
| **Rescue loops** | Recovers malformed tool calls from the LLM's text output |
| **Error recovery** | Re-prompts after tool execution errors instead of crashing |
| **Compaction** | Prevents context overflow in long conversations |

The eval harness measures each guardrail's contribution — see [EVAL_GUIDE.md](EVAL_GUIDE.md) for ablation results.

---

## Tool Prerequisites

Tools can declare conditional dependencies — "if you call this tool, you must have called tool X first." This is enforced at runtime via nudge-and-retry, the same pattern as step enforcement.

```python
ToolDef(
    spec=edit_spec,
    callable=edit_file,
    # Name-only: any prior call to read_file satisfies it
    prerequisites=["read_file"],
)

ToolDef(
    spec=edit_spec,
    callable=edit_file,
    # Arg-matched: must have called read_file with the same path
    prerequisites=[{"tool": "read_file", "match_arg": "path"}],
)
```

If the model calls a tool without satisfying its prerequisites, the runner blocks the batch and emits one tool-error response per blocked tool call (`[PrereqError] ...` on the tool channel, with `PREREQUISITE_NUDGE` message type for compaction prioritization). The model retries off the canonical "tool failed" wire shape rather than a trailing user message — friendlier to OpenAI-tool-trained models. After `max_prereq_violations` (default 2) consecutive violations, `PrerequisiteError` is raised.

Prerequisites are not included in the tool schema — the model discovers constraints via the tool-error reply, same as step enforcement.

---

## Multiple Terminal Tools

Workflows can have multiple valid exit points. Pass a list to `terminal_tool`:

```python
workflow = Workflow(
    ...
    terminal_tool=["set_ac", "no_action"],  # either can end the workflow
)
```

Internally normalized to a `frozenset` for O(1) membership checks. A single string is still accepted and works as before.

---

## Cancellation

`WorkflowRunner.run()` accepts an optional `cancel_event` parameter for cooperative cancellation:

```python
import asyncio

cancel = asyncio.Event()

# In another coroutine or callback:
cancel.set()

try:
    result = await runner.run(workflow, "task", cancel_event=cancel)
except WorkflowCancelledError as e:
    print(f"Cancelled at iteration {e.iteration}")
    print(f"Completed steps: {e.completed_steps}")
    print(f"Messages so far: {len(e.messages)}")
```

The runner checks the event once per iteration, before the inference call. This is cooperative — if the model is mid-inference, the runner waits for it to finish before checking. The `WorkflowCancelledError` includes the full conversation state for the caller to resume, discard, or log.

---

## SlotWorker — Shared Slot Access

`SlotWorker` serializes workflow execution on a single inference slot with priority-based queuing and auto-preemption. Use it when multiple callers need to share a slot — for example, a home assistant's specialist workflows (calendar, AC management, escalation) all sharing slot 1 while the main conversation runs on slot 0.

### Basic usage (FIFO)

```python
from forge import SlotWorker, WorkflowRunner

# One runner pinned to a slot, one worker wrapping it
runner = WorkflowRunner(client=client, context_manager=ctx)
worker = SlotWorker(runner)
await worker.start()

# From anywhere — multiple concurrent callers are serialized
result = await worker.submit(workflow, "do the thing")
```

### Priority

Priority is an `int` — lower values run first. Forge imposes no semantics; the consumer defines what the levels mean:

```python
# Consumer defines their own levels
USER = 0
ESCALATED = 1
ROUTINE = 2

# User-initiated request — highest priority
result = await worker.submit(calendar_wf, "what's on my schedule?", priority=USER)

# Background cron — lowest priority, can be preempted
result = await worker.submit(ac_wf, "check temperature", priority=ROUTINE)
```

Without an explicit priority, all tasks default to 0 (pure FIFO).

### Auto-preemption

If a higher-priority task is submitted while a lower-priority task is running, the running task is automatically cancelled and the higher-priority task takes over. The cancelled task's `submit()` raises `WorkflowCancelledError`.

```python
# Routine AC check is running (priority=2)...
# User asks about calendar (priority=0) — AC check is auto-cancelled
result = await worker.submit(calendar_wf, "what's next?", priority=0)
```

You can also cancel manually:

```python
worker.cancel_current()  # cancels whatever is running
```

### Multi-slot architecture

For multi-slot setups (e.g., with `--kv-unified`), create one `SlotWorker` per shared slot. The main conversation slot typically doesn't need a worker — it's dedicated to one persistent session.

```python
# Slot 0: main conversation (no worker needed — dedicated)
main_client = LlamafileClient(gguf_path="path/to/model.gguf", slot_id=0)
main_runner = WorkflowRunner(client=main_client, context_manager=ctx)

# Slot 1: shared specialist slot (needs a worker)
service_client = LlamafileClient(gguf_path="path/to/model.gguf", slot_id=1)
service_runner = WorkflowRunner(client=service_client, context_manager=ctx)
service_worker = SlotWorker(service_runner)
await service_worker.start()

# Tools route through the worker
async def query_calendar(**kwargs):
    return await service_worker.submit(calendar_wf, kwargs["query"], priority=0)
```